diff --git a/cron/context.go b/cron/context.go new file mode 100644 index 0000000000000000000000000000000000000000..734dbcd220e9593e53c9ff63c100146a1bf360d3 --- /dev/null +++ b/cron/context.go @@ -0,0 +1,23 @@ +package cron + +import ( + "context" + "time" + + "github.com/uptrace/bun" + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/service" +) + +type IContext interface { + context.Context + logger.ILogger + StartTime() time.Time + MillisecondsSinceStart() int64 +} + +type Context struct { + service.Context + Name string //cron function name + DB *bun.DB +} diff --git a/cron/cron.go b/cron/cron.go new file mode 100644 index 0000000000000000000000000000000000000000..19f5eb9922e81ff75c2a769306a583eeb82e0759 --- /dev/null +++ b/cron/cron.go @@ -0,0 +1,93 @@ +package cron + +import ( + "context" + "fmt" + "os" + + "github.com/aws/aws-lambda-go/lambda" + "github.com/aws/aws-lambda-go/lambdacontext" + "github.com/google/uuid" + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/service" +) + +func New(functions map[string]func(Context) error) Cron { + env := os.Getenv("ENVIRONMENT") //todo: support config loading for local dev and env for lambda in prod + if env == "" { + env = "dev" + } + + router, err := NewRouter(functions) + if err != nil { + panic(fmt.Sprintf("cannot create router: %+v", err)) + } + + return Cron{ + ILogger: logger.New().WithFields(map[string]interface{}{"env": env}), + env: env, + router: router, + crashReporter: defaultCrashReporter{}, + } +} + +type Cron struct { + logger.ILogger + env string + router Router + dbConn service.IDatabaseConnector + crashReporter ICrashReporter +} + +func (cron Cron) WithCrashReported(crashReporter ICrashReporter) Cron { + if crashReporter != nil { + cron.crashReporter = crashReporter + } + return cron +} + +type ICrashReporter interface { + Catch(ctx Context) //Report(method string, path string, crash interface{}) +} + +type defaultCrashReporter struct{} + +func (defaultCrashReporter) Catch(ctx Context) { + // crash := recover() + // if crash != nil { + // ctx.Errorf("CRASH: (%T) %+v\n", crash, crash) + // } +} + +func (cron Cron) WithDb(dbConn service.IDatabaseConnector) Cron { + cron.dbConn = dbConn + return cron +} + +func (cron Cron) Run(invokeArn *string) { + if invokeArn != nil && *invokeArn != "" { + //just run this handler and terminate - for testing on a terminal + logger.Infof("Invoking ARN=%s locally for testing ...", *invokeArn) + + lambdaContext := lambdacontext.NewContext( + context.Background(), + &lambdacontext.LambdaContext{ + AwsRequestID: uuid.New().String(), + InvokedFunctionArn: *invokeArn, + // Identity CognitoIdentity + // ClientContext ClientContext + }, + ) + err := cron.Handler(lambdaContext) + if err != nil { + panic(errors.Wrapf(err, "cron failed")) + } else { + logger.Debugf("cron success") + } + return + } + + //production + lambda.Start(cron.Handler) +} diff --git a/cron/handler.go b/cron/handler.go new file mode 100644 index 0000000000000000000000000000000000000000..dd3772bbfc261435d79ca0d97437b2d7f47d3939 --- /dev/null +++ b/cron/handler.go @@ -0,0 +1,51 @@ +package cron + +import ( + "reflect" + + "gitlab.com/uafrica/go-utils/errors" +) + +type Handler struct { + RecordType reflect.Type + FuncValue reflect.Value +} + +func NewHandler(fnc interface{}) (Handler, error) { + h := Handler{} + + fncType := reflect.TypeOf(fnc) + if fncType.NumIn() != 2 { + return h, errors.Errorf("takes %d args instead of (Context, Record)", fncType.NumIn()) + } + if fncType.NumOut() != 1 { + return h, errors.Errorf("returns %d results instead of (error)", fncType.NumOut()) + } + + //arg[0] must implement interface sqs.IContext + if _, ok := reflect.New(fncType.In(0)).Interface().(IContext); !ok { + return h, errors.Errorf("first arg %v does not implement sqs.IContext", fncType.In(0)) + } + + //arg[1] must be a struct for the message record body. It may be an empty struct, but + //all public fields require a json tag which we will use to math the URL param name + if err := validateStructType(fncType.In(1)); err != nil { + return h, errors.Errorf("second arg %v is not valid record struct type", fncType.In(1)) + } + h.RecordType = fncType.In(1) + + //result must be error + if _, ok := reflect.New(fncType.Out(0)).Interface().(*error); !ok { + return h, errors.Errorf("result %v is not error type", fncType.Out(0)) + } + + h.FuncValue = reflect.ValueOf(fnc) + return h, nil +} + +func validateStructType(t reflect.Type) error { + if t.Kind() != reflect.Struct { + return errors.Errorf("%v is %v, not a struct", t, t.Kind()) + } + return nil +} diff --git a/cron/lambda.go b/cron/lambda.go new file mode 100644 index 0000000000000000000000000000000000000000..0d46d6ade3dcce502fce21857c99cedb1fa24263 --- /dev/null +++ b/cron/lambda.go @@ -0,0 +1,46 @@ +package cron + +import ( + "context" + "math/rand" + "time" + + "github.com/aws/aws-lambda-go/lambdacontext" + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/service" +) + +type LambdaCronHandler func(lambdaCtx context.Context) error + +func (cron Cron) Handler(lambdaCtx context.Context) error { + lc, _ := lambdacontext.FromContext(lambdaCtx) + requestID := lc.AwsRequestID + + cronName, cronFunc := cron.router.Route(lc.InvokedFunctionArn) + if cronFunc == nil { + return errors.Errorf("request-id:%s unknown cron function(%s)", lc.InvokedFunctionArn) + } + + //got a handler, prepare to run: + rand.Seed(time.Now().Unix()) + + ctx := Context{ + Context: service.NewContext(lambdaCtx, map[string]interface{}{ + "env": cron.env, + "request-id": requestID, + "cron": cronName, + }), + Name: cronName, + } + + //report handler crashes + defer cron.crashReporter.Catch(ctx) + + //todo: set log level, trigger log on conditions, sync at end of transaction - after log level was determined + ctx.Infof("Start CRON Handler") + + if err := cronFunc(ctx); err != nil { + return errors.Wrapf(err, "Cron(%s) failed", cronName) + } + return nil +} diff --git a/cron/router.go b/cron/router.go new file mode 100644 index 0000000000000000000000000000000000000000..83be727cd1c53c32126bdb5a5c575edd013b5e39 --- /dev/null +++ b/cron/router.go @@ -0,0 +1,62 @@ +package cron + +import ( + "fmt" + "strings" + + "gitlab.com/uafrica/go-utils/errors" +) + +type Router struct { + endpoints map[string]func(Context) error +} + +func (r Router) Endpoints() map[string]func(Context) error { + return r.endpoints +} + +func (r Router) Route(arn string) (string, func(Context) error) { + for name, hdlr := range r.endpoints { + if strings.Contains(arn, name) { + return name, hdlr + } + } + return "", nil +} + +//check that all endpoints are correctly defined using one of the supported handler types +//return updated endpoints with additional information +func NewRouter(endpoints map[string]func(Context) error) (Router, error) { + countLegacyEvent := 0 + countLegacyMessage := 0 + countHandler := 0 + for messageType, handlerFunc := range endpoints { + if messageType == "" { + return Router{}, errors.Errorf("blank messageType") + } + if messageType == "/sqs-docs" { + return Router{}, errors.Errorf("%s may not be a defined endpoint - it is reserved", messageType) + } + if handlerFunc == nil { + return Router{}, errors.Errorf("nil handler on %s", messageType) + } + fmt.Printf("%30.30s: OK\n", messageType) + } + fmt.Printf("Checked %d legacy event and %d legacy message and %d new handlers\n", countLegacyEvent, countLegacyMessage, countHandler) + + //add reserved endpoint to generate documentation + r := Router{ + endpoints: endpoints, + } + + // { + // docsHandler, err := NewHandler(GETApiDocs(r)) //endpoints)) + // if err != nil { + // return Router{}, errors.Wrapf(err, "failed to define handler for docs") + // } + // endpoints["/api-docs"] = map[string]interface{}{ + // "GET": docsHandler, + // } + // } + return r, nil +}