Skip to content
Snippets Groups Projects
Commit a590e76d authored by Johan de Klerk's avatar Johan de Klerk
Browse files

#24: Added more info to SQS messenger

parent 88de2e49
Branches
Tags
1 merge request!19Resolve "Upload large SQS messages to S3"
......@@ -8,6 +8,7 @@ import (
"fmt"
"github.com/google/uuid"
"gitlab.com/uafrica/go-utils/s3"
"gitlab.com/uafrica/go-utils/string_utils"
"io/ioutil"
"time"
......@@ -28,6 +29,8 @@ type Messenger struct {
Region string
S3Session *s3.SessionWithHelpers
S3BucketName string
MessageGroupID *string
RequestIDHeaderKey string
}
// NewSQSClient constructs a Messenger which sends messages to an SQS queue
......@@ -54,12 +57,7 @@ func NewSQSClient(awsRegion string) (*sqs.SQS, error) {
// SendSQSMessage sends a message to the queue associated with the messenger
// headers - string message attributes of the SQS message (see AWS SQS documentation)
// body - body of the SQS message (see AWS SQS documentation)
func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) (string, error) {
msgGrpID := ""
if len(messageGroupID) > 0 && messageGroupID[0] != "" {
msgGrpID = messageGroupID[0]
}
func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string) (string, error) {
msgAttrs := make(map[string]*sqs.MessageAttributeValue)
for key, val := range headers {
......@@ -71,7 +69,7 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre
// Add request ID
if currentRequestID != nil {
msgAttrs[headerKey] = &sqs.MessageAttributeValue{
msgAttrs[m.RequestIDHeaderKey] = &sqs.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: aws.String(*currentRequestID),
}
......@@ -84,7 +82,7 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre
var res *sqs.SendMessageOutput
var err error
if msgGrpID == "" {
if string_utils.UnwrapString(m.MessageGroupID) == "" {
res, err = sqsClient.SendMessage(&sqs.SendMessageInput{
MessageAttributes: msgAttrs,
MessageBody: aws.String(body),
......@@ -95,7 +93,7 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre
MessageAttributes: msgAttrs,
MessageBody: aws.String(body),
QueueUrl: &m.QueueURL,
MessageGroupId: &msgGrpID,
MessageGroupId: m.MessageGroupID,
})
}
......@@ -106,12 +104,7 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre
return *res.MessageId, err
}
func SendSQSMessage(msgr Messenger, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) error {
msgGrpID := ""
if len(messageGroupID) > 0 && messageGroupID[0] != "" {
msgGrpID = messageGroupID[0]
}
func SendSQSMessage(msgr Messenger, objectToSend interface{}, currentRequestID *string, sqsType string) error {
if sqsClient == nil {
var err error
sqsClient, err = NewSQSClient(msgr.Region)
......@@ -145,7 +138,7 @@ func SendSQSMessage(msgr Messenger, objectToSend interface{}, currentRequestID *
}
}
msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType, headerKey, msgGrpID)
msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType)
if err != nil {
logs.ErrorWithMsg("Failed to send sqs event", err)
return err
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment