From 7bdfe7d38eb9c30e62a2e1ca20b546cb0bd13e5a Mon Sep 17 00:00:00 2001 From: Jan Semmelink <jan@uafrica.com> Date: Fri, 15 Oct 2021 08:48:35 +0200 Subject: [PATCH] Update mem queues to work in local API --- api/api.go | 21 +-------------------- queues/mem/consumer.go | 23 ++++++++++++++++------- queues/mem/producer.go | 13 +++++++++---- queues/sqs/producer.go | 7 ++++--- service/service.go | 2 +- 5 files changed, 31 insertions(+), 35 deletions(-) diff --git a/api/api.go b/api/api.go index 7b7bc9d..2036cbe 100644 --- a/api/api.go +++ b/api/api.go @@ -5,13 +5,10 @@ import ( "net/http" "os" "runtime/debug" - "sync" - "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" "gitlab.com/uafrica/go-utils/audit" "gitlab.com/uafrica/go-utils/errors" - "gitlab.com/uafrica/go-utils/logger" queues_mem "gitlab.com/uafrica/go-utils/queues/mem" queues_sqs "gitlab.com/uafrica/go-utils/queues/sqs" "gitlab.com/uafrica/go-utils/service" @@ -151,24 +148,8 @@ func (api Api) Run() { if (api.localPort > 0 || os.Getenv("LOG_LEVEL") == "debug") && api.localQueueEventHandlers != nil { //use in-memory channels for async events api.Debugf("Using in-memory channels for async events ...") - memConsumer := queues_mem.NewConsumer(api.localQueueEventHandlers) + memConsumer := queues_mem.NewConsumer(api.Service, api.localQueueEventHandlers) api = api.WithProducer(queues_mem.NewProducer(memConsumer)) - - sqsEventChan := make(chan events.SQSEvent) - sqsWaitGroup := sync.WaitGroup{} - sqsWaitGroup.Add(1) - go func() { - for event := range sqsEventChan { - logger.Debugf("NOT YET PROCESSING SQS Event: %+v", event) - } - sqsWaitGroup.Done() - }() - - //when we terminate, close the sqs chan and wait for it to complete processing - defer func() { - close(sqsEventChan) - sqsWaitGroup.Wait() - }() } else { //use SQS for async events api.Debugf("Using SQS queue producer for async events ...") diff --git a/queues/mem/consumer.go b/queues/mem/consumer.go index f5023e2..edc0dc8 100644 --- a/queues/mem/consumer.go +++ b/queues/mem/consumer.go @@ -16,25 +16,34 @@ import ( "gitlab.com/uafrica/go-utils/service" ) -func NewConsumer(routes map[string]interface{}) *consumer { +func NewConsumer(s service.Service, routes map[string]interface{}) queues.Consumer { + if s == nil { + panic("NewConsumer(service==nil)") + } router, err := queues.NewRouter(routes) if err != nil { panic(fmt.Sprintf("cannot create router: %+v", err)) } - //l := logger.New().WithFields(map[string]interface{}{"env": "dev"}) - //l.IFormatter = l.IFormatter.NextColor() - return &consumer{ - Service: service.New(), + c := &consumer{ + Service: s, router: router, queues: map[string]*queue{}, } + + //create a producer that will produce into this consumer + c.producer = &producer{ + consumer: c, + } + c.Service = c.Service.WithProducer(c.producer) + return c } type consumer struct { sync.Mutex service.Service - router queues.Router - queues map[string]*queue + router queues.Router + producer *producer + queues map[string]*queue } //wrap Service.WithStarter to return cron, else cannot be chained diff --git a/queues/mem/producer.go b/queues/mem/producer.go index 99977c1..5b289fb 100644 --- a/queues/mem/producer.go +++ b/queues/mem/producer.go @@ -2,17 +2,21 @@ package mem import ( "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/service" ) //can only produce locally if also consuming local -func NewProducer(consumer *consumer) service.Producer { - if consumer == nil { +func NewProducer(memConsumer queues.Consumer) service.Producer { + if memConsumer == nil { panic(errors.Errorf("cannot product locally without consumer")) } - return &producer{ - consumer: consumer, + mc, ok := memConsumer.(*consumer) + if !ok { + panic(errors.Errorf("NewProducer(%T) is not a mem consumer", memConsumer)) } + return mc.producer } type producer struct { @@ -20,6 +24,7 @@ type producer struct { } func (producer *producer) Send(event service.Event) (string, error) { + logger.Debugf("MEM producer.queue(%s) Sending event %+v", event.QueueName, event) q, err := producer.consumer.Queue(event.QueueName) if err != nil { return "", errors.Wrapf(err, "failed to get/create queue(%s)", event.QueueName) diff --git a/queues/sqs/producer.go b/queues/sqs/producer.go index 8f009fa..ce10639 100644 --- a/queues/sqs/producer.go +++ b/queues/sqs/producer.go @@ -22,12 +22,13 @@ func NewProducer(requestIDHeaderKey string) service.Producer { if requestIDHeaderKey == "" { requestIDHeaderKey = "request-id" } - return &producer{ + p := &producer{ region: region, requestIDHeaderKey: requestIDHeaderKey, session: nil, queues: map[string]*QueueProducer{}, } + return p } type producer struct { @@ -40,7 +41,7 @@ type producer struct { // Note: Calling code needs SQS IAM permissions func (producer *producer) Send(event service.Event) (string, error) { - logger.Debugf("producer=%T=%v", producer, producer) + logger.Debugf("SQS producer.Send(%+v)", event) messenger, ok := producer.queues[event.QueueName] if !ok { producer.Lock() @@ -89,7 +90,7 @@ type QueueProducer struct { } func (m *QueueProducer) Send(event service.Event) (string, error) { - //logger.Debugf("Sending event %+v", event) + logger.Debugf("SQS producer.queue(%s) Sending event %+v", m.queueURL, event) //add params as message attributes msgAttrs := make(map[string]*sqs.MessageAttributeValue) diff --git a/service/service.go b/service/service.go index 6cdbe9e..9d6d83e 100644 --- a/service/service.go +++ b/service/service.go @@ -13,6 +13,7 @@ import ( type Service interface { logger.Logger IErrorReporter + Producer audit.Auditor WithStarter(name string, starter IStarter) Service WithProducer(producer Producer) Service @@ -79,7 +80,6 @@ func (s service) WithStarter(name string, starter IStarter) Service { func (s service) WithProducer(producer Producer) Service { if producer != nil { s.Producer = producer - s.Infof("Producer = (%T)%v", producer, producer) } return s } -- GitLab