// Source file: producer.go (3.40 KiB). Header captured from the repository viewer.

package sqs
import (
"os"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logger"
"gitlab.com/uafrica/go-utils/queues"
)
// NewProducer builds the SQS-backed queues.IProducer for the AWS region named
// by the AWS_REGION environment variable.
//
// requestIDHeaderKey is stored on the producer; when it is empty, the key
// "request-id" is used instead.
//
// NewProducer panics when AWS_REGION is unset or empty. No AWS session is
// created here: the producer's session field is left nil and its per-queue
// messenger map starts out empty.
func NewProducer(requestIDHeaderKey string) queues.IProducer {
	awsRegion := os.Getenv("AWS_REGION")
	if len(awsRegion) == 0 {
		panic(errors.Errorf("environment AWS_REGION is not defined"))
	}

	// Fall back to the default header key when the caller supplied none.
	headerKey := requestIDHeaderKey
	if len(headerKey) == 0 {
		headerKey = "request-id"
	}

	p := new(producer) // session stays at its zero value (nil)
	p.region = awsRegion
	p.requestIDHeaderKey = headerKey
	p.queues = make(map[string]*Messenger)
	return p
}
// producer is the concrete type NewProducer returns as a queues.IProducer.
// It keeps one Messenger per queue name; Send creates an entry the first time
// a queue name is used.
type producer struct {
// Embedded mutex; Send takes it before adding a new entry to queues.
sync.Mutex
// AWS region, read from the AWS_REGION environment variable by NewProducer.
region string
// Request-ID header key; NewProducer substitutes "request-id" when given "".
requestIDHeaderKey string
// Set to nil by NewProducer.
// NOTE(review): the visible part of Send builds its own session per queue and
// never assigns this field — confirm whether it is still needed.
session *session.Session
// Messengers keyed by event.QueueName, populated by Send.
queues map[string]*Messenger
}
// Note: Calling code needs SQS IAM permissions
func (producer *producer) Send(event queues.Event) (string, error) {
messenger, ok := producer.queues[event.QueueName]
if !ok {
producer.Lock()
defer producer.Unlock()
messenger, ok = producer.queues[event.QueueName]
if !ok {
envName := strings.ToUpper(event.QueueName + "_QUEUE_URL")
queueURL := os.Getenv(envName)
if queueURL == "" {
return "", errors.Errorf("cannot send to queue(%s) because environment(%s) is undefined", event.QueueName, envName)
}
// Make an AWS session
sess, err := session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Region: aws.String(producer.region),
},
})
if err != nil {
return "", errors.Wrapf(err, "failed to create AWS session")
}
messenger = &Messenger{
session: sess,
service: sqs.New(sess),
queueURL: queueURL,
}
producer.queues[event.QueueName] = messenger