// Package sqs provides a queues.IConsumer implementation that is driven by
// AWS Lambda SQS events.
package sqs

import (
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"os"
	"path"
	"reflect"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/lambdacontext"
	"github.com/google/uuid"
	"github.com/uptrace/bun"
	"gitlab.com/uafrica/go-utils/errors"
	"gitlab.com/uafrica/go-utils/logger"
	"gitlab.com/uafrica/go-utils/queues"
	"gitlab.com/uafrica/go-utils/service"
)

// NewConsumer creates an SQS consumer that routes messages using routes.
//
// requestIDHeaderKey is the name of the message attribute that carries the
// request id; it is also passed to NewProducer for the producer made
// available to handlers.
//
// The environment name is read from ENVIRONMENT and defaults to "dev".
// If SQS_MESSAGE_TYPE is set, the consumer runs in legacy mode where every
// event is passed to the handler registered for that single type.
//
// NewConsumer panics when the router cannot be created, or when
// SQS_MESSAGE_TYPE is set but has no route.
func NewConsumer(requestIDHeaderKey string, routes map[string]interface{}) queues.IConsumer {
	env := os.Getenv("ENVIRONMENT") //todo: support config loading for local dev and env for lambda in prod
	if env == "" {
		env = "dev"
	}

	router, err := queues.NewRouter(routes)
	if err != nil {
		panic(fmt.Sprintf("cannot create router: %+v", err))
	}

	//legacy message type - when running SQS instance for one type of messages only
	//when defined, make sure handler exists for this type
	sqsMessageType := os.Getenv("SQS_MESSAGE_TYPE")
	if sqsMessageType != "" {
		if _, err := router.Route(sqsMessageType); err != nil {
			panic(errors.Errorf("No route defined for SQS_MESSAGE_TYPE=\"%s\"", sqsMessageType))
		}
	}

	return consumer{
		ILogger:             logger.New().WithFields(map[string]interface{}{"env": env}),
		env:                 env,
		router:              router,
		requestIDHeaderKey:  requestIDHeaderKey,
		ConstantMessageType: sqsMessageType,
		producer:            NewProducer(requestIDHeaderKey),
	}
}

// consumer is the queues.IConsumer implementation returned by NewConsumer.
type consumer struct {
	logger.ILogger             //for logging outside of context
	env                 string //value of ENVIRONMENT, or "dev" when unset
	router              queues.Router
	requestIDHeaderKey  string //message attribute name holding the request id
	ConstantMessageType string //from os.Getenv("SQS_MESSAGE_TYPE")
	dbConn              service.IDatabaseConnector //optional, set by WithDb
	producer            queues.IProducer           //passed to handlers in queues.Context
}

// WithDb returns a copy of the consumer that connects to the database using
// dbConn at the start of each Handler call. The receiver is not modified.
func (c consumer) WithDb(dbConn service.IDatabaseConnector) queues.IConsumer {
	c.dbConn = dbConn
	return c
}

// Run starts the AWS Lambda runtime with Handler as the entry point.
func (c consumer) Run() {
	lambda.Start(c.Handler)
}

// ProcessFile reads a JSON encoded events.SQSEvent from filename and passes
// it to Handler, using a freshly generated request id and the file's base
// name (without ".json") as the invoked function ARN.
//
// It returns an error when the file cannot be opened or decoded, or when
// Handler returns an error.
func (c consumer) ProcessFile(filename string) error {
	f, err := os.Open(filename)
	if err != nil {
		return errors.Wrapf(err, "failed to open queue event file %s", filename)
	}
	defer f.Close()

	var event events.SQSEvent
	if err := json.NewDecoder(f).Decode(&event); err != nil {
		return errors.Wrapf(err, "failed to read sqs event from file %s", filename)
	}

	ctx := lambdacontext.NewContext(
		context.Background(),
		&lambdacontext.LambdaContext{
			AwsRequestID:       uuid.New().String(),
			InvokedFunctionArn: strings.TrimSuffix(path.Base(filename), ".json"),
			// Identity           CognitoIdentity
			// ClientContext      ClientContext
		},
	)

	// The handler's own result must be captured here: testing the outer err
	// (from os.Open, always nil at this point) would silently drop failures.
	if err := c.Handler(ctx, event); err != nil {
		return errors.Wrapf(err, "failed to process event from file %s", filename)
	}
	return nil
}

