// Package sqs provides a simple interface to send messages to AWS SQS.
package sqs

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/go-resty/resty/v2"
	"github.com/google/uuid"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/s3"
)

// sqsClient is the package-wide SQS client. It is set by NewSQSClient and is
// created on first use by the package-level SendSQSMessage.
var sqsClient *sqs.SQS

const (
	// SQSMessageOnS3Key is the message attribute telling the receiver whether
	// the SQS body is the payload itself ("false") or the name of an S3 object
	// that holds the payload ("true").
	SQSMessageOnS3Key = "message-on-s3"

	// maxDelaySeconds is the largest delay SQS accepts (15 minutes).
	// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
	maxDelaySeconds = 900

	// maxInlineMessageBytes is the payload size above which the message is
	// uploaded to S3 and only the S3 file name is sent through SQS.
	maxInlineMessageBytes = 200000
)

// Messenger sends an arbitrary message via SQS.
type Messenger struct {
	QueueName          string                 // Used in log output only
	QueueURL           string                 // URL of the queue the message is sent to
	Region             string                 // AWS region used when the SQS client is created
	S3Session          *s3.SessionWithHelpers // Used to upload payloads that are too big for SQS
	S3BucketName       string                 // Bucket that receives the oversized payloads
	MessageGroupID     *string                // Passed through as the SQS MessageGroupId
	DelaySeconds       *int64                 // Optional delivery delay; capped at 900 seconds when sending
	RequestIDHeaderKey string                 // Attribute name under which the request ID is sent
}

// NewSQSClient creates an SQS client for the given region, stores it as the
// package-wide client used by the send functions, and returns it.
// awsRegion - region that the queue was created in
// Note: Calling code needs SQS IAM permissions
func NewSQSClient(awsRegion string) (*sqs.SQS, error) {
	// Make an AWS session
	sess, err := session.NewSessionWithOptions(session.Options{
		Config: aws.Config{
			Region: aws.String(awsRegion),
		},
	})
	if err != nil {
		return nil, err
	}

	// Create SQS service
	sqsClient = sqs.New(sess)
	return sqsClient, nil
}

// SendSQSMessage sends a message to the queue associated with the messenger
// and returns the ID SQS assigned to it.
// headers - string message attributes of the SQS message (see AWS SQS documentation)
// body - body of the SQS message (see AWS SQS documentation)
// currentRequestID - optional request ID, sent under m.RequestIDHeaderKey when not nil
// sqsType - sent as the "type" message attribute
// The package-wide client must already exist (see NewSQSClient), otherwise an
// error is returned.
func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string) (string, error) {
	if sqsClient == nil {
		return "", errors.New("sqs client not initialised: call NewSQSClient first")
	}

	msgAttrs := make(map[string]*sqs.MessageAttributeValue, len(headers)+2)
	for key, val := range headers {
		msgAttrs[key] = &sqs.MessageAttributeValue{
			DataType:    aws.String("String"),
			StringValue: aws.String(val),
		}
	}

	// Add request ID
	if currentRequestID != nil {
		msgAttrs[m.RequestIDHeaderKey] = &sqs.MessageAttributeValue{
			DataType:    aws.String("String"),
			StringValue: aws.String(*currentRequestID),
		}
	}

	msgAttrs["type"] = &sqs.MessageAttributeValue{
		DataType:    aws.String("String"),
		StringValue: aws.String(sqsType),
	}

	// SQS has max of 15 minutes delay
	if m.DelaySeconds != nil && aws.Int64Value(m.DelaySeconds) > maxDelaySeconds {
		m.DelaySeconds = aws.Int64(maxDelaySeconds)
	}

	res, err := sqsClient.SendMessage(&sqs.SendMessageInput{
		MessageAttributes: msgAttrs,
		MessageBody:       aws.String(body),
		QueueUrl:          &m.QueueURL,
		MessageGroupId:    m.MessageGroupID,
		DelaySeconds:      m.DelaySeconds,
	})
	if err != nil {
		return "", err
	}

	// aws.StringValue yields "" for a nil MessageId instead of panicking.
	return aws.StringValue(res.MessageId), nil
}

