Newer
Older
package sqs
/*Package sqs provides a simple interface to send messages to AWS SQS*/
import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/google/uuid"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/s3"
)
// sqsClient is the package-level SQS client. It is assigned by NewSQSClient
// and used by Messenger.SendSQSMessage to perform the actual send.
var sqsClient *sqs.SQS
// SQSMessageOnS3Key is the message header key that SendSQSMessage sets to
// "true" or "false"; receivers should check it to see whether the message is
// in the SQS body or stored on S3.
const SQSMessageOnS3Key = "message-on-s3"
// Messenger sends an arbitrary message via SQS
type Messenger struct {
QueueName string
QueueURL string
Region string
S3Session *s3.SessionWithHelpers
S3BucketName string
MessageGroupID *string
// NewSQSClient creates an SQS service client for the given region and stores
// it in the package-level sqsClient so that later sends can use it.
// awsRegion - region the AWS session is configured for
// Returns the new client, or the error reported while creating the session.
// Note: Calling code needs SQS IAM permissions
func NewSQSClient(awsRegion string) (*sqs.SQS, error) {
	// Describe the AWS session first, then open it.
	opts := session.Options{
		Config: aws.Config{
			Region: aws.String(awsRegion),
		},
	}

	awsSession, sessionErr := session.NewSessionWithOptions(opts)
	if sessionErr != nil {
		return nil, sessionErr
	}

	// Build the SQS service client on top of the session and publish it at
	// package level.
	sqsClient = sqs.New(awsSession)
	return sqsClient, nil
}
// SendSQSMessage sends a message to the queue associated with the messenger
// using the package-level SQS client.
// headers - string message attributes of the SQS message (see AWS SQS documentation)
// body - body of the SQS message (see AWS SQS documentation)
// currentRequestID - when non-nil, sent as a string attribute named m.RequestIDHeaderKey
// sqsType - always sent as the "type" string attribute
// Returns the message ID reported by SQS, or an error if the client has not
// been created yet or the send fails.
func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string) (string, error) {
	// Calling SendMessage on a nil client would panic; report it instead.
	if sqsClient == nil {
		return "", fmt.Errorf("sqs client not initialised for queue %q", m.QueueURL)
	}

	// Room for every header plus the request ID and type attributes.
	msgAttrs := make(map[string]*sqs.MessageAttributeValue, len(headers)+2)
	for key, val := range headers {
		msgAttrs[key] = &sqs.MessageAttributeValue{
			DataType:    aws.String("String"),
			StringValue: aws.String(val),
		}
	}

	// Add request ID (overrides a caller header with the same key)
	if currentRequestID != nil {
		msgAttrs[m.RequestIDHeaderKey] = &sqs.MessageAttributeValue{
			DataType:    aws.String("String"),
			StringValue: aws.String(*currentRequestID),
		}
	}

	// The type attribute is written last so it always wins over caller headers.
	msgAttrs["type"] = &sqs.MessageAttributeValue{
		DataType:    aws.String("String"),
		StringValue: aws.String(sqsType),
	}

	// SQS has max of 15 minutes delay
	// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
	if m.DelaySeconds != nil && aws.Int64Value(m.DelaySeconds) > 900 {
		m.DelaySeconds = aws.Int64(900)
	}

	res, err := sqsClient.SendMessage(&sqs.SendMessageInput{
		MessageAttributes: msgAttrs,
		MessageBody:       aws.String(body),
		QueueUrl:          &m.QueueURL,
		MessageGroupId:    m.MessageGroupID,
		DelaySeconds:      m.DelaySeconds,
	})
	if err != nil {
		return "", err
	}

	// aws.StringValue tolerates a nil MessageId instead of panicking.
	return aws.StringValue(res.MessageId), nil
}
func SendSQSMessage(msgr Messenger, objectToSend interface{}, currentRequestID *string, sqsType string) error {
sqsClient, err = NewSQSClient(msgr.Region)
logs.ErrorWithMsg("Failed to create sqs client", err)
}
}
jsonBytes, err := json.Marshal(objectToSend)
if err != nil {
logs.ErrorWithMsg("Failed to encode sqs event data", err)
return err
}
message := string(jsonBytes)
headers := map[string]string{
"Name": "dummy",
SQSMessageOnS3Key: "false",
}
// If bigger than 200 KB upload message to s3 and send s3 file name to sqs
// The sqs receiver should check the header to see if the message is in the body or on s3
headers[SQSMessageOnS3Key] = "true"
id := uuid.New()
filename := fmt.Sprintf("%v-%v", sqsType, id.String())
logs.Info("SQS message too big, saving to S3 with filename = %s", filename)
err := uploadMessageToS3(msgr.S3Session, msgr.S3BucketName, filename, jsonBytes)
message = filename // Send filename as the message
msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType)
if err != nil {
logs.ErrorWithMsg("Failed to send sqs event", err)
return err
}
logs.Info(fmt.Sprintf("Sent SQS message to %s with ID %s", msgr.QueueName, msgID))
return nil
}
func uploadMessageToS3(session *s3.SessionWithHelpers, bucket string, name string, messageBytes []byte) error {
// Upload message
expiry := 24 * 7 * time.Hour // 3 days
_, err := session.UploadWithSettings(messageBytes, bucket, name, s3.S3UploadSettings{
ExpiryDuration: &expiry,
})
if err != nil {
return err
}
// RetrieveMessageFromS3 fetches the object named filename from bucket and
// returns its full contents.
// On failure it returns an empty (non-nil) byte slice together with the error
// from the fetch or from reading the body.
func RetrieveMessageFromS3(session *s3.SessionWithHelpers, bucket string, filename string) ([]byte, error) {
	// get the file contents
	rawObject, err := session.GetObject(bucket, filename, false)
	if err != nil {
		return []byte{}, err
	}
	// Close the body once it has been read so the underlying connection is
	// released. NOTE(review): assumes rawObject.Body is an io.ReadCloser, as
	// on the AWS SDK GetObjectOutput — confirm against the s3 helper.
	defer rawObject.Body.Close()

	// Read the message
	bodyBytes, err := ioutil.ReadAll(rawObject.Body)
	if err != nil {
		logs.ErrorWithMsg("Could not read file", err)
		return []byte{}, err
	}

	return bodyBytes, nil
}