// Handler processes one Lambda SQS event.
//
// When a database connector was configured with WithDb, a connection is
// obtained first and a connection failure is returned as an error.
//
// In legacy mode (ConstantMessageType set) the whole event is passed to the
// single handler registered for that type, which must have the signature
// func(events.SQSEvent) error; its result is returned.
//
// Otherwise each record is routed on its "type" message attribute. Records
// that have no type, no route, an unsupported handler signature, or an
// invalid body are logged and skipped; handler failures are logged as well.
// In this mode Handler always returns nil.
func (c consumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEvent) error {
	//todo: create context with logger
	rand.Seed(time.Now().Unix())

	//report handler crashes
	//defer sqs.crashReporter.Catch(ctx)

	var db *bun.DB
	if c.dbConn != nil {
		var err error
		db, err = c.dbConn.Connect()
		if err != nil {
			return errors.Wrapf(err, "failed to connect to db")
		}
	}

	if c.ConstantMessageType != "" {
		//legacy mode for fixed message type as used in shiplogic
		//where the whole instance is started for a specific SQS_MESSAGE_TYPE defined in environment
		handler, err := c.router.Route(c.ConstantMessageType)
		if err != nil {
			return errors.Wrapf(err, "messageType=%s not handled", c.ConstantMessageType) //checked on startup - should never get here!!!
		}
		msgHandler, ok := handler.(func(events.SQSEvent) error)
		if !ok {
			// err is nil at this point, so there is no cause to wrap;
			// create a new error instead.
			return errors.Errorf("SQS_MESSAGE_TYPE=%s: handler signature %T not supported", c.ConstantMessageType, handler)
		}
		return msgHandler(lambdaEvent)
	}

	//support different message types - obtained from the individual event records
	//process all message records in this event:
	for messageIndex, message := range lambdaEvent.Records {
		//get request-id for this message record
		//(left empty when the attribute is absent or carries no string value)
		requestID := ""
		if requestIDAttr, ok := message.MessageAttributes[c.requestIDHeaderKey]; ok && requestIDAttr.StringValue != nil {
			requestID = *requestIDAttr.StringValue
		}

		messageTypeAttr, ok := message.MessageAttributes["type"]
		if !ok || messageTypeAttr.StringValue == nil {
			c.Errorf("ignoring message without messageType") //todo: could support generic handler for these... not yet required
			continue
		}
		messageType := *messageTypeAttr.StringValue

		ctx := queues.Context{
			Context: service.NewContext(baseCtx, map[string]interface{}{
				"env":          c.env,
				"request_id":   requestID,
				"message_type": messageType,
			}),
			IProducer: c.producer, //needed so handler can queue other events or requeue this event
			Event: queues.Event{
				//producer:       nil,
				MessageID:      message.MessageId,
				QueueName:      "N/A", //not sure how to get queue name from lambda Event... would be good to log it, may be in os.Getenv(???)?
				TypeName:       messageType,
				DueTime:        time.Now(),
				RequestIDValue: requestID,
				BodyJSON:       message.Body,
			},
			RequestID: requestID,
			DB:        db,
		}

		ctx.WithFields(map[string]interface{}{
			"message_index": messageIndex,
			"message":       message,
		}).Infof("Queue(%s) Start SQS Handler Event: %v", ctx.Event.QueueName, ctx.Event)

		//routing on messageType
		sqsHandler, err := c.router.Route(messageType)
		if err != nil {
			ctx.Errorf("Unhandled sqs messageType(%v): %v", messageType, err)
			continue
		}
		handler, ok := sqsHandler.(queues.Handler)
		if !ok {
			ctx.Errorf("messageType(%v) unsupported signature: %T", messageType, sqsHandler)
			continue
		}

		//allocate, populate and validate request struct
		recordStruct, err := ctx.GetRecord(handler.RecordType)
		if err != nil {
			ctx.Errorf("invalid message: %+v", err)
			continue
		}
		ctx.Debugf("message (%T) %+v", recordStruct, recordStruct)

		//the handler is called as func(ctx, record); a non-nil first result is its error
		args := []reflect.Value{
			reflect.ValueOf(ctx),
			reflect.ValueOf(recordStruct),
		}
		results := handler.FuncValue.Call(args)
		if len(results) > 0 && !results[0].IsNil() {
			ctx.Errorf("handler failed: %+v", results[0].Interface().(error))
		}
	}
	return nil
}