diff --git a/api/api.go b/api/api.go index 7b7bc9df0a2742bc4a94654d33fbca1604ff3ca3..2036cbe31af0da75c4df5a3cb223ded63fcc58a7 100644 --- a/api/api.go +++ b/api/api.go @@ -5,13 +5,10 @@ import ( "net/http" "os" "runtime/debug" - "sync" - "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" "gitlab.com/uafrica/go-utils/audit" "gitlab.com/uafrica/go-utils/errors" - "gitlab.com/uafrica/go-utils/logger" queues_mem "gitlab.com/uafrica/go-utils/queues/mem" queues_sqs "gitlab.com/uafrica/go-utils/queues/sqs" "gitlab.com/uafrica/go-utils/service" @@ -151,24 +148,8 @@ func (api Api) Run() { if (api.localPort > 0 || os.Getenv("LOG_LEVEL") == "debug") && api.localQueueEventHandlers != nil { //use in-memory channels for async events api.Debugf("Using in-memory channels for async events ...") - memConsumer := queues_mem.NewConsumer(api.localQueueEventHandlers) + memConsumer := queues_mem.NewConsumer(api.Service, api.localQueueEventHandlers) api = api.WithProducer(queues_mem.NewProducer(memConsumer)) - - sqsEventChan := make(chan events.SQSEvent) - sqsWaitGroup := sync.WaitGroup{} - sqsWaitGroup.Add(1) - go func() { - for event := range sqsEventChan { - logger.Debugf("NOT YET PROCESSING SQS Event: %+v", event) - } - sqsWaitGroup.Done() - }() - - //when we terminate, close the sqs chan and wait for it to complete processing - defer func() { - close(sqsEventChan) - sqsWaitGroup.Wait() - }() } else { //use SQS for async events api.Debugf("Using SQS queue producer for async events ...") diff --git a/queues/mem/consumer.go b/queues/mem/consumer.go index f5023e28829f9e8d6490a93145b3c50fc50abe1b..edc0dc8582846763f2ed2cb930960d4face60468 100644 --- a/queues/mem/consumer.go +++ b/queues/mem/consumer.go @@ -16,25 +16,34 @@ import ( "gitlab.com/uafrica/go-utils/service" ) -func NewConsumer(routes map[string]interface{}) *consumer { +func NewConsumer(s service.Service, routes map[string]interface{}) queues.Consumer { + if s == nil { + panic("NewConsumer(service==nil)") + } router, err := queues.NewRouter(routes) if err != nil { panic(fmt.Sprintf("cannot create router: %+v", err)) } - //l := logger.New().WithFields(map[string]interface{}{"env": "dev"}) - //l.IFormatter = l.IFormatter.NextColor() - return &consumer{ - Service: service.New(), + c := &consumer{ + Service: s, router: router, queues: map[string]*queue{}, } + + //create a producer that will produce into this consumer + c.producer = &producer{ + consumer: c, + } + c.Service = c.Service.WithProducer(c.producer) + return c } type consumer struct { sync.Mutex service.Service - router queues.Router - queues map[string]*queue + router queues.Router + producer *producer + queues map[string]*queue } //wrap Service.WithStarter to return cron, else cannot be chained diff --git a/queues/mem/producer.go b/queues/mem/producer.go index 99977c171d7b249b7f6e3afc0d0549824c612afa..5b289fbb0b9c30a29d2d98ac2e246ce763906fd5 100644 --- a/queues/mem/producer.go +++ b/queues/mem/producer.go @@ -2,17 +2,21 @@ package mem import ( "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/service" ) //can only produce locally if also consuming local -func NewProducer(consumer *consumer) service.Producer { - if consumer == nil { +func NewProducer(memConsumer queues.Consumer) service.Producer { + if memConsumer == nil { panic(errors.Errorf("cannot product locally without consumer")) } - return &producer{ - consumer: consumer, + mc, ok := memConsumer.(*consumer) + if !ok { + panic(errors.Errorf("NewProducer(%T) is not a mem consumer", memConsumer)) } + return mc.producer } type producer struct { @@ -20,6 +24,7 @@ type producer struct { } func (producer *producer) Send(event service.Event) (string, error) { + logger.Debugf("MEM producer.queue(%s) Sending event %+v", event.QueueName, event) q, err := producer.consumer.Queue(event.QueueName) if err != nil { return "", errors.Wrapf(err, "failed to get/create queue(%s)", event.QueueName) diff --git a/queues/sqs/producer.go b/queues/sqs/producer.go index 8f009fa48788ad4c642af98b4529041019b9c1d9..ce10639f7eef631b83195b07d2a0a582e25a3995 100644 --- a/queues/sqs/producer.go +++ b/queues/sqs/producer.go @@ -22,12 +22,13 @@ func NewProducer(requestIDHeaderKey string) service.Producer { if requestIDHeaderKey == "" { requestIDHeaderKey = "request-id" } - return &producer{ + p := &producer{ region: region, requestIDHeaderKey: requestIDHeaderKey, session: nil, queues: map[string]*QueueProducer{}, } + return p } type producer struct { @@ -40,7 +41,7 @@ type producer struct { // Note: Calling code needs SQS IAM permissions func (producer *producer) Send(event service.Event) (string, error) { - logger.Debugf("producer=%T=%v", producer, producer) + logger.Debugf("SQS producer.Send(%+v)", event) messenger, ok := producer.queues[event.QueueName] if !ok { producer.Lock() @@ -89,7 +90,7 @@ type QueueProducer struct { } func (m *QueueProducer) Send(event service.Event) (string, error) { - //logger.Debugf("Sending event %+v", event) + logger.Debugf("SQS producer.queue(%s) Sending event %+v", m.queueURL, event) //add params as message attributes msgAttrs := make(map[string]*sqs.MessageAttributeValue) diff --git a/service/service.go b/service/service.go index 6cdbe9e9c0d5fe945bbf82ee7d2405e64415e342..9d6d83ee6c4a74a660fe990b43fb5a325525a63b 100644 --- a/service/service.go +++ b/service/service.go @@ -13,6 +13,7 @@ import ( type Service interface { logger.Logger IErrorReporter + Producer audit.Auditor WithStarter(name string, starter IStarter) Service WithProducer(producer Producer) Service @@ -79,7 +80,6 @@ func (s service) WithStarter(name string, starter IStarter) Service { func (s service) WithProducer(producer Producer) Service { if producer != nil { s.Producer = producer - s.Infof("Producer = (%T)%v", producer, producer) } return s }