Skip to content
Snippets Groups Projects
Commit 03ad9f93 authored by Johan de Klerk's avatar Johan de Klerk
Browse files

Upload big sqs messages to s3

parent fd2c31c5
No related branches found
No related tags found
No related merge requests found
......@@ -3,9 +3,13 @@ package sqs
/*Package sqs provides a simple interface to send messages to AWS SQS*/
import (
"encoding/binary"
"encoding/json"
"fmt"
"os"
"github.com/google/uuid"
"gitlab.com/uafrica/go-utils/s3"
"io/ioutil"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
......@@ -13,18 +17,24 @@ import (
"gitlab.com/uafrica/go-utils/logs"
)
var sqsClient *sqs.SQS
const SQSMessageOnS3Key = "message-on-s3"
// Messenger sends an arbitrary message via SQS
type Messenger struct {
session *session.Session
service *sqs.SQS
queueURL string
QueueName string
QueueURL string
Region string
S3Session s3.SessionWithHelpers
S3BucketName string
}
// NewSQSMessenger constructs a Messenger which sends messages to an SQS queue
// NewSQSClient constructs a Messenger which sends messages to an SQS queue
// awsRegion - region that the queue was created
// awsQueue - name of the queue
// Note: Calling code needs SQS IAM permissions
func NewSQSMessenger(awsRegion, queueUrl string) (*Messenger, error) {
func NewSQSClient(awsRegion string) (*sqs.SQS, error) {
// Make an AWS session
sess, err := session.NewSessionWithOptions(session.Options{
Config: aws.Config{
......@@ -37,13 +47,8 @@ func NewSQSMessenger(awsRegion, queueUrl string) (*Messenger, error) {
}
// Create SQS service
svc := sqs.New(sess)
return &Messenger{
session: sess,
service: svc,
queueURL: queueUrl,
}, nil
sqsClient = sqs.New(sess)
return sqsClient, err
}
// SendSQSMessage sends a message to the queue associated with the messenger
......@@ -80,16 +85,16 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre
var res *sqs.SendMessageOutput
var err error
if msgGrpID == "" {
res, err = m.service.SendMessage(&sqs.SendMessageInput{
res, err = sqsClient.SendMessage(&sqs.SendMessageInput{
MessageAttributes: msgAttrs,
MessageBody: aws.String(body),
QueueUrl: &m.queueURL,
QueueUrl: &m.QueueURL,
})
} else {
res, err = m.service.SendMessage(&sqs.SendMessageInput{
res, err = sqsClient.SendMessage(&sqs.SendMessageInput{
MessageAttributes: msgAttrs,
MessageBody: aws.String(body),
QueueUrl: &m.queueURL,
QueueUrl: &m.QueueURL,
MessageGroupId: &msgGrpID,
})
}
......@@ -101,17 +106,17 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre
return *res.MessageId, err
}
func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) error {
func SendSQSMessage(msgr *Messenger, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) error {
msgGrpID := ""
if len(messageGroupID) > 0 && messageGroupID[0] != "" {
msgGrpID = messageGroupID[0]
}
if msgr == nil {
if sqsClient == nil {
var err error
msgr, err = NewSQSMessenger(region, os.Getenv(envQueueURLName))
sqsClient, err = NewSQSClient(msgr.Region)
if err != nil {
logs.ErrorWithMsg("Failed to create sqs messenger with envQueueURLName: "+envQueueURLName, err)
logs.ErrorWithMsg("Failed to create sqs client", err)
}
}
......@@ -121,13 +126,67 @@ func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, obje
return err
}
headers := map[string]string{"Name": "dummy"}
msgID, err := msgr.SendSQSMessage(headers, string(jsonBytes), currentRequestID, sqsType, headerKey, msgGrpID)
message := string(jsonBytes)
headers := map[string]string{
"Name": "dummy",
SQSMessageOnS3Key: "false",
}
// If bigger than 200 KB upload message to s3 and send s3 file name to sqs
// The sqs receiver should check the header to see if the message is in the body or on s3
if binary.Size(message) > 200000 {
headers[SQSMessageOnS3Key] = "true"
id := uuid.New()
filename := fmt.Sprintf("%v-%v", sqsType, id.String())
err := uploadMessageToS3(msgr.S3Session, msgr.S3BucketName, filename, message)
if err != nil {
return err
}
}
msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType, headerKey, msgGrpID)
if err != nil {
logs.ErrorWithMsg("Failed to send sqs event", err)
return err
}
logs.Info(fmt.Sprintf("Sent SQS message to %s with ID %s", envQueueURLName, msgID))
logs.Info(fmt.Sprintf("Sent SQS message to %s with ID %s", msgr.QueueName, msgID))
return nil
}
func uploadMessageToS3(session s3.SessionWithHelpers, bucket string, name string, object interface{}) error {
messageBytes, err := json.Marshal(object)
if err != nil {
return err
}
// Upload message
expiry := 24 * 7 * time.Hour // 3 days
_, err = session.UploadWithSettings(messageBytes, bucket, name, s3.S3UploadSettings{
ExpiryDuration: &expiry,
})
if err != nil {
return err
}
return nil
}
func RetrieveMessageFromS3(session s3.SessionWithHelpers, bucket string, filename string) ([]byte, error) {
// get the file contents
rawObject, err := session.GetObject(bucket, filename, false)
if err != nil {
return []byte{}, err
}
// Read the message
var bodyBytes []byte
bodyBytes, err = ioutil.ReadAll(rawObject.Body)
if err != nil {
logs.ErrorWithMsg("Could not read file", err)
return []byte{}, err
}
return bodyBytes, nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment