From 03ad9f93ad272c51a766d74ac9db985691768a22 Mon Sep 17 00:00:00 2001 From: Johan de Klerk <jdeklerk00@gmail.com> Date: Tue, 8 Feb 2022 14:57:03 +0200 Subject: [PATCH] Upload big sqs messages to s3 --- sqs/sqs.go | 107 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 83 insertions(+), 24 deletions(-) diff --git a/sqs/sqs.go b/sqs/sqs.go index 1260de1..0330fff 100644 --- a/sqs/sqs.go +++ b/sqs/sqs.go @@ -3,9 +3,13 @@ package sqs /*Package sqs provides a simple interface to send messages to AWS SQS*/ import ( + "encoding/binary" "encoding/json" "fmt" - "os" + "github.com/google/uuid" + "gitlab.com/uafrica/go-utils/s3" + "io/ioutil" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -13,18 +17,24 @@ import ( "gitlab.com/uafrica/go-utils/logs" ) +var sqsClient *sqs.SQS + +const SQSMessageOnS3Key = "message-on-s3" + // Messenger sends an arbitrary message via SQS type Messenger struct { - session *session.Session - service *sqs.SQS - queueURL string + QueueName string + QueueURL string + Region string + S3Session s3.SessionWithHelpers + S3BucketName string } -// NewSQSMessenger constructs a Messenger which sends messages to an SQS queue +// NewSQSClient constructs a Messenger which sends messages to an SQS queue // awsRegion - region that the queue was created // awsQueue - name of the queue // Note: Calling code needs SQS IAM permissions -func NewSQSMessenger(awsRegion, queueUrl string) (*Messenger, error) { +func NewSQSClient(awsRegion string) (*sqs.SQS, error) { // Make an AWS session sess, err := session.NewSessionWithOptions(session.Options{ Config: aws.Config{ @@ -37,13 +47,8 @@ func NewSQSMessenger(awsRegion, queueUrl string) (*Messenger, error) { } // Create SQS service - svc := sqs.New(sess) - - return &Messenger{ - session: sess, - service: svc, - queueURL: queueUrl, - }, nil + sqsClient = sqs.New(sess) + return sqsClient, err } // SendSQSMessage sends a message to the queue associated with the messenger @@ -80,16 +85,16 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre var res *sqs.SendMessageOutput var err error if msgGrpID == "" { - res, err = m.service.SendMessage(&sqs.SendMessageInput{ + res, err = sqsClient.SendMessage(&sqs.SendMessageInput{ MessageAttributes: msgAttrs, MessageBody: aws.String(body), - QueueUrl: &m.queueURL, + QueueUrl: &m.QueueURL, }) } else { - res, err = m.service.SendMessage(&sqs.SendMessageInput{ + res, err = sqsClient.SendMessage(&sqs.SendMessageInput{ MessageAttributes: msgAttrs, MessageBody: aws.String(body), - QueueUrl: &m.queueURL, + QueueUrl: &m.QueueURL, MessageGroupId: &msgGrpID, }) } @@ -101,17 +106,17 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre return *res.MessageId, err } -func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) error { +func SendSQSMessage(msgr *Messenger, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) error { msgGrpID := "" if len(messageGroupID) > 0 && messageGroupID[0] != "" { msgGrpID = messageGroupID[0] } - if msgr == nil { + if sqsClient == nil { var err error - msgr, err = NewSQSMessenger(region, os.Getenv(envQueueURLName)) + sqsClient, err = NewSQSClient(msgr.Region) if err != nil { - logs.ErrorWithMsg("Failed to create sqs messenger with envQueueURLName: "+envQueueURLName, err) + logs.ErrorWithMsg("Failed to create sqs client", err) } } @@ -121,13 +126,67 @@ func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, obje return err } - headers := map[string]string{"Name": "dummy"} - msgID, err := msgr.SendSQSMessage(headers, string(jsonBytes), currentRequestID, sqsType, headerKey, msgGrpID) + message := string(jsonBytes) + headers := map[string]string{ + "Name": "dummy", + SQSMessageOnS3Key: "false", + } + + // If bigger than 200 KB upload message to s3 and send s3 file name to sqs + // The sqs receiver should check the header to see if the message is in the body or on s3 + if binary.Size(message) > 200000 { + headers[SQSMessageOnS3Key] = "true" + id := uuid.New() + filename := fmt.Sprintf("%v-%v", sqsType, id.String()) + + err := uploadMessageToS3(msgr.S3Session, msgr.S3BucketName, filename, message) + if err != nil { + return err + } + } + + msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType, headerKey, msgGrpID) if err != nil { logs.ErrorWithMsg("Failed to send sqs event", err) return err } - logs.Info(fmt.Sprintf("Sent SQS message to %s with ID %s", envQueueURLName, msgID)) + logs.Info(fmt.Sprintf("Sent SQS message to %s with ID %s", msgr.QueueName, msgID)) + return nil +} + +func uploadMessageToS3(session s3.SessionWithHelpers, bucket string, name string, object interface{}) error { + messageBytes, err := json.Marshal(object) + if err != nil { + return err + } + + // Upload message + expiry := 24 * 7 * time.Hour // 3 days + _, err = session.UploadWithSettings(messageBytes, bucket, name, s3.S3UploadSettings{ + ExpiryDuration: &expiry, + }) + if err != nil { + return err + } + return nil } + +func RetrieveMessageFromS3(session s3.SessionWithHelpers, bucket string, filename string) ([]byte, error) { + // get the file contents + rawObject, err := session.GetObject(bucket, filename, false) + if err != nil { + return []byte{}, err + } + + // Read the message + var bodyBytes []byte + bodyBytes, err = ioutil.ReadAll(rawObject.Body) + if err != nil { + logs.ErrorWithMsg("Could not read file", err) + return []byte{}, err + } + + return bodyBytes, nil +} -- GitLab