From 9c76347b5711ba21d99a8509670098d3ea0c9d42 Mon Sep 17 00:00:00 2001 From: Jan Semmelink <jan@uafrica.com> Date: Wed, 29 Sep 2021 10:53:32 +0200 Subject: [PATCH] Add dbConn to cron context, and processing of events from file for testing --- config/README.md | 10 +++ config/local.go | 47 +++++++++++++ cron/cron.go | 2 +- cron/lambda.go | 18 ++++- cron/router.go | 3 + queues/consumer.go | 1 + queues/mem/consumer.go | 149 +++++++++++++++++++++++++---------------- queues/sqs/consumer.go | 34 ++++++++++ 8 files changed, 202 insertions(+), 62 deletions(-) create mode 100644 config/README.md create mode 100644 config/local.go diff --git a/config/README.md b/config/README.md new file mode 100644 index 0000000..e935745 --- /dev/null +++ b/config/README.md @@ -0,0 +1,10 @@ +# Config + +Only used for local development on the terminal console, to set ENV from a JSON file, that simulates the environment created by AWS for our lambda images. + +## How it works: +When api/cron/sqs starts (see V3), +They check if running local for testing (i.e. command line option used), +If so they look for config.local.json in the current directory or any parent directory +They generally find the one in the project repo top level directory +Then set all the values in that file in the env diff --git a/config/local.go b/config/local.go new file mode 100644 index 0000000..59f5251 --- /dev/null +++ b/config/local.go @@ -0,0 +1,47 @@ +package config + +import ( + "encoding/json" + "fmt" + "os" + "path" + + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" +) + +func LoadLocal() error { + configDir, err := os.Getwd() + if err != nil { + return errors.Wrapf(err, "cannot get working directory") + } + configFilename := "config.local.json" + for { + fn := configDir + "/" + configFilename + f, err := os.Open(fn) + if err != nil { + logger.Debugf("%s not found in %s", configFilename, configDir) + parentDir := path.Dir(configDir) + if parentDir == configDir { + return errors.Errorf("did not find file %s in working dir or any parent dir", configFilename) + } + configDir = parentDir + continue + } + + defer f.Close() + + var config map[string]interface{} + if err := json.NewDecoder(f).Decode(&config); err != nil { + return errors.Wrapf(err, "failed to decode JSON from file %s", fn) + } + + for n, v := range config { + vs := fmt.Sprintf("%v", v) + os.Setenv(n, vs) + logger.Debugf("Defined local config %s=%s", n, vs) + } + + return nil + } +} //LoadLocal() diff --git a/cron/cron.go b/cron/cron.go index 19f5eb9..0683c14 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -81,7 +81,7 @@ func (cron Cron) Run(invokeArn *string) { ) err := cron.Handler(lambdaContext) if err != nil { - panic(errors.Wrapf(err, "cron failed")) + panic(errors.Errorf("cron failed: %+v", err)) } else { logger.Debugf("cron success") } diff --git a/cron/lambda.go b/cron/lambda.go index 0d46d6a..ea2fb6f 100644 --- a/cron/lambda.go +++ b/cron/lambda.go @@ -12,13 +12,13 @@ import ( type LambdaCronHandler func(lambdaCtx context.Context) error -func (cron Cron) Handler(lambdaCtx context.Context) error { +func (cron Cron) Handler(lambdaCtx context.Context) (err error) { lc, _ := lambdacontext.FromContext(lambdaCtx) requestID := lc.AwsRequestID cronName, cronFunc := cron.router.Route(lc.InvokedFunctionArn) if cronFunc == nil { - return errors.Errorf("request-id:%s unknown cron function(%s)", lc.InvokedFunctionArn) + return errors.Errorf("request-id:%s unknown cron function(%s)", requestID, lc.InvokedFunctionArn) } //got a handler, prepare to run: @@ -33,6 +33,20 @@ func (cron Cron) Handler(lambdaCtx context.Context) error { Name: cronName, } + defer func() { + if err != nil { + ctx.Errorf("failed: %+v", err) + } + }() + + if cron.dbConn != nil { + ctx.DB, err = cron.dbConn.Connect() + if err != nil { + err = errors.Wrapf(err, "failed to connect to db") + return + } + } + //report handler crashes defer cron.crashReporter.Catch(ctx) diff --git a/cron/router.go b/cron/router.go index 83be727..c58bf7f 100644 --- a/cron/router.go +++ b/cron/router.go @@ -5,6 +5,7 @@ import ( "strings" "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" ) type Router struct { @@ -19,6 +20,8 @@ func (r Router) Route(arn string) (string, func(Context) error) { for name, hdlr := range r.endpoints { if strings.Contains(arn, name) { return name, hdlr + } else { + logger.Debugf("ARN(%s) does not contain cronName(%s)", arn, name) } } return "", nil diff --git a/queues/consumer.go b/queues/consumer.go index 1b5fae2..7549b85 100644 --- a/queues/consumer.go +++ b/queues/consumer.go @@ -5,4 +5,5 @@ import "gitlab.com/uafrica/go-utils/service" type IConsumer interface { WithDb(dbConn service.IDatabaseConnector) IConsumer Run() + ProcessFile(filename string) error } diff --git a/queues/mem/consumer.go b/queues/mem/consumer.go index 7727ff0..eb314a1 100644 --- a/queues/mem/consumer.go +++ b/queues/mem/consumer.go @@ -2,8 +2,10 @@ package mem import ( "context" + "encoding/json" "fmt" "math/rand" + "os" "reflect" "sync" "time" @@ -63,6 +65,32 @@ func (consumer *Consumer) Run() { panic(errors.Errorf("DO NOT RUN LOCAL CONSUMER")) } +func (consumer *Consumer) ProcessFile(filename string) error { + f, err := os.Open(filename) + if err != nil { + return errors.Wrapf(err, "failed to open queue event file %s", filename) + } + defer f.Close() + + var event queues.Event + if err := json.NewDecoder(f).Decode(&event); err != nil { + return errors.Wrapf(err, "failed to read queues.Event from file %s", filename) + } + + q := queue{ + consumer: consumer, + name: "NoName", + ch: nil, + } + + if q.process( + event, + ); err != nil { + return errors.Wrapf(err, "failed to process event from file %s", filename) + } + return nil +} + type queue struct { consumer *Consumer name string @@ -71,75 +99,78 @@ type queue struct { func (q *queue) run() { for event := range q.ch { - //todo: create context with logger - rand.Seed(time.Now().Unix()) - - //report handler crashes - //defer sqs.crashReporter.Catch(ctx) - - var db *bun.DB - if q.consumer.dbConn != nil { - var err error - db, err = q.consumer.dbConn.Connect() - if err != nil { - q.consumer.Errorf("failed to connect to db: %+v", err) - continue - } + err := q.process(event) + if err != nil { + q.consumer.Errorf("processing failed: %+v", err) } + } +} - baseCtx := context.Background() - ctx := queues.Context{ - Context: service.NewContext(baseCtx, map[string]interface{}{ - "env": "dev", - "request_id": event.RequestID, - "message_type": event.TypeName, - }), - IProducer: q, //todo: q can only send back into this queue... may need to send to other queues! - Event: event, - RequestID: event.RequestIDValue, - DB: db, - } +func (q *queue) process(event queues.Event) error { + //todo: create context with logger + rand.Seed(time.Now().Unix()) - ctx.WithFields(map[string]interface{}{ - "params": event.ParamValues, - "body": event.BodyJSON, - }).Infof("Queue(%s) Recv SQS Event: %v", q.name, event) + //report handler crashes + //defer sqs.crashReporter.Catch(ctx) - //routing on messageType - sqsHandler, err := q.consumer.router.Route(event.TypeName) + var db *bun.DB + if q.consumer.dbConn != nil { + var err error + db, err = q.consumer.dbConn.Connect() if err != nil { - ctx.Errorf("Unhandled event type(%v): %v", event.TypeName, err) - continue - } - handler, ok := sqsHandler.(queues.Handler) - if !ok { - ctx.Errorf("messageType(%v) unsupported signature: %T", event.TypeName, sqsHandler) - continue + return errors.Wrapf(err, "failed to connect to db") } + } - args := []reflect.Value{ - reflect.ValueOf(ctx), - } + baseCtx := context.Background() + ctx := queues.Context{ + Context: service.NewContext(baseCtx, map[string]interface{}{ + "env": "dev", + "request_id": event.RequestID, + "message_type": event.TypeName, + }), + IProducer: q, //todo: q can only send back into this queue... may need to send to other queues! + Event: event, + RequestID: event.RequestIDValue, + DB: db, + } - //allocate, populate and validate request struct - var recordStruct interface{} - recordStruct, err = ctx.GetRecord(handler.RecordType) - if err != nil { - ctx.Errorf("invalid message: %+v", err) - continue - } + ctx.WithFields(map[string]interface{}{ + "params": event.ParamValues, + "body": event.BodyJSON, + }).Infof("Queue(%s) Recv SQS Event: %v", q.name, event) + + //routing on messageType + sqsHandler, err := q.consumer.router.Route(event.TypeName) + if err != nil { + return errors.Wrapf(err, "unhandled event type(%v)", event.TypeName) + } + handler, ok := sqsHandler.(queues.Handler) + if !ok { + return errors.Errorf("messageType(%v) unsupported signature: %T", event.TypeName, sqsHandler) + } - ctx.Debugf("message (%T) %+v", recordStruct, recordStruct) - args = append(args, reflect.ValueOf(recordStruct)) + args := []reflect.Value{ + reflect.ValueOf(ctx), + } - results := handler.FuncValue.Call(args) - if len(results) > 0 && !results[0].IsNil() { - ctx.Errorf("handler failed: %+v", results[0].Interface().(error)) - } else { - ctx.Debugf("handler success") - } - } //for each event from chan -} //queue.run() + //allocate, populate and validate request struct + var recordStruct interface{} + recordStruct, err = ctx.GetRecord(handler.RecordType) + if err != nil { + return errors.Wrapf(err, "invalid message body") + } + + ctx.Debugf("message (%T) %+v", recordStruct, recordStruct) + args = append(args, reflect.ValueOf(recordStruct)) + + results := handler.FuncValue.Call(args) + if len(results) > 0 && !results[0].IsNil() { + return errors.Wrapf(results[0].Interface().(error), "handler failed") + } + ctx.Debugf("handler success") + return nil +} //queue.process() func (q *queue) Send(event queues.Event) (msgID string, err error) { event.MessageID = uuid.New().String() diff --git a/queues/sqs/consumer.go b/queues/sqs/consumer.go index 753a875..8b9e191 100644 --- a/queues/sqs/consumer.go +++ b/queues/sqs/consumer.go @@ -2,14 +2,19 @@ package sqs import ( "context" + "encoding/json" "fmt" "math/rand" "os" + "path" "reflect" + "strings" "time" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" + "github.com/aws/aws-lambda-go/lambdacontext" + "github.com/google/uuid" "github.com/uptrace/bun" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" @@ -65,6 +70,35 @@ func (consumer consumer) Run() { lambda.Start(consumer.Handler) } +func (consumer consumer) ProcessFile(filename string) error { + f, err := os.Open(filename) + if err != nil { + return errors.Wrapf(err, "failed to open queue event file %s", filename) + } + defer f.Close() + + var event events.SQSEvent + if err := json.NewDecoder(f).Decode(&event); err != nil { + return errors.Wrapf(err, "failed to read sqs event from file %s", filename) + } + + if consumer.Handler( + lambdacontext.NewContext( + context.Background(), + &lambdacontext.LambdaContext{ + AwsRequestID: uuid.New().String(), + InvokedFunctionArn: strings.TrimSuffix(path.Base(filename), ".json"), + // Identity CognitoIdentity + // ClientContext ClientContext + }, + ), + event, + ); err != nil { + return errors.Wrapf(err, "failed to process event from file %s", filename) + } + return nil +} + func (consumer consumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEvent) error { //todo: create context with logger rand.Seed(time.Now().Unix()) -- GitLab