package sqs /*Package sqs provides a simple interface to send messages to AWS SQS*/ import ( "context" "encoding/json" "fmt" "io" "sync" "time" "gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/utils" "github.com/google/uuid" "gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/s3" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/go-resty/resty/v2" "gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs" ) var sqsClient *sqs.Client const SQSMessageOnS3Key = "message-on-s3" // Messenger sends an arbitrary message via SQS type Messenger struct { QueueName string QueueURL string Region string S3Client *s3.ClientWithHelpers S3BucketName string MessageGroupID *string DelaySeconds *int64 RequestIDHeaderKey string } // NewSQSClient constructs a Messenger which sends messages to an SQS queue // awsRegion - region that the queue was created // awsQueue - name of the queue // Note: Calling code needs SQS IAM permissions func NewSQSClient(awsRegion string) (*sqs.Client, error) { cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(awsRegion)) if err != nil { return nil, err } sqsClient := sqs.NewFromConfig(cfg) return sqsClient, err } // SendSQSMessage sends a message to the queue associated with the messenger // headers - string message attributes of the SQS message (see AWS SQS documentation) // body - body of the SQS message (see AWS SQS documentation) func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string) (string, error) { msgAttrs := make(map[string]types.MessageAttributeValue) for key, val := range headers { msgAttrs[key] = types.MessageAttributeValue{ DataType: aws.String("String"), StringValue: aws.String(val), } } // Add request ID if currentRequestID != nil { msgAttrs[m.RequestIDHeaderKey] = types.MessageAttributeValue{ DataType: aws.String("String"), StringValue: aws.String(*currentRequestID), } } msgAttrs["type"] = types.MessageAttributeValue{ DataType: aws.String("String"), StringValue: aws.String(sqsType), } // SQS has max of 15 minutes delay // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html var delaySeconds int32 if m.DelaySeconds != nil { if *m.DelaySeconds > 900 { m.DelaySeconds = utils.ValueToPointer(int64(900)) } delaySeconds = int32(*m.DelaySeconds) } var res *sqs.SendMessageOutput var err error res, err = sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{ MessageAttributes: msgAttrs, MessageBody: aws.String(body), QueueUrl: &m.QueueURL, MessageGroupId: m.MessageGroupID, DelaySeconds: delaySeconds, }) if err != nil { return "", err } return *res.MessageId, err } func SendSQSMessage(msgr Messenger, objectToSend interface{}, currentRequestID *string, sqsType string, isDebug bool) error { if isDebug { var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() if msgr.DelaySeconds != nil { delay := *msgr.DelaySeconds time.Sleep(time.Second * time.Duration(delay)) } resty.New().R(). SetBody(objectToSend). Post("http://127.0.0.1:3000/sqs/" + sqsType) }() time.Sleep(time.Second * 1) wg.Wait() return nil } if sqsClient == nil { var err error sqsClient, err = NewSQSClient(msgr.Region) if err != nil { logs.ErrorWithMsg("Failed to create sqs client", err) } } jsonBytes, err := json.Marshal(objectToSend) if err != nil { logs.ErrorWithMsg("Failed to encode sqs event data", err) return err } message := string(jsonBytes) headers := map[string]string{ SQSMessageOnS3Key: "false", } // If bigger than 200 KB upload message to s3 and send s3 file name to sqs // The sqs receiver should check the header to see if the message is in the body or on s3 if len(jsonBytes) > 200000 { headers[SQSMessageOnS3Key] = "true" id := uuid.New() filename := fmt.Sprintf("%v-%v", sqsType, id.String()) logs.Info("SQS S3 %s", filename) err := uploadMessageToS3(msgr.S3Client, msgr.S3BucketName, filename, jsonBytes) if err != nil { return err } message = filename // Send filename as the message } msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType) if err != nil { logs.ErrorWithMsg("Failed to send sqs event", err) return err } logs.Info("SQS to %s: %s", msgr.QueueName, msgID) return nil } func uploadMessageToS3(client *s3.ClientWithHelpers, bucket string, name string, messageBytes []byte) error { // Upload message expiry := 24 * 7 * time.Hour // 3 days _, err := client.UploadWithSettings(messageBytes, bucket, name, s3.S3UploadSettings{ ExpiryDuration: &expiry, }) if err != nil { return err } return nil } func RetrieveMessageFromS3(client *s3.ClientWithHelpers, bucket string, filename string) ([]byte, error) { // get the file contents rawObject, err := client.GetObject(bucket, filename, false) if err != nil { return []byte{}, err } // Read the message var bodyBytes []byte bodyBytes, err = io.ReadAll(rawObject.Body) if err != nil { logs.ErrorWithMsg("Could not read file", err) return []byte{}, err } return bodyBytes, nil }