Skip to content
Snippets Groups Projects
Select Git revision
  • 90ec162f4b59c68672a981f51916632a94e95b1c
  • main default protected
  • v1.298.0
  • v1.297.0
  • v1.296.0
  • v1.295.0
  • v1.294.0
  • v1.293.0
  • v1.292.0
  • v1.291.0
  • v1.290.0
  • v1.289.0
  • v1.288.0
  • v1.287.0
  • v1.286.0
  • v1.285.0
  • v1.284.0
  • v1.283.0
  • v1.282.0
  • v1.281.0
  • v1.280.0
  • v1.279.0
22 results

producer.go

Blame
  • producer.go 3.40 KiB
    package sqs
    
    import (
    	"os"
    	"strings"
    	"sync"
    	"time"
    
    	"github.com/aws/aws-sdk-go/aws"
    	"github.com/aws/aws-sdk-go/aws/session"
    	"github.com/aws/aws-sdk-go/service/sqs"
    	"gitlab.com/uafrica/go-utils/errors"
    	"gitlab.com/uafrica/go-utils/logger"
    	"gitlab.com/uafrica/go-utils/queues"
    )
    
    func NewProducer(requestIDHeaderKey string) queues.IProducer {
    	region := os.Getenv("AWS_REGION")
    	if region == "" {
    		panic(errors.Errorf("environment AWS_REGION is not defined"))
    	}
    	if requestIDHeaderKey == "" {
    		requestIDHeaderKey = "request-id"
    	}
    	return &producer{
    		region:             region,
    		requestIDHeaderKey: requestIDHeaderKey,
    		session:            nil,
    		queues:             map[string]*Messenger{},
    	}
    }
    
    type producer struct {
    	sync.Mutex
    	region             string
    	requestIDHeaderKey string
    	session            *session.Session
    	queues             map[string]*Messenger
    }
    
    // Note: Calling code needs SQS IAM permissions
    func (producer *producer) Send(event queues.Event) (string, error) {
    	messenger, ok := producer.queues[event.QueueName]
    	if !ok {
    		producer.Lock()
    		defer producer.Unlock()
    		messenger, ok = producer.queues[event.QueueName]
    		if !ok {
    			envName := strings.ToUpper(event.QueueName + "_QUEUE_URL")
    			queueURL := os.Getenv(envName)
    			if queueURL == "" {
    				return "", errors.Errorf("cannot send to queue(%s) because environment(%s) is undefined", event.QueueName, envName)
    			}
    
    			// Make an AWS session
    			sess, err := session.NewSessionWithOptions(session.Options{
    				Config: aws.Config{
    					Region: aws.String(producer.region),
    				},
    			})
    			if err != nil {
    				return "", errors.Wrapf(err, "failed to create AWS session")
    			}
    
    			messenger = &Messenger{
    				session:  sess,
    				service:  sqs.New(sess),
    				queueURL: queueURL,
    			}
    			producer.queues[event.QueueName] = messenger