Skip to content
Snippets Groups Projects
Commit 9c76347b authored by Jan Semmelink's avatar Jan Semmelink
Browse files

Add dbConn to cron context, and processing of events from file for testing

parent b497e343
Branches
Tags v1.2.6
No related merge requests found
# Config
Only used for local development on the terminal console, to set ENV from a JSON file, that simulates the environment created by AWS for our lambda images.
## How it works:
When api/cron/sqs starts (see V3),
They check if running local for testing (i.e. command line option used),
If so they look for config.local.json in the current directory or any parent directory
They generally find the one in the project repo top level directory
Then set all the values in that file in the env
package config
import (
"encoding/json"
"fmt"
"os"
"path"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logger"
)
func LoadLocal() error {
configDir, err := os.Getwd()
if err != nil {
return errors.Wrapf(err, "cannot get working directory")
}
configFilename := "config.local.json"
for {
fn := configDir + "/" + configFilename
f, err := os.Open(fn)
if err != nil {
logger.Debugf("%s not found in %s", configFilename, configDir)
parentDir := path.Dir(configDir)
if parentDir == configDir {
return errors.Errorf("did not find file %s in working dir or any parent dir", configFilename)
}
configDir = parentDir
continue
}
defer f.Close()
var config map[string]interface{}
if err := json.NewDecoder(f).Decode(&config); err != nil {
return errors.Wrapf(err, "failed to decode JSON from file %s", fn)
}
for n, v := range config {
vs := fmt.Sprintf("%v", v)
os.Setenv(n, vs)
logger.Debugf("Defined local config %s=%s", n, vs)
}
return nil
}
} //LoadLocal()
......@@ -81,7 +81,7 @@ func (cron Cron) Run(invokeArn *string) {
)
err := cron.Handler(lambdaContext)
if err != nil {
panic(errors.Wrapf(err, "cron failed"))
panic(errors.Errorf("cron failed: %+v", err))
} else {
logger.Debugf("cron success")
}
......
......@@ -12,13 +12,13 @@ import (
type LambdaCronHandler func(lambdaCtx context.Context) error
func (cron Cron) Handler(lambdaCtx context.Context) error {
func (cron Cron) Handler(lambdaCtx context.Context) (err error) {
lc, _ := lambdacontext.FromContext(lambdaCtx)
requestID := lc.AwsRequestID
cronName, cronFunc := cron.router.Route(lc.InvokedFunctionArn)
if cronFunc == nil {
return errors.Errorf("request-id:%s unknown cron function(%s)", lc.InvokedFunctionArn)
return errors.Errorf("request-id:%s unknown cron function(%s)", requestID, lc.InvokedFunctionArn)
}
//got a handler, prepare to run:
......@@ -33,6 +33,20 @@ func (cron Cron) Handler(lambdaCtx context.Context) error {
Name: cronName,
}
defer func() {
if err != nil {
ctx.Errorf("failed: %+v", err)
}
}()
if cron.dbConn != nil {
ctx.DB, err = cron.dbConn.Connect()
if err != nil {
err = errors.Wrapf(err, "failed to connect to db")
return
}
}
//report handler crashes
defer cron.crashReporter.Catch(ctx)
......
......@@ -5,6 +5,7 @@ import (
"strings"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logger"
)
type Router struct {
......@@ -19,6 +20,8 @@ func (r Router) Route(arn string) (string, func(Context) error) {
for name, hdlr := range r.endpoints {
if strings.Contains(arn, name) {
return name, hdlr
} else {
logger.Debugf("ARN(%s) does not contain cronName(%s)", arn, name)
}
}
return "", nil
......
......@@ -5,4 +5,5 @@ import "gitlab.com/uafrica/go-utils/service"
type IConsumer interface {
WithDb(dbConn service.IDatabaseConnector) IConsumer
Run()
ProcessFile(filename string) error
}
......@@ -2,8 +2,10 @@ package mem
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"reflect"
"sync"
"time"
......@@ -63,6 +65,32 @@ func (consumer *Consumer) Run() {
panic(errors.Errorf("DO NOT RUN LOCAL CONSUMER"))
}
func (consumer *Consumer) ProcessFile(filename string) error {
f, err := os.Open(filename)
if err != nil {
return errors.Wrapf(err, "failed to open queue event file %s", filename)
}
defer f.Close()
var event queues.Event
if err := json.NewDecoder(f).Decode(&event); err != nil {
return errors.Wrapf(err, "failed to read queues.Event from file %s", filename)
}
q := queue{
consumer: consumer,
name: "NoName",
ch: nil,
}
if q.process(
event,
); err != nil {
return errors.Wrapf(err, "failed to process event from file %s", filename)
}
return nil
}
type queue struct {
consumer *Consumer
name string
......@@ -71,75 +99,78 @@ type queue struct {
func (q *queue) run() {
for event := range q.ch {
//todo: create context with logger
rand.Seed(time.Now().Unix())
//report handler crashes
//defer sqs.crashReporter.Catch(ctx)
var db *bun.DB
if q.consumer.dbConn != nil {
var err error
db, err = q.consumer.dbConn.Connect()
if err != nil {
q.consumer.Errorf("failed to connect to db: %+v", err)
continue
}
err := q.process(event)
if err != nil {
q.consumer.Errorf("processing failed: %+v", err)
}
}
}
baseCtx := context.Background()
ctx := queues.Context{
Context: service.NewContext(baseCtx, map[string]interface{}{
"env": "dev",
"request_id": event.RequestID,
"message_type": event.TypeName,
}),
IProducer: q, //todo: q can only send back into this queue... may need to send to other queues!
Event: event,
RequestID: event.RequestIDValue,
DB: db,
}
func (q *queue) process(event queues.Event) error {
//todo: create context with logger
rand.Seed(time.Now().Unix())
ctx.WithFields(map[string]interface{}{
"params": event.ParamValues,
"body": event.BodyJSON,
}).Infof("Queue(%s) Recv SQS Event: %v", q.name, event)
//report handler crashes
//defer sqs.crashReporter.Catch(ctx)
//routing on messageType
sqsHandler, err := q.consumer.router.Route(event.TypeName)
var db *bun.DB
if q.consumer.dbConn != nil {
var err error
db, err = q.consumer.dbConn.Connect()
if err != nil {
ctx.Errorf("Unhandled event type(%v): %v", event.TypeName, err)
continue
}
handler, ok := sqsHandler.(queues.Handler)
if !ok {
ctx.Errorf("messageType(%v) unsupported signature: %T", event.TypeName, sqsHandler)
continue
return errors.Wrapf(err, "failed to connect to db")
}
}
args := []reflect.Value{
reflect.ValueOf(ctx),
}
baseCtx := context.Background()
ctx := queues.Context{
Context: service.NewContext(baseCtx, map[string]interface{}{
"env": "dev",
"request_id": event.RequestID,
"message_type": event.TypeName,
}),
IProducer: q, //todo: q can only send back into this queue... may need to send to other queues!
Event: event,
RequestID: event.RequestIDValue,
DB: db,
}
//allocate, populate and validate request struct
var recordStruct interface{}
recordStruct, err = ctx.GetRecord(handler.RecordType)
if err != nil {
ctx.Errorf("invalid message: %+v", err)
continue
}
ctx.WithFields(map[string]interface{}{
"params": event.ParamValues,
"body": event.BodyJSON,
}).Infof("Queue(%s) Recv SQS Event: %v", q.name, event)
//routing on messageType
sqsHandler, err := q.consumer.router.Route(event.TypeName)
if err != nil {
return errors.Wrapf(err, "unhandled event type(%v)", event.TypeName)
}
handler, ok := sqsHandler.(queues.Handler)
if !ok {
return errors.Errorf("messageType(%v) unsupported signature: %T", event.TypeName, sqsHandler)
}
ctx.Debugf("message (%T) %+v", recordStruct, recordStruct)
args = append(args, reflect.ValueOf(recordStruct))
args := []reflect.Value{
reflect.ValueOf(ctx),
}
results := handler.FuncValue.Call(args)
if len(results) > 0 && !results[0].IsNil() {
ctx.Errorf("handler failed: %+v", results[0].Interface().(error))
} else {
ctx.Debugf("handler success")
}
} //for each event from chan
} //queue.run()
//allocate, populate and validate request struct
var recordStruct interface{}
recordStruct, err = ctx.GetRecord(handler.RecordType)
if err != nil {
return errors.Wrapf(err, "invalid message body")
}
ctx.Debugf("message (%T) %+v", recordStruct, recordStruct)
args = append(args, reflect.ValueOf(recordStruct))
results := handler.FuncValue.Call(args)
if len(results) > 0 && !results[0].IsNil() {
return errors.Wrapf(results[0].Interface().(error), "handler failed")
}
ctx.Debugf("handler success")
return nil
} //queue.process()
func (q *queue) Send(event queues.Event) (msgID string, err error) {
event.MessageID = uuid.New().String()
......
......@@ -2,14 +2,19 @@ package sqs
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"path"
"reflect"
"strings"
"time"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-lambda-go/lambdacontext"
"github.com/google/uuid"
"github.com/uptrace/bun"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logger"
......@@ -65,6 +70,35 @@ func (consumer consumer) Run() {
lambda.Start(consumer.Handler)
}
func (consumer consumer) ProcessFile(filename string) error {
f, err := os.Open(filename)
if err != nil {
return errors.Wrapf(err, "failed to open queue event file %s", filename)
}
defer f.Close()
var event events.SQSEvent
if err := json.NewDecoder(f).Decode(&event); err != nil {
return errors.Wrapf(err, "failed to read sqs event from file %s", filename)
}
if consumer.Handler(
lambdacontext.NewContext(
context.Background(),
&lambdacontext.LambdaContext{
AwsRequestID: uuid.New().String(),
InvokedFunctionArn: strings.TrimSuffix(path.Base(filename), ".json"),
// Identity CognitoIdentity
// ClientContext ClientContext
},
),
event,
); err != nil {
return errors.Wrapf(err, "failed to process event from file %s", filename)
}
return nil
}
func (consumer consumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEvent) error {
//todo: create context with logger
rand.Seed(time.Now().Unix())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment