diff --git a/sqs/sqs.go b/sqs/sqs.go index 745d888b9e50d97458ec8c80dc5dbfe73c26b9fb..104e05dd3befcab2d474f68660d6c066ab0cf014 100644 --- a/sqs/sqs.go +++ b/sqs/sqs.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/google/uuid" "gitlab.com/uafrica/go-utils/s3" + "gitlab.com/uafrica/go-utils/string_utils" "io/ioutil" "time" @@ -23,11 +24,13 @@ const SQSMessageOnS3Key = "message-on-s3" // Messenger sends an arbitrary message via SQS type Messenger struct { - QueueName string - QueueURL string - Region string - S3Session *s3.SessionWithHelpers - S3BucketName string + QueueName string + QueueURL string + Region string + S3Session *s3.SessionWithHelpers + S3BucketName string + MessageGroupID *string + RequestIDHeaderKey string } // NewSQSClient constructs a Messenger which sends messages to an SQS queue @@ -54,12 +57,7 @@ func NewSQSClient(awsRegion string) (*sqs.SQS, error) { // SendSQSMessage sends a message to the queue associated with the messenger // headers - string message attributes of the SQS message (see AWS SQS documentation) // body - body of the SQS message (see AWS SQS documentation) -func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) (string, error) { - msgGrpID := "" - if len(messageGroupID) > 0 && messageGroupID[0] != "" { - msgGrpID = messageGroupID[0] - } - +func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string) (string, error) { msgAttrs := make(map[string]*sqs.MessageAttributeValue) for key, val := range headers { @@ -71,7 +69,7 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre // Add request ID if currentRequestID != nil { - msgAttrs[headerKey] = &sqs.MessageAttributeValue{ + msgAttrs[m.RequestIDHeaderKey] = &sqs.MessageAttributeValue{ DataType: aws.String("String"), StringValue: aws.String(*currentRequestID), } @@ -84,7 +82,7 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre var res *sqs.SendMessageOutput var err error - if msgGrpID == "" { + if string_utils.UnwrapString(m.MessageGroupID) == "" { res, err = sqsClient.SendMessage(&sqs.SendMessageInput{ MessageAttributes: msgAttrs, MessageBody: aws.String(body), @@ -95,7 +93,7 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre MessageAttributes: msgAttrs, MessageBody: aws.String(body), QueueUrl: &m.QueueURL, - MessageGroupId: &msgGrpID, + MessageGroupId: m.MessageGroupID, }) } @@ -106,12 +104,7 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre return *res.MessageId, err } -func SendSQSMessage(msgr Messenger, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) error { - msgGrpID := "" - if len(messageGroupID) > 0 && messageGroupID[0] != "" { - msgGrpID = messageGroupID[0] - } - +func SendSQSMessage(msgr Messenger, objectToSend interface{}, currentRequestID *string, sqsType string) error { if sqsClient == nil { var err error sqsClient, err = NewSQSClient(msgr.Region) @@ -145,7 +138,7 @@ func SendSQSMessage(msgr Messenger, objectToSend interface{}, currentRequestID * } } - msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType, headerKey, msgGrpID) + msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType) if err != nil { logs.ErrorWithMsg("Failed to send sqs event", err) return err