From e8be5515c8a200d76aad274358bed0b773f6761a Mon Sep 17 00:00:00 2001 From: Johan de Klerk <jdeklerk00@gmail.com> Date: Wed, 9 Feb 2022 09:31:26 +0200 Subject: [PATCH] Revert "Upload big sqs messages to s3" This reverts commit 03ad9f93ad272c51a766d74ac9db985691768a22. --- sqs/sqs.go | 107 ++++++++++++----------------------------------------- 1 file changed, 24 insertions(+), 83 deletions(-) diff --git a/sqs/sqs.go b/sqs/sqs.go index 0330fff..1260de1 100644 --- a/sqs/sqs.go +++ b/sqs/sqs.go @@ -3,13 +3,9 @@ package sqs /*Package sqs provides a simple interface to send messages to AWS SQS*/ import ( - "encoding/binary" "encoding/json" "fmt" - "github.com/google/uuid" - "gitlab.com/uafrica/go-utils/s3" - "io/ioutil" - "time" + "os" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -17,24 +13,18 @@ import ( "gitlab.com/uafrica/go-utils/logs" ) -var sqsClient *sqs.SQS - -const SQSMessageOnS3Key = "message-on-s3" - // Messenger sends an arbitrary message via SQS type Messenger struct { - QueueName string - QueueURL string - Region string - S3Session s3.SessionWithHelpers - S3BucketName string + session *session.Session + service *sqs.SQS + queueURL string } -// NewSQSClient constructs a Messenger which sends messages to an SQS queue +// NewSQSMessenger constructs a Messenger which sends messages to an SQS queue // awsRegion - region that the queue was created // awsQueue - name of the queue // Note: Calling code needs SQS IAM permissions -func NewSQSClient(awsRegion string) (*sqs.SQS, error) { +func NewSQSMessenger(awsRegion, queueUrl string) (*Messenger, error) { // Make an AWS session sess, err := session.NewSessionWithOptions(session.Options{ Config: aws.Config{ @@ -47,8 +37,13 @@ func NewSQSClient(awsRegion string) (*sqs.SQS, error) { } // Create SQS service - sqsClient = sqs.New(sess) - return sqsClient, err + svc := sqs.New(sess) + + return &Messenger{ + session: sess, + service: svc, + queueURL: queueUrl, + }, nil } // SendSQSMessage sends a message to the queue associated with the messenger @@ -85,16 +80,16 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre var res *sqs.SendMessageOutput var err error if msgGrpID == "" { - res, err = sqsClient.SendMessage(&sqs.SendMessageInput{ + res, err = m.service.SendMessage(&sqs.SendMessageInput{ MessageAttributes: msgAttrs, MessageBody: aws.String(body), - QueueUrl: &m.QueueURL, + QueueUrl: &m.queueURL, }) } else { - res, err = sqsClient.SendMessage(&sqs.SendMessageInput{ + res, err = m.service.SendMessage(&sqs.SendMessageInput{ MessageAttributes: msgAttrs, MessageBody: aws.String(body), - QueueUrl: &m.QueueURL, + QueueUrl: &m.queueURL, MessageGroupId: &msgGrpID, }) } @@ -106,17 +101,17 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre return *res.MessageId, err } -func SendSQSMessage(msgr *Messenger, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) error { +func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) error { msgGrpID := "" if len(messageGroupID) > 0 && messageGroupID[0] != "" { msgGrpID = messageGroupID[0] } - if sqsClient == nil { + if msgr == nil { var err error - sqsClient, err = NewSQSClient(msgr.Region) + msgr, err = NewSQSMessenger(region, os.Getenv(envQueueURLName)) if err != nil { - logs.ErrorWithMsg("Failed to create sqs client", err) + logs.ErrorWithMsg("Failed to create sqs messenger with envQueueURLName: "+envQueueURLName, err) } } @@ -126,67 +121,13 @@ func SendSQSMessage(msgr *Messenger, objectToSend interface{}, currentRequestID return err } - message := string(jsonBytes) - headers := map[string]string{ - "Name": "dummy", - SQSMessageOnS3Key: "false", - } - - // If bigger than 200 KB upload message to s3 and send s3 file name to sqs - // The sqs receiver should check the header to see if the message is in the body or on s3 - if binary.Size(message) > 200000 { - headers[SQSMessageOnS3Key] = "true" - id := uuid.New() - filename := fmt.Sprintf("%v-%v", sqsType, id.String()) - - err := uploadMessageToS3(msgr.S3Session, msgr.S3BucketName, filename, message) - if err != nil { - return err - } - } - - msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType, headerKey, msgGrpID) + headers := map[string]string{"Name": "dummy"} + msgID, err := msgr.SendSQSMessage(headers, string(jsonBytes), currentRequestID, sqsType, headerKey, msgGrpID) if err != nil { logs.ErrorWithMsg("Failed to send sqs event", err) return err } - logs.Info(fmt.Sprintf("Sent SQS message to %s with ID %s", msgr.QueueName, msgID)) - return nil -} - -func uploadMessageToS3(session s3.SessionWithHelpers, bucket string, name string, object interface{}) error { - messageBytes, err := json.Marshal(object) - if err != nil { - return err - } - - // Upload message - expiry := 24 * 7 * time.Hour // 3 days - _, err = session.UploadWithSettings(messageBytes, bucket, name, s3.S3UploadSettings{ - ExpiryDuration: &expiry, - }) - if err != nil { - return err - } - + logs.Info(fmt.Sprintf("Sent SQS message to %s with ID %s", envQueueURLName, msgID)) return nil } - -func RetrieveMessageFromS3(session s3.SessionWithHelpers, bucket string, filename string) ([]byte, error) { - // get the file contents - rawObject, err := session.GetObject(bucket, filename, false) - if err != nil { - return []byte{}, err - } - - // Read the message - var bodyBytes []byte - bodyBytes, err = ioutil.ReadAll(rawObject.Body) - if err != nil { - logs.ErrorWithMsg("Could not read file", err) - return []byte{}, err - } - - return bodyBytes, nil -} -- GitLab