// SendSQSMessage JSON-encodes objectToSend and sends it through msgr.
// msgr - messenger describing the target queue and the S3 fallback bucket
// objectToSend - any JSON-encodable value
// currentRequestID - optional request ID forwarded as a message attribute
// sqsType - message type; also used as the prefix of the S3 file name
// isDebug - when true the object is POSTed to a local endpoint instead of SQS
// Payloads larger than 200 KB are uploaded to S3 and the S3 file name is sent
// as the message body, with the SQSMessageOnS3Key attribute set to "true".
func SendSQSMessage(msgr Messenger, objectToSend interface{}, currentRequestID *string, sqsType string, isDebug bool) error {
	if isDebug {
		// Local delivery is best-effort: a failure is logged, not returned.
		_, err := resty.New().R().
			SetBody(objectToSend).
			Post("http://127.0.0.1:3000/sqs/" + sqsType)
		if err != nil {
			logs.ErrorWithMsg("Failed to post sqs event to local endpoint", err)
		}
		return nil
	}

	if sqsClient == nil {
		// NewSQSClient stores the client in the package-wide variable.
		if _, err := NewSQSClient(msgr.Region); err != nil {
			logs.ErrorWithMsg("Failed to create sqs client", err)
			return err
		}
	}

	jsonBytes, err := json.Marshal(objectToSend)
	if err != nil {
		logs.ErrorWithMsg("Failed to encode sqs event data", err)
		return err
	}

	message := string(jsonBytes)

	headers := map[string]string{
		"Name":            "dummy",
		SQSMessageOnS3Key: "false",
	}

	// If bigger than 200 KB upload message to s3 and send s3 file name to sqs
	// The sqs receiver should check the header to see if the message is in the body or on s3
	if len(jsonBytes) > maxInlineMessageBytes {
		headers[SQSMessageOnS3Key] = "true"

		id := uuid.New()
		filename := fmt.Sprintf("%v-%v", sqsType, id.String())

		logs.Info("SQS message too big, saving to S3 with filename = %s", filename)
		err := uploadMessageToS3(msgr.S3Session, msgr.S3BucketName, filename, jsonBytes)
		if err != nil {
			logs.ErrorWithMsg("Failed to upload sqs message to s3", err)
			return err
		}

		message = filename // Send filename as the message
	}

	msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType)
	if err != nil {
		logs.ErrorWithMsg("Failed to send sqs event", err)
		return err
	}

	logs.Info(fmt.Sprintf("Sent SQS message to %s with ID %s", msgr.QueueName, msgID))
	return nil
}

// uploadMessageToS3 stores messageBytes in bucket under name, with an expiry
// duration of 7 days passed in the upload settings.
func uploadMessageToS3(session *s3.SessionWithHelpers, bucket string, name string, messageBytes []byte) error {
	if session == nil {
		return errors.New("s3 session is required to upload an sqs message")
	}

	// Upload message
	expiry := 24 * 7 * time.Hour // 7 days
	_, err := session.UploadWithSettings(messageBytes, bucket, name, s3.S3UploadSettings{
		ExpiryDuration: &expiry,
	})
	return err
}

// RetrieveMessageFromS3 reads the complete contents of filename in bucket.
// It is the counterpart of the upload done by SendSQSMessage for payloads
// that are sent with the SQSMessageOnS3Key attribute set to "true".
// An empty slice is returned together with the error on failure.
func RetrieveMessageFromS3(session *s3.SessionWithHelpers, bucket string, filename string) ([]byte, error) {
	if session == nil {
		return []byte{}, errors.New("s3 session is required to retrieve an sqs message")
	}

	// get the file contents
	rawObject, err := session.GetObject(bucket, filename, false)
	if err != nil {
		return []byte{}, err
	}
	// Release the underlying connection once the body has been read; the
	// close error of a read-only body carries no useful information.
	defer rawObject.Body.Close()

	// Read the message
	bodyBytes, err := ioutil.ReadAll(rawObject.Body)
	if err != nil {
		logs.ErrorWithMsg("Could not read file", err)
		return []byte{}, err
	}

	return bodyBytes, nil
}