From 040f044e67f53563a8c8d9f6ce55e0086c2ad53d Mon Sep 17 00:00:00 2001 From: Jan Semmelink <jan@uafrica.com> Date: Wed, 13 Oct 2021 15:11:42 +0200 Subject: [PATCH] Update api to run with local queues when LOG_LEVEL=debug, so we can now debug event handlers in normal mage debug of the api --- api/api.go | 78 ++++++++++++++++++++++----------------- examples/core/api/main.go | 3 +- 2 files changed, 47 insertions(+), 34 deletions(-) diff --git a/api/api.go b/api/api.go index 63d3dbe..cd105cb 100644 --- a/api/api.go +++ b/api/api.go @@ -3,6 +3,7 @@ package api import ( "fmt" "net/http" + "os" "runtime/debug" "sync" @@ -121,58 +122,69 @@ func (api Api) WithCrashReported(crashReporter ICrashReporter) Api { return api } -func (api Api) WithLocalPort(localPortPtr *int, eventHandlers map[string]interface{}) Api { +//If local port is defined (!=nil and >0) then the lambda function +//is replaced with a local HTTP server +func (api Api) WithLocalPort(localPortPtr *int) Api { if api.localPort != 0 { panic("local port already defined") } if localPortPtr != nil && *localPortPtr > 0 { api.localPort = *localPortPtr - api.localQueueEventHandlers = eventHandlers } return api } +//WithEvents are not used in production, only when env LOG_LEVEL=debug +//then the SQS producer is replaced with in-memory producer that uses +//go channels to queue and process events, so they can be debugged locally +func (api Api) WithEvents(eventHandlers map[string]interface{}) Api { + if api.localQueueEventHandlers != nil { + panic("local queue event handlers already defined") + } + api.localQueueEventHandlers = eventHandlers + return api +} + //run and panic on error func (api Api) Run() { + //decide local or SQS + if os.Getenv("LOG_LEVEL") == "debug" && api.localQueueEventHandlers != nil { + //use in-memory channels for async events + api.Debugf("Using in-memory channels for async events ...") + memConsumer := queues_mem.NewConsumer(api.localQueueEventHandlers) + api = api.WithProducer(queues_mem.NewProducer(memConsumer)) + + sqsEventChan := make(chan events.SQSEvent) + sqsWaitGroup := sync.WaitGroup{} + sqsWaitGroup.Add(1) + go func() { + for event := range sqsEventChan { + logger.Debugf("NOT YET PROCESSING SQS Event: %+v", event) + } + sqsWaitGroup.Done() + }() + + //when we terminate, close the sqs chan and wait for it to complete processing + defer func() { + close(sqsEventChan) + sqsWaitGroup.Wait() + }() + } else { + //use SQS for async events + api.Debugf("Using SQS queue producer for async events ...") + api = api.WithProducer(queues_sqs.NewProducer(api.requestIDHeaderKey)) + } + //decide local of lambda if api.localPort > 0 { //running locally with standard HTTP server - - if api.localQueueEventHandlers != nil { - //when running locally - we want to send and process SQS events locally using channels - //here we create a SQS chan and start listening to it - //again: this is quick hack... will make this part of framework once it works well - api.Debugf("Creating local queue consumer/producer...") - memConsumer := queues_mem.NewConsumer(api.localQueueEventHandlers) - api = api.WithProducer(queues_mem.NewProducer(memConsumer)) - - sqsEventChan := make(chan events.SQSEvent) - sqsWaitGroup := sync.WaitGroup{} - sqsWaitGroup.Add(1) - go func() { - for event := range sqsEventChan { - logger.Debugf("NOT YET PROCESSING SQS Event: %+v", event) - } - sqsWaitGroup.Done() - }() - - //when we terminate, close the sqs chan and wait for it to complete processing - defer func() { - close(sqsEventChan) - sqsWaitGroup.Wait() - }() - } else { - //use SQS for events - api = api.WithProducer(queues_sqs.NewProducer(api.requestIDHeaderKey)) - } - err := http.ListenAndServe(fmt.Sprintf(":%d", api.localPort), api) //calls api.ServeHTTP() which calls api.Handler() if err != nil { panic(err) } } else { - api = api.WithProducer(queues_sqs.NewProducer(api.requestIDHeaderKey)) - lambda.Start(api.Handler) //calls api.Handler directly + //run as an AWS Lambda function + lambda.Start(api.Handler) } } diff --git a/examples/core/api/main.go b/examples/core/api/main.go index 88d14e4..34402bc 100644 --- a/examples/core/api/main.go +++ b/examples/core/api/main.go @@ -28,7 +28,8 @@ func main() { WithCheck("rate", rateLimiter{}). WithCORS(cors{}). WithAuditor(audit.File(os.Stdout)). - WithLocalPort(localPort, app.QueueRoutes()). //if nil will still run as lambda + WithEvents(app.QueueRoutes()). //only used when LOG_LEVEL="debug" + WithLocalPort(localPort). //if nil will still run as lambda Run() } -- GitLab