Skip to content
Snippets Groups Projects
Commit e8be5515 authored by Johan de Klerk's avatar Johan de Klerk
Browse files

Revert "Upload big sqs messages to s3"

This reverts commit 03ad9f93.
parent 761e6711
No related branches found
No related tags found
No related merge requests found
......@@ -3,13 +3,9 @@ package sqs
/*Package sqs provides a simple interface to send messages to AWS SQS*/
import (
"encoding/binary"
"encoding/json"
"fmt"
"github.com/google/uuid"
"gitlab.com/uafrica/go-utils/s3"
"io/ioutil"
"time"
"os"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
......@@ -17,24 +13,18 @@ import (
"gitlab.com/uafrica/go-utils/logs"
)
var sqsClient *sqs.SQS
const SQSMessageOnS3Key = "message-on-s3"
// Messenger sends an arbitrary message via SQS
type Messenger struct {
QueueName string
QueueURL string
Region string
S3Session s3.SessionWithHelpers
S3BucketName string
session *session.Session
service *sqs.SQS
queueURL string
}
// NewSQSClient constructs a Messenger which sends messages to an SQS queue
// NewSQSMessenger constructs a Messenger which sends messages to an SQS queue
// awsRegion - region that the queue was created
// awsQueue - name of the queue
// Note: Calling code needs SQS IAM permissions
func NewSQSClient(awsRegion string) (*sqs.SQS, error) {
func NewSQSMessenger(awsRegion, queueUrl string) (*Messenger, error) {
// Make an AWS session
sess, err := session.NewSessionWithOptions(session.Options{
Config: aws.Config{
......@@ -47,8 +37,13 @@ func NewSQSClient(awsRegion string) (*sqs.SQS, error) {
}
// Create SQS service
sqsClient = sqs.New(sess)
return sqsClient, err
svc := sqs.New(sess)
return &Messenger{
session: sess,
service: svc,
queueURL: queueUrl,
}, nil
}
// SendSQSMessage sends a message to the queue associated with the messenger
......@@ -85,16 +80,16 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre
var res *sqs.SendMessageOutput
var err error
if msgGrpID == "" {
res, err = sqsClient.SendMessage(&sqs.SendMessageInput{
res, err = m.service.SendMessage(&sqs.SendMessageInput{
MessageAttributes: msgAttrs,
MessageBody: aws.String(body),
QueueUrl: &m.QueueURL,
QueueUrl: &m.queueURL,
})
} else {
res, err = sqsClient.SendMessage(&sqs.SendMessageInput{
res, err = m.service.SendMessage(&sqs.SendMessageInput{
MessageAttributes: msgAttrs,
MessageBody: aws.String(body),
QueueUrl: &m.QueueURL,
QueueUrl: &m.queueURL,
MessageGroupId: &msgGrpID,
})
}
......@@ -106,17 +101,17 @@ func (m *Messenger) SendSQSMessage(headers map[string]string, body string, curre
return *res.MessageId, err
}
func SendSQSMessage(msgr *Messenger, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) error {
func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string, messageGroupID ...string) error {
msgGrpID := ""
if len(messageGroupID) > 0 && messageGroupID[0] != "" {
msgGrpID = messageGroupID[0]
}
if sqsClient == nil {
if msgr == nil {
var err error
sqsClient, err = NewSQSClient(msgr.Region)
msgr, err = NewSQSMessenger(region, os.Getenv(envQueueURLName))
if err != nil {
logs.ErrorWithMsg("Failed to create sqs client", err)
logs.ErrorWithMsg("Failed to create sqs messenger with envQueueURLName: "+envQueueURLName, err)
}
}
......@@ -126,67 +121,13 @@ func SendSQSMessage(msgr *Messenger, objectToSend interface{}, currentRequestID
return err
}
message := string(jsonBytes)
headers := map[string]string{
"Name": "dummy",
SQSMessageOnS3Key: "false",
}
// If bigger than 200 KB upload message to s3 and send s3 file name to sqs
// The sqs receiver should check the header to see if the message is in the body or on s3
if binary.Size(message) > 200000 {
headers[SQSMessageOnS3Key] = "true"
id := uuid.New()
filename := fmt.Sprintf("%v-%v", sqsType, id.String())
err := uploadMessageToS3(msgr.S3Session, msgr.S3BucketName, filename, message)
if err != nil {
return err
}
}
msgID, err := msgr.SendSQSMessage(headers, message, currentRequestID, sqsType, headerKey, msgGrpID)
headers := map[string]string{"Name": "dummy"}
msgID, err := msgr.SendSQSMessage(headers, string(jsonBytes), currentRequestID, sqsType, headerKey, msgGrpID)
if err != nil {
logs.ErrorWithMsg("Failed to send sqs event", err)
return err
}
logs.Info(fmt.Sprintf("Sent SQS message to %s with ID %s", msgr.QueueName, msgID))
logs.Info(fmt.Sprintf("Sent SQS message to %s with ID %s", envQueueURLName, msgID))
return nil
}
func uploadMessageToS3(session s3.SessionWithHelpers, bucket string, name string, object interface{}) error {
messageBytes, err := json.Marshal(object)
if err != nil {
return err
}
// Upload message
expiry := 24 * 7 * time.Hour // 3 days
_, err = session.UploadWithSettings(messageBytes, bucket, name, s3.S3UploadSettings{
ExpiryDuration: &expiry,
})
if err != nil {
return err
}
return nil
}
func RetrieveMessageFromS3(session s3.SessionWithHelpers, bucket string, filename string) ([]byte, error) {
// get the file contents
rawObject, err := session.GetObject(bucket, filename, false)
if err != nil {
return []byte{}, err
}
// Read the message
var bodyBytes []byte
bodyBytes, err = ioutil.ReadAll(rawObject.Body)
if err != nil {
logs.ErrorWithMsg("Could not read file", err)
return []byte{}, err
}
return bodyBytes, nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment