From ba65707801e45f7789bba212b18a9223847c6073 Mon Sep 17 00:00:00 2001 From: Jan Semmelink <jan@uafrica.com> Date: Thu, 14 Oct 2021 13:05:26 +0200 Subject: [PATCH] Fix producer for SQS consumer --- cron/cron.go | 2 -- queues/mem/consumer.go | 6 ++---- queues/sqs/consumer.go | 10 +++------- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/cron/cron.go b/cron/cron.go index abcfe25..316d547 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -28,7 +28,6 @@ func New(functions map[string]func(Context) error) Cron { return Cron{ Service: service.New(), - Logger: logger.New().WithFields(map[string]interface{}{"env": env}), env: env, router: router, checks: map[string]ICheck{}, @@ -38,7 +37,6 @@ func New(functions map[string]func(Context) error) Cron { type Cron struct { service.Service - logger.Logger env string router Router checks map[string]ICheck diff --git a/queues/mem/consumer.go b/queues/mem/consumer.go index dd88a46..f5023e2 100644 --- a/queues/mem/consumer.go +++ b/queues/mem/consumer.go @@ -25,16 +25,14 @@ func NewConsumer(routes map[string]interface{}) *consumer { //l.IFormatter = l.IFormatter.NextColor() return &consumer{ Service: service.New(), - //Logger: l, - router: router, - queues: map[string]*queue{}, + router: router, + queues: map[string]*queue{}, } } type consumer struct { sync.Mutex service.Service - //logger.Logger router queues.Router queues map[string]*queue } diff --git a/queues/sqs/consumer.go b/queues/sqs/consumer.go index f41b504..cddbef8 100644 --- a/queues/sqs/consumer.go +++ b/queues/sqs/consumer.go @@ -40,13 +40,14 @@ func NewConsumer(requestIDHeaderKey string, routes map[string]interface{}) queue } } + s := service.New(). + WithProducer(NewProducer(requestIDHeaderKey)) return consumer{ - Service: service.New(), + Service: s, env: env, router: router, requestIDHeaderKey: requestIDHeaderKey, ConstantMessageType: sqsMessageType, - producer: NewProducer(requestIDHeaderKey), checks: map[string]queues.ICheck{}, } } @@ -57,7 +58,6 @@ type consumer struct { router queues.Router requestIDHeaderKey string ConstantMessageType string //from os.Getenv("SQS_MESSAGE_TYPE") - producer service.Producer checks map[string]queues.ICheck } @@ -80,10 +80,6 @@ func (consumer consumer) WithAuditor(auditor audit.Auditor) queues.Consumer { } func (consumer consumer) Run() { - //create producer so event handler can queue new events (e.g. retries...) - consumer.Debugf("Using SQS queue producer for async events ...") - consumer.Service = consumer.Service.WithProducer(NewProducer(consumer.requestIDHeaderKey)) - lambda.Start(consumer.Handler) } -- GitLab