Skip to content
Snippets Groups Projects
Commit 622fbd01 authored by Francé Wilke's avatar Francé Wilke
Browse files

Pass (optional) MessageGroupId to `SendSQSMessage` for FIFO queues

parent ce45a97d
No related branches found
No related tags found
No related merge requests found
......@@ -49,7 +49,7 @@ func NewSQSMessenger(awsRegion, queueUrl string) (*Messenger, error) {
// SendSQSMessage sends a message to the queue associated with the messenger
// headers - string message attributes of the SQS message (see AWS SQS documentation)
// body - body of the SQS message (see AWS SQS documentation)
func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string, headerKey string) (string, error) {
func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string, headerKey string, messageGroupID string) (string, error) {
msgAttrs := make(map[string]*sqs.MessageAttributeValue)
for key, val := range headers {
......@@ -72,11 +72,22 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre
StringValue: aws.String(sqsType),
}
res, err := m.service.SendMessage(&sqs.SendMessageInput{
var res *sqs.SendMessageOutput
var err error
if messageGroupID == "" {
res, err = m.service.SendMessage(&sqs.SendMessageInput{
MessageAttributes: msgAttrs,
MessageBody: aws.String(body),
QueueUrl: &m.queueURL,
})
} else {
res, err = m.service.SendMessage(&sqs.SendMessageInput{
MessageAttributes: msgAttrs,
MessageBody: aws.String(body),
QueueUrl: &m.queueURL,
MessageGroupId: &messageGroupID,
})
}
if err != nil {
return "", err
......@@ -85,7 +96,12 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre
return *res.MessageId, err
}
func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string) error {
func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) error {
msgGrpID := ""
if len(messageGroupID) > 0 && messageGroupID[0] != "" {
msgGrpID = messageGroupID[0]
}
if msgr == nil {
var err error
msgr, err = NewSQSMessenger(region, os.Getenv(envQueueURLName))
......@@ -101,7 +117,7 @@ func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, obje
}
headers := map[string]string{"Name": "dummy"}
msgID, err := msgr.SendSQSMessage(headers, string(jsonBytes), currentRequestID, sqsType, headerKey)
msgID, err := msgr.SendSQSMessage(headers, string(jsonBytes), currentRequestID, sqsType, headerKey, msgGrpID)
if err != nil {
logs.ErrorWithMsg("Failed to send sqs event", err)
return err
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment