Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
bobgroup-go-utils
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Bob Public Utils
bobgroup-go-utils
Merge requests
!19
Resolve "Upload large SQS messages to S3"
Code
Review changes
Check out branch
Download
Patches
Plain diff
Expand sidebar
Merged
Resolve "Upload large SQS messages to S3"
24-upload-large-sqs-messages-to-s3
into
main
Overview
0
Commits
9
Pipelines
0
Changes
2
Merged
Johan de Klerk
requested to merge
24-upload-large-sqs-messages-to-s3
into
main
3 years ago
Overview
0
Commits
9
Pipelines
0
Changes
2
Related to
#24 (closed)
Edited
3 years ago
by
Johan de Klerk
0
0
Merge request reports
Compare
main
main (base)
and
latest version
latest version
e4652473
9 commits,
3 years ago
2 files
+
110
−
42
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
2
handler_utils/sqs.go
+
24
−
4
View file @ e4652473
Edit in single-file editor
Open in Web IDE
Show full file
@@ -5,6 +5,8 @@ import (
"github.com/aws/aws-lambda-go/events"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logs"
"gitlab.com/uafrica/go-utils/s3"
"gitlab.com/uafrica/go-utils/sqs"
"reflect"
)
@@ -30,11 +32,29 @@ func ValidateSQSEndpoints(endpoints map[string]interface{}) (map[string]interfac
return
endpoints
,
nil
}
func
GetRecord
(
message
events
.
SQSMessage
,
recordType
reflect
.
Type
)
(
interface
{},
error
)
{
func
GetRecord
(
s3Session
*
s3
.
SessionWithHelpers
,
bucket
string
,
message
events
.
SQSMessage
,
recordType
reflect
.
Type
)
(
interface
{},
error
)
{
recordValuePtr
:=
reflect
.
New
(
recordType
)
err
:=
json
.
Unmarshal
([]
byte
(
message
.
Body
),
recordValuePtr
.
Interface
())
if
err
!=
nil
{
return
nil
,
errors
.
Wrapf
(
err
,
"failed to JSON decode message body"
)
// Check if message body should be retrieved from S3
if
messageAttribute
,
ok
:=
message
.
MessageAttributes
[
sqs
.
SQSMessageOnS3Key
];
ok
{
if
messageAttribute
.
StringValue
!=
nil
&&
*
messageAttribute
.
StringValue
==
"true"
{
messageBytes
,
err
:=
sqs
.
RetrieveMessageFromS3
(
s3Session
,
bucket
,
message
.
Body
)
if
err
!=
nil
{
return
nil
,
errors
.
Wrapf
(
err
,
"failed to get sqs message body from s3"
)
}
err
=
json
.
Unmarshal
(
messageBytes
,
recordValuePtr
.
Interface
())
if
err
!=
nil
{
return
nil
,
errors
.
Wrapf
(
err
,
"failed to JSON decode message body"
)
}
}
}
else
{
// Message was small enough, it is contained in the message body
err
:=
json
.
Unmarshal
([]
byte
(
message
.
Body
),
recordValuePtr
.
Interface
())
if
err
!=
nil
{
return
nil
,
errors
.
Wrapf
(
err
,
"failed to JSON decode message body"
)
}
}
if
validator
,
ok
:=
recordValuePtr
.
Interface
()
.
(
IValidator
);
ok
{
Loading