diff --git a/api/api.go b/api/api.go index 63d3dbe9671802dc5d7599d881dfd2132d75f198..cd105cb56a811ac23f8a6ef60eeea48766c1ad36 100644 --- a/api/api.go +++ b/api/api.go @@ -3,6 +3,7 @@ package api import ( "fmt" "net/http" + "os" "runtime/debug" "sync" @@ -121,58 +122,69 @@ func (api Api) WithCrashReported(crashReporter ICrashReporter) Api { return api } -func (api Api) WithLocalPort(localPortPtr *int, eventHandlers map[string]interface{}) Api { +//If local port is defined (!=nil and >0) then the lambda function +//is replaced with a local HTTP server +func (api Api) WithLocalPort(localPortPtr *int) Api { if api.localPort != 0 { panic("local port already defined") } if localPortPtr != nil && *localPortPtr > 0 { api.localPort = *localPortPtr - api.localQueueEventHandlers = eventHandlers } return api } +//WithEvents are not used in production, only when env LOG_LEVEL=debug +//then the SQS producer is replaced with in-memory producer that uses +//go channels to queue and process events, so they can be debugged locally +func (api Api) WithEvents(eventHandlers map[string]interface{}) Api { + if api.localQueueEventHandlers != nil { + panic("local queue event handlers already defined") + } + api.localQueueEventHandlers = eventHandlers + return api +} + //run and panic on error func (api Api) Run() { + //decide local or SQS + if os.Getenv("LOG_LEVEL") == "debug" && api.localQueueEventHandlers != nil { + //use in-memory channels for async events + api.Debugf("Using in-memory channels for async events ...") + memConsumer := queues_mem.NewConsumer(api.localQueueEventHandlers) + api = api.WithProducer(queues_mem.NewProducer(memConsumer)) + + sqsEventChan := make(chan events.SQSEvent) + sqsWaitGroup := sync.WaitGroup{} + sqsWaitGroup.Add(1) + go func() { + for event := range sqsEventChan { + logger.Debugf("NOT YET PROCESSING SQS Event: %+v", event) + } + sqsWaitGroup.Done() + }() + + //when we terminate, close the sqs chan and wait for it to complete processing + defer func() { + close(sqsEventChan) + sqsWaitGroup.Wait() + }() + } else { + //use SQS for async events + api.Debugf("Using SQS queue producer for async events ...") + api = api.WithProducer(queues_sqs.NewProducer(api.requestIDHeaderKey)) + } + //decide local of lambda if api.localPort > 0 { //running locally with standard HTTP server - - if api.localQueueEventHandlers != nil { - //when running locally - we want to send and process SQS events locally using channels - //here we create a SQS chan and start listening to it - //again: this is quick hack... will make this part of framework once it works well - api.Debugf("Creating local queue consumer/producer...") - memConsumer := queues_mem.NewConsumer(api.localQueueEventHandlers) - api = api.WithProducer(queues_mem.NewProducer(memConsumer)) - - sqsEventChan := make(chan events.SQSEvent) - sqsWaitGroup := sync.WaitGroup{} - sqsWaitGroup.Add(1) - go func() { - for event := range sqsEventChan { - logger.Debugf("NOT YET PROCESSING SQS Event: %+v", event) - } - sqsWaitGroup.Done() - }() - - //when we terminate, close the sqs chan and wait for it to complete processing - defer func() { - close(sqsEventChan) - sqsWaitGroup.Wait() - }() - } else { - //use SQS for events - api = api.WithProducer(queues_sqs.NewProducer(api.requestIDHeaderKey)) - } - err := http.ListenAndServe(fmt.Sprintf(":%d", api.localPort), api) //calls api.ServeHTTP() which calls api.Handler() if err != nil { panic(err) } } else { - api = api.WithProducer(queues_sqs.NewProducer(api.requestIDHeaderKey)) - lambda.Start(api.Handler) //calls api.Handler directly + //run as an AWS Lambda function + lambda.Start(api.Handler) } } diff --git a/examples/core/api/main.go b/examples/core/api/main.go index 88d14e452c3a5ff3fb472d92daa8f3386bdec922..34402bcb4784193500c70bcbd2bdfaed0162adba 100644 --- a/examples/core/api/main.go +++ b/examples/core/api/main.go @@ -28,7 +28,8 @@ func main() { WithCheck("rate", rateLimiter{}). WithCORS(cors{}). WithAuditor(audit.File(os.Stdout)). - WithLocalPort(localPort, app.QueueRoutes()). //if nil will still run as lambda + WithEvents(app.QueueRoutes()). //only used when LOG_LEVEL="debug" + WithLocalPort(localPort). //if nil will still run as lambda Run() }