// Package sqs provides a simple interface to send messages to AWS SQS.
package sqs

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
	"github.com/go-resty/resty/v2"
	"github.com/google/uuid"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/s3"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/utils"
)

Daniel Naude's avatar
Daniel Naude committed
var sqsClient *sqs.Client

const SQSMessageOnS3Key = "message-on-s3"

Francé Wilke's avatar
Francé Wilke committed
// Messenger sends an arbitrary message via SQS
type Messenger struct {
	QueueName          string
	QueueURL           string
	Region             string
Daniel Naude's avatar
Daniel Naude committed
	S3Client           *s3.ClientWithHelpers
	S3BucketName       string
	MessageGroupID     *string
	DelaySeconds       *int64
	RequestIDHeaderKey string
Francé Wilke's avatar
Francé Wilke committed
}

// NewSQSClient constructs a Messenger which sends messages to an SQS queue
Francé Wilke's avatar
Francé Wilke committed
// awsRegion - region that the queue was created
// awsQueue - name of the queue
// Note: Calling code needs SQS IAM permissions
Daniel Naude's avatar
Daniel Naude committed
func NewSQSClient(awsRegion string) (*sqs.Client, error) {
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(awsRegion))
Francé Wilke's avatar
Francé Wilke committed
	if err != nil {
		return nil, err
	}

Daniel Naude's avatar
Daniel Naude committed
	sqsClient := sqs.NewFromConfig(cfg)

	return sqsClient, err
Francé Wilke's avatar
Francé Wilke committed
}

// SendSQSMessage sends a message to the queue associated with the messenger
// headers - string message attributes of the SQS message (see AWS SQS documentation)
// body - body of the SQS message (see AWS SQS documentation)
func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string) (string, error) {
Daniel Naude's avatar
Daniel Naude committed
	msgAttrs := make(map[string]types.MessageAttributeValue)
Francé Wilke's avatar
Francé Wilke committed

	for key, val := range headers {
Daniel Naude's avatar
Daniel Naude committed
		msgAttrs[key] = types.MessageAttributeValue{
Francé Wilke's avatar
Francé Wilke committed
			DataType:    aws.String("String"),
			StringValue: aws.String(val),
		}
	}

	// Add request ID
	if currentRequestID != nil {
Daniel Naude's avatar
Daniel Naude committed
		msgAttrs[m.RequestIDHeaderKey] = types.MessageAttributeValue{
Francé Wilke's avatar
Francé Wilke committed
			DataType:    aws.String("String"),
			StringValue: aws.String(*currentRequestID),
		}
	}

Daniel Naude's avatar
Daniel Naude committed
	msgAttrs["type"] = types.MessageAttributeValue{
Francé Wilke's avatar
Francé Wilke committed
		DataType:    aws.String("String"),
		StringValue: aws.String(sqsType),
	}

	// SQS has max of 15 minutes delay
	// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
	var delaySeconds int32
	if m.DelaySeconds != nil {
		if *m.DelaySeconds > 900 {
			m.DelaySeconds = utils.ValueToPointer(int64(900))
		}
		delaySeconds = int32(*m.DelaySeconds)
	var res *sqs.SendMessageOutput
	var err error
Daniel Naude's avatar
Daniel Naude committed
	res, err = sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{
		MessageAttributes: msgAttrs,
		MessageBody:       aws.String(body),
		QueueUrl:          &m.QueueURL,
		MessageGroupId:    m.MessageGroupID,
		DelaySeconds:      delaySeconds,
Francé Wilke's avatar
Francé Wilke committed

	if err != nil {
		return "", err
	}

	return *res.MessageId, err
}

func SendSQSMessage(msgr Messenger, objectToSend interface{}, currentRequestID *string, sqsType string, isDebug bool) error {

	if isDebug {
		go func() {
			if msgr.DelaySeconds != nil {
				delay := *msgr.DelaySeconds
Francé Wilke's avatar
Francé Wilke committed
				time.Sleep(time.Second * time.Duration(delay))
Johan de Klerk's avatar
Johan de Klerk committed
			}
			resty.New().R().
				SetBody(objectToSend).
				Post("http://127.0.0.1:3000/sqs/" + sqsType)
		}()
Francé Wilke's avatar
Francé Wilke committed
		time.Sleep(time.Second * 1)
	if sqsClient == nil {
Francé Wilke's avatar
Francé Wilke committed
		var err error
		sqsClient, err = NewSQSClient(msgr.Region)
Francé Wilke's avatar
Francé Wilke committed
		if err != nil {
			logs.ErrorWithMsg("Failed to create sqs client", err)
Francé Wilke's avatar
Francé Wilke committed
		}
	}

	jsonBytes, err := json.Marshal(objectToSend)
	if err != nil {
		logs.ErrorWithMsg("Failed to encode sqs event data", err)
		return err
	}

	message := string(jsonBytes)
	headers := map[string]string{
		SQSMessageOnS3Key: "false",
	}

	// If bigger than 200 KB upload message to s3 and send s3 file name to sqs
	// The sqs receiver should check the header to see if the message is in the body or on s3
	if len(jsonBytes) > 200000 {

		headers[SQSMessageOnS3Key] = "true"
		id := uuid.New()
		filename := fmt.Sprintf("%v-%v", sqsType, id.String())
Francé Wilke's avatar
Francé Wilke committed
		logs.Info("SQS S3 %s", filename)
Daniel Naude's avatar
Daniel Naude committed
		err := uploadMessageToS3(msgr.S3Client, msgr.S3BucketName, filename, jsonBytes)
		if err != nil {
			return err
		}

		message = filename // Send filename as the message
	msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType)
Francé Wilke's avatar
Francé Wilke committed
	if err != nil {
		logs.ErrorWithMsg("Failed to send sqs event", err)
		return err
	}

Francé Wilke's avatar
Francé Wilke committed
	logs.Info("SQS to %s: %s", msgr.QueueName, msgID)
Daniel Naude's avatar
Daniel Naude committed
func uploadMessageToS3(client *s3.ClientWithHelpers, bucket string, name string, messageBytes []byte) error {
	// Upload message
	expiry := 24 * 7 * time.Hour // 3 days
Daniel Naude's avatar
Daniel Naude committed
	_, err := client.UploadWithSettings(messageBytes, bucket, name, s3.S3UploadSettings{
		ExpiryDuration: &expiry,
	})
	if err != nil {
		return err
	}

Francé Wilke's avatar
Francé Wilke committed
	return nil
}
Daniel Naude's avatar
Daniel Naude committed
func RetrieveMessageFromS3(client *s3.ClientWithHelpers, bucket string, filename string) ([]byte, error) {
	// get the file contents
Daniel Naude's avatar
Daniel Naude committed
	rawObject, err := client.GetObject(bucket, filename, false)
	if err != nil {
		return []byte{}, err
	}

	// Read the message
	var bodyBytes []byte
Daniel Naude's avatar
Daniel Naude committed
	bodyBytes, err = io.ReadAll(rawObject.Body)
	if err != nil {
		logs.ErrorWithMsg("Could not read file", err)
		return []byte{}, err
	}

	return bodyBytes, nil
}