Skip to content
Snippets Groups Projects

Resolve "Upload large SQS messages to S3"

Merged Johan de Klerk requested to merge 24-upload-large-sqs-messages-to-s3 into main
2 files
+ 110
42
Compare changes
  • Side-by-side
  • Inline

Files

+ 24
4
@@ -5,6 +5,8 @@ import (
"github.com/aws/aws-lambda-go/events"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logs"
"gitlab.com/uafrica/go-utils/s3"
"gitlab.com/uafrica/go-utils/sqs"
"reflect"
)
@@ -30,11 +32,29 @@ func ValidateSQSEndpoints(endpoints map[string]interface{}) (map[string]interfac
return endpoints, nil
}
func GetRecord(message events.SQSMessage, recordType reflect.Type) (interface{}, error) {
func GetRecord(s3Session *s3.SessionWithHelpers, bucket string, message events.SQSMessage, recordType reflect.Type) (interface{}, error) {
recordValuePtr := reflect.New(recordType)
err := json.Unmarshal([]byte(message.Body), recordValuePtr.Interface())
if err != nil {
return nil, errors.Wrapf(err, "failed to JSON decode message body")
// Check if message body should be retrieved from S3
if messageAttribute, ok := message.MessageAttributes[sqs.SQSMessageOnS3Key]; ok {
if messageAttribute.StringValue != nil && *messageAttribute.StringValue == "true" {
messageBytes, err := sqs.RetrieveMessageFromS3(s3Session, bucket, message.Body)
if err != nil {
return nil, errors.Wrapf(err, "failed to get sqs message body from s3")
}
err = json.Unmarshal(messageBytes, recordValuePtr.Interface())
if err != nil {
return nil, errors.Wrapf(err, "failed to JSON decode message body")
}
}
} else {
// Message was small enough, it is contained in the message body
err := json.Unmarshal([]byte(message.Body), recordValuePtr.Interface())
if err != nil {
return nil, errors.Wrapf(err, "failed to JSON decode message body")
}
}
if validator, ok := recordValuePtr.Interface().(IValidator); ok {
Loading