diff --git a/handler_utils/sqs.go b/handler_utils/sqs.go index 5a923583467111281331495c8d24d19b02eba10a..855bf1fcd1052f029402ec002c2cf12f96570288 100644 --- a/handler_utils/sqs.go +++ b/handler_utils/sqs.go @@ -5,6 +5,8 @@ import ( "github.com/aws/aws-lambda-go/events" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logs" + "gitlab.com/uafrica/go-utils/s3" + "gitlab.com/uafrica/go-utils/sqs" "reflect" ) @@ -30,11 +32,29 @@ func ValidateSQSEndpoints(endpoints map[string]interface{}) (map[string]interfac return endpoints, nil } -func GetRecord(message events.SQSMessage, recordType reflect.Type) (interface{}, error) { +func GetRecord(s3Session *s3.SessionWithHelpers, bucket string, message events.SQSMessage, recordType reflect.Type) (interface{}, error) { + recordValuePtr := reflect.New(recordType) - err := json.Unmarshal([]byte(message.Body), recordValuePtr.Interface()) - if err != nil { - return nil, errors.Wrapf(err, "failed to JSON decode message body") + + // Check if message body should be retrieved from S3 + if messageAttribute, ok := message.MessageAttributes[sqs.SQSMessageOnS3Key]; ok { + if messageAttribute.StringValue != nil && *messageAttribute.StringValue == "true" { + messageBytes, err := sqs.RetrieveMessageFromS3(s3Session, bucket, message.Body) + if err != nil { + return nil, errors.Wrapf(err, "failed to get sqs message body from s3") + } + + err = json.Unmarshal(messageBytes, recordValuePtr.Interface()) + if err != nil { + return nil, errors.Wrapf(err, "failed to JSON decode message body") + } + } + } else { + // Message was small enough, it is contained in the message body + err := json.Unmarshal([]byte(message.Body), recordValuePtr.Interface()) + if err != nil { + return nil, errors.Wrapf(err, "failed to JSON decode message body") + } } if validator, ok := recordValuePtr.Interface().(IValidator); ok {