diff --git a/sqs/sqs.go b/sqs/sqs.go index 6775da070c7931ed676c8a77ec6cd5c3892ed98e..0e19dc78969fd87d90acd871ab446994aff4947d 100644 --- a/sqs/sqs.go +++ b/sqs/sqs.go @@ -49,7 +49,7 @@ func NewSQSMessenger(awsRegion, queueUrl string) (*Messenger, error) { // SendSQSMessage sends a message to the queue associated with the messenger // headers - string message attributes of the SQS message (see AWS SQS documentation) // body - body of the SQS message (see AWS SQS documentation) -func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, headerKey string) (string, error) { +func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string, headerKey string) (string, error) { msgAttrs := make(map[string]*sqs.MessageAttributeValue) for key, val := range headers { @@ -67,6 +67,11 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre } } + msgAttrs["type"] = &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(sqsType), + } + res, err := m.service.SendMessage(&sqs.SendMessageInput{ MessageAttributes: msgAttrs, MessageBody: aws.String(body), @@ -80,7 +85,7 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre return *res.MessageId, err } -func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, objectToSend interface{}, currentRequestID *string, headerKey string) error { +func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string) error { if msgr == nil { var err error msgr, err = NewSQSMessenger(region, os.Getenv(envQueueURLName)) @@ -96,7 +101,7 @@ func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, obje } headers := map[string]string{"Name": "dummy"} - msgID, err := msgr.SendSQSMessage(headers, string(jsonBytes), currentRequestID, headerKey) + msgID, err := msgr.SendSQSMessage(headers, string(jsonBytes), currentRequestID, sqsType, headerKey) if err != nil { logs.ErrorWithMsg("Failed to send sqs event", err) return err