package sqs /*Package sqs provides a simple interface to send messages to AWS SQS*/ import ( "encoding/json" "fmt" "os" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" "gitlab.com/uafrica/go-utils/logs" ) // Messenger sends an arbitrary message via SQS type Messenger struct { session *session.Session service *sqs.SQS queueURL string } // NewSQSMessenger constructs a Messenger which sends messages to an SQS queue // awsRegion - region that the queue was created // awsQueue - name of the queue // Note: Calling code needs SQS IAM permissions func NewSQSMessenger(awsRegion, queueUrl string) (*Messenger, error) { // Make an AWS session sess, err := session.NewSessionWithOptions(session.Options{ Config: aws.Config{ Region: aws.String(awsRegion), }, }) if err != nil { return nil, err } // Create SQS service svc := sqs.New(sess) return &Messenger{ session: sess, service: svc, queueURL: queueUrl, }, nil } // SendSQSMessage sends a message to the queue associated with the messenger // headers - string message attributes of the SQS message (see AWS SQS documentation) // body - body of the SQS message (see AWS SQS documentation) func (m *Messenger) SendSQSMessage(headers map[string]string, body string, currentRequestID *string, sqsType string, headerKey string) (string, error) { msgAttrs := make(map[string]*sqs.MessageAttributeValue) for key, val := range headers { msgAttrs[key] = &sqs.MessageAttributeValue{ DataType: aws.String("String"), StringValue: aws.String(val), } } // Add request ID if currentRequestID != nil { msgAttrs[headerKey] = &sqs.MessageAttributeValue{ DataType: aws.String("String"), StringValue: aws.String(*currentRequestID), } } msgAttrs["type"] = &sqs.MessageAttributeValue{ DataType: aws.String("String"), StringValue: aws.String(sqsType), } res, err := m.service.SendMessage(&sqs.SendMessageInput{ MessageAttributes: msgAttrs, MessageBody: aws.String(body), QueueUrl: &m.queueURL, }) if err != nil { return "", err } return *res.MessageId, err } func SendSQSMessage(msgr *Messenger, region string, envQueueURLName string, objectToSend interface{}, currentRequestID *string, sqsType string, headerKey string) error { if msgr == nil { var err error msgr, err = NewSQSMessenger(region, os.Getenv(envQueueURLName)) if err != nil { logs.ErrorWithMsg("Failed to create sqs messenger with envQueueURLName: "+envQueueURLName, err) } } jsonBytes, err := json.Marshal(objectToSend) if err != nil { logs.ErrorWithMsg("Failed to encode sqs event data", err) return err } headers := map[string]string{"Name": "dummy"} msgID, err := msgr.SendSQSMessage(headers, string(jsonBytes), currentRequestID, sqsType, headerKey) if err != nil { logs.ErrorWithMsg("Failed to send sqs event", err) return err } logs.Info(fmt.Sprintf("Sent SQS message to %s with ID %s", envQueueURLName, msgID)) return nil }