Skip to content
Snippets Groups Projects
Select Git revision
  • 5222ed1d4aab19a8412c970619dcdca433f74462
  • main default protected
  • v1.298.0
  • v1.297.0
  • v1.296.0
  • v1.295.0
  • v1.294.0
  • v1.293.0
  • v1.292.0
  • v1.291.0
  • v1.290.0
  • v1.289.0
  • v1.288.0
  • v1.287.0
  • v1.286.0
  • v1.285.0
  • v1.284.0
  • v1.283.0
  • v1.282.0
  • v1.281.0
  • v1.280.0
  • v1.279.0
22 results

sqs.go

Blame
  • sqs.go 4.76 KiB
    package sqs
    
    /*Package sqs provides a simple interface to send messages to AWS SQS*/
    
    import (
    	"encoding/binary"
    	"encoding/json"
    	"fmt"
    	"github.com/google/uuid"
    	"gitlab.com/uafrica/go-utils/s3"
    	"gitlab.com/uafrica/go-utils/string_utils"
    	"io/ioutil"
    	"time"
    
    	"github.com/aws/aws-sdk-go/aws"
    	"github.com/aws/aws-sdk-go/aws/session"
    	"github.com/aws/aws-sdk-go/service/sqs"
    	"gitlab.com/uafrica/go-utils/logs"
    )
    
    // sqsClient is the package-level SQS client used for every send in this
    // package. It is assigned by NewSQSClient and created on first use by the
    // package-level SendSQSMessage function.
    var sqsClient *sqs.SQS
    
    // SQSMessageOnS3Key is the message attribute (header) key that
    // SendSQSMessage sets to "true" or "false" to tell the receiver whether the
    // message was uploaded to S3.
    const SQSMessageOnS3Key = "message-on-s3"
    
    // Messenger sends an arbitrary message via SQS
    type Messenger struct {
    	// QueueName is only used in the log line written after a successful send.
    	QueueName          string
    	// QueueURL is passed to SQS as the QueueUrl of every message.
    	QueueURL           string
    	// Region is the AWS region used when the package-level client has to be created.
    	Region             string
    	// S3Session is the S3 session handed to uploadMessageToS3 when a message is stored on S3.
    	S3Session          *s3.SessionWithHelpers
    	// S3BucketName is the bucket handed to uploadMessageToS3 when a message is stored on S3.
    	S3BucketName       string
    	// MessageGroupID is optional; when it unwraps to a non-empty string it is sent as the SQS MessageGroupId.
    	MessageGroupID     *string
    	// RequestIDHeaderKey is the message attribute key under which the current request ID is sent.
    	RequestIDHeaderKey string
    }
    
    // NewSQSClient creates an SQS client for the given region, stores it in the
    // package-level sqsClient (replacing any client stored there before) and
    // returns it.
    // awsRegion - region that the queue was created in
    // Note: Calling code needs SQS IAM permissions
    func NewSQSClient(awsRegion string) (*sqs.SQS, error) {
    	// The session is configured with the target region only.
    	opts := session.Options{
    		Config: aws.Config{Region: aws.String(awsRegion)},
    	}
    
    	awsSession, err := session.NewSessionWithOptions(opts)
    	if err != nil {
    		return nil, err
    	}
    
    	// Keep the client in the package-level variable as well as returning it,
    	// so later sends can reuse it.
    	sqsClient = sqs.New(awsSession)
    	return sqsClient, nil
    }
    
    // SendSQSMessage sends a message to the queue associated with the messenger
    // and returns the message ID reported by SQS.
    // headers - string message attributes of the SQS message (see AWS SQS documentation)
    // body - body of the SQS message (see AWS SQS documentation)
    // currentRequestID - optional; when non-nil it is sent as a string attribute under m.RequestIDHeaderKey
    // sqsType - always sent as the "type" string attribute, replacing any "type" entry in headers
    // If the package-level client has not been created yet it is created for
    // m.Region first; a failure to do so is returned as the error.
    func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string) (string, error) {
    	// Calling SendMessage on a nil client panics, so create the shared client
    	// on first use, the same way the package-level SendSQSMessage does.
    	if sqsClient == nil {
    		if _, err := NewSQSClient(m.Region); err != nil {
    			return "", err
    		}
    	}
    
    	// Room for every header plus the request ID and type attributes.
    	msgAttrs := make(map[string]*sqs.MessageAttributeValue, len(headers)+2)
    
    	for key, val := range headers {
    		msgAttrs[key] = &sqs.MessageAttributeValue{
    			DataType:    aws.String("String"),
    			StringValue: aws.String(val),
    		}
    	}
    
    	// Add request ID
    	if currentRequestID != nil {
    		msgAttrs[m.RequestIDHeaderKey] = &sqs.MessageAttributeValue{
    			DataType:    aws.String("String"),
    			StringValue: aws.String(*currentRequestID),
    		}
    	}
    
    	msgAttrs["type"] = &sqs.MessageAttributeValue{
    		DataType:    aws.String("String"),
    		StringValue: aws.String(sqsType),
    	}
    
    	input := &sqs.SendMessageInput{
    		MessageAttributes: msgAttrs,
    		MessageBody:       aws.String(body),
    		QueueUrl:          aws.String(m.QueueURL),
    	}
    
    	// Only set the group ID when one is configured; an unset or empty group
    	// ID is left out of the request entirely.
    	if string_utils.UnwrapString(m.MessageGroupID) != "" {
    		input.MessageGroupId = m.MessageGroupID
    	}
    
    	res, err := sqsClient.SendMessage(input)
    	if err != nil {
    		return "", err
    	}
    
    	// StringValue yields "" instead of panicking should MessageId be nil.
    	return aws.StringValue(res.MessageId), nil
    }
    
    // SendSQSMessage JSON-encodes objectToSend and sends it to the queue
    // described by msgr, creating the package-level SQS client for msgr.Region
    // on first use.
    // objectToSend - value to encode as the message body
    // currentRequestID - optional request ID, forwarded to msgr.SendSQSMessage
    // sqsType - sent as the "type" attribute and used as the prefix of the S3 file name
    // Payloads larger than 200 KB are uploaded to S3 instead; the SQS body then
    // holds only the S3 file name and the SQSMessageOnS3Key header is "true".
    // Returns the first error from client creation, encoding, upload or send.
    func SendSQSMessage(msgr Messenger, objectToSend interface{}, currentRequestID *string, sqsType string) error {
    	// Largest encoded payload that is still sent inline in the SQS body.
    	const maxInlineBodyBytes = 200 * 1024
    
    	if sqsClient == nil {
    		var err error
    		sqsClient, err = NewSQSClient(msgr.Region)
    		if err != nil {
    			logs.ErrorWithMsg("Failed to create sqs client", err)
    			// Without a client the send below would panic, so stop here.
    			return err
    		}
    	}
    
    	jsonBytes, err := json.Marshal(objectToSend)
    	if err != nil {
    		logs.ErrorWithMsg("Failed to encode sqs event data", err)
    		return err
    	}
    
    	message := string(jsonBytes)
    	headers := map[string]string{
    		"Name":            "dummy",
    		SQSMessageOnS3Key: "false",
    	}
    
    	// If bigger than 200 KB upload message to s3 and send s3 file name to sqs
    	// The sqs receiver should check the header to see if the message is in the body or on s3
    	// The size is taken from the byte slice: binary.Size returns -1 for a
    	// string, which would make this branch unreachable.
    	if binary.Size(jsonBytes) > maxInlineBodyBytes {
    		headers[SQSMessageOnS3Key] = "true"
    		filename := fmt.Sprintf("%v-%v", sqsType, uuid.New().String())
    
    		// uploadMessageToS3 JSON-encodes its argument, so the stored object is
    		// the payload wrapped as a JSON string.
    		if err := uploadMessageToS3(msgr.S3Session, msgr.S3BucketName, filename, message); err != nil {
    			logs.ErrorWithMsg("Failed to upload sqs message to s3", err)
    			return err
    		}
    
    		// The receiver needs the file name to fetch the payload, and sending
    		// the full payload as well would defeat the size limit.
    		message = filename
    	}
    
    	msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType)
    	if err != nil {
    		logs.ErrorWithMsg("Failed to send sqs event", err)
    		return err
    	}
    
    	logs.Info(fmt.Sprintf("Sent SQS message to %s with ID %s", msgr.QueueName, msgID))
    	return nil
    }
    
    // uploadMessageToS3 JSON-encodes object and uploads the result to bucket
    // under name, passing an expiry duration of 7 days in the upload settings.
    // Returns the encoding error or the upload error, whichever occurs first.
    func uploadMessageToS3(session *s3.SessionWithHelpers, bucket string, name string, object interface{}) error {
    	payload, err := json.Marshal(object)
    	if err != nil {
    		return err
    	}
    
    	ttl := 7 * 24 * time.Hour // 7 days
    	settings := s3.S3UploadSettings{ExpiryDuration: &ttl}
    
    	// Only the error of the upload matters here; its other result is discarded.
    	_, err = session.UploadWithSettings(payload, bucket, name, settings)
    	return err
    }
    
    // RetrieveMessageFromS3 fetches the object stored under filename in bucket
    // and returns its body as raw bytes.
    // On any error an empty, non-nil slice is returned together with the error.
    func RetrieveMessageFromS3(session *s3.SessionWithHelpers, bucket string, filename string) ([]byte, error) {
    	// get the file contents
    	rawObject, err := session.GetObject(bucket, filename, false)
    	if err != nil {
    		return []byte{}, err
    	}
    	// Release the body once it has been read; the original never closed it.
    	// NOTE(review): assumes Body is an io.ReadCloser, as on the AWS SDK's
    	// GetObjectOutput — confirm against s3.SessionWithHelpers.GetObject.
    	defer rawObject.Body.Close()
    
    	// Read the message
    	bodyBytes, err := ioutil.ReadAll(rawObject.Body)
    	if err != nil {
    		logs.ErrorWithMsg("Could not read file", err)
    		return []byte{}, err
    	}
    
    	return bodyBytes, nil
    }