From 622fbd01797352c82072e5103bf81aa78a71c664 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?France=CC=81=20Wilke?= <francewilke@gmail.com> Date: Tue, 11 Jan 2022 10:26:54 +0200 Subject: [PATCH] Pass (optional) MessageGroupId to `SendSQSMessage` for FIFO queues --- sqs/sqs.go | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/sqs/sqs.go b/sqs/sqs.go index 0e19dc7..2cb971e 100644 --- a/sqs/sqs.go +++ b/sqs/sqs.go @@ -49,7 +49,7 @@ func NewSQSMessenger(awsRegion, queueUrl string) (*Messenger, error) { // SendSQSMessage sends a message to the queue associated with the messenger // headers - string message attributes of the SQS message (see AWS SQS documentation) // body - body of the SQS message (see AWS SQS documentation) -func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string, headerKey string) (string, error) { +func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string, headerKey string, messageGroupID string) (string, error) { msgAttrs := make(map[string]*sqs.MessageAttributeValue) for key, val := range headers { @@ -72,11 +72,22 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre StringValue: aws.String(sqsType), } - res, err := m.service.SendMessage(&sqs.SendMessageInput{ - MessageAttributes: msgAttrs, - MessageBody: aws.String(body), - QueueUrl: &m.queueURL, - }) + var res *sqs.SendMessageOutput + var err error + if messageGroupID == "" { + res, err = m.service.SendMessage(&sqs.SendMessageInput{ + MessageAttributes: msgAttrs, + MessageBody: aws.String(body), + QueueUrl: &m.queueURL, + }) + } else { + res, err = m.service.SendMessage(&sqs.SendMessageInput{ + MessageAttributes: msgAttrs, + MessageBody: aws.String(body), + QueueUrl: &m.queueURL, + MessageGroupId: &messageGroupID, + }) + } if err != nil { return "", err @@ -85,7 +96,12 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre return *res.MessageId, err } -func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string) error { +func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) error { + msgGrpID := "" + if len(messageGroupID) > 0 && messageGroupID[0] != "" { + msgGrpID = messageGroupID[0] + } + if msgr == nil { var err error msgr, err = NewSQSMessenger(region, os.Getenv(envQueueURLName)) @@ -101,7 +117,7 @@ func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, obje } headers := map[string]string{"Name": "dummy"} - msgID, err := msgr.SendSQSMessage(headers, string(jsonBytes), currentRequestID, sqsType, headerKey) + msgID, err := msgr.SendSQSMessage(headers, string(jsonBytes), currentRequestID, sqsType, headerKey, msgGrpID) if err != nil { logs.ErrorWithMsg("Failed to send sqs event", err) return err -- GitLab