Skip to content
Snippets Groups Projects
Commit 040f044e authored by Jan Semmelink's avatar Jan Semmelink
Browse files

Update api to run with local queues when LOG_LEVEL=debug, so we can now debug...

Update api to run with local queues when LOG_LEVEL=debug, so we can now debug event handlers in normal mage debug of the api
parent 937b0fac
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,7 @@ package api ...@@ -3,6 +3,7 @@ package api
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"os"
"runtime/debug" "runtime/debug"
"sync" "sync"
...@@ -121,28 +122,35 @@ func (api Api) WithCrashReported(crashReporter ICrashReporter) Api { ...@@ -121,28 +122,35 @@ func (api Api) WithCrashReported(crashReporter ICrashReporter) Api {
return api return api
} }
func (api Api) WithLocalPort(localPortPtr *int, eventHandlers map[string]interface{}) Api { //If local port is defined (!=nil and >0) then the lambda function
//is replaced with a local HTTP server
func (api Api) WithLocalPort(localPortPtr *int) Api {
if api.localPort != 0 { if api.localPort != 0 {
panic("local port already defined") panic("local port already defined")
} }
if localPortPtr != nil && *localPortPtr > 0 { if localPortPtr != nil && *localPortPtr > 0 {
api.localPort = *localPortPtr api.localPort = *localPortPtr
api.localQueueEventHandlers = eventHandlers
} }
return api return api
} }
//WithEvents are not used in production, only when env LOG_LEVEL=debug
//then the SQS producer is replaced with in-memory producer that uses
//go channels to queue and process events, so they can be debugged locally
func (api Api) WithEvents(eventHandlers map[string]interface{}) Api {
if api.localQueueEventHandlers != nil {
panic("local queue event handlers already defined")
}
api.localQueueEventHandlers = eventHandlers
return api
}
//run and panic on error //run and panic on error
func (api Api) Run() { func (api Api) Run() {
//decide local of lambda //decide local or SQS
if api.localPort > 0 { if os.Getenv("LOG_LEVEL") == "debug" && api.localQueueEventHandlers != nil {
//running locally with standard HTTP server //use in-memory channels for async events
api.Debugf("Using in-memory channels for async events ...")
if api.localQueueEventHandlers != nil {
//when running locally - we want to send and process SQS events locally using channels
//here we create a SQS chan and start listening to it
//again: this is quick hack... will make this part of framework once it works well
api.Debugf("Creating local queue consumer/producer...")
memConsumer := queues_mem.NewConsumer(api.localQueueEventHandlers) memConsumer := queues_mem.NewConsumer(api.localQueueEventHandlers)
api = api.WithProducer(queues_mem.NewProducer(memConsumer)) api = api.WithProducer(queues_mem.NewProducer(memConsumer))
...@@ -162,17 +170,21 @@ func (api Api) Run() { ...@@ -162,17 +170,21 @@ func (api Api) Run() {
sqsWaitGroup.Wait() sqsWaitGroup.Wait()
}() }()
} else { } else {
//use SQS for events //use SQS for async events
api.Debugf("Using SQS queue producer for async events ...")
api = api.WithProducer(queues_sqs.NewProducer(api.requestIDHeaderKey)) api = api.WithProducer(queues_sqs.NewProducer(api.requestIDHeaderKey))
} }
//decide local of lambda
if api.localPort > 0 {
//running locally with standard HTTP server
err := http.ListenAndServe(fmt.Sprintf(":%d", api.localPort), api) //calls api.ServeHTTP() which calls api.Handler() err := http.ListenAndServe(fmt.Sprintf(":%d", api.localPort), api) //calls api.ServeHTTP() which calls api.Handler()
if err != nil { if err != nil {
panic(err) panic(err)
} }
} else { } else {
api = api.WithProducer(queues_sqs.NewProducer(api.requestIDHeaderKey)) //run as an AWS Lambda function
lambda.Start(api.Handler) //calls api.Handler directly lambda.Start(api.Handler)
} }
} }
......
...@@ -28,7 +28,8 @@ func main() { ...@@ -28,7 +28,8 @@ func main() {
WithCheck("rate", rateLimiter{}). WithCheck("rate", rateLimiter{}).
WithCORS(cors{}). WithCORS(cors{}).
WithAuditor(audit.File(os.Stdout)). WithAuditor(audit.File(os.Stdout)).
WithLocalPort(localPort, app.QueueRoutes()). //if nil will still run as lambda WithEvents(app.QueueRoutes()). //only used when LOG_LEVEL="debug"
WithLocalPort(localPort). //if nil will still run as lambda
Run() Run()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment