Select Git revision
consumer.go
consumer.go 6.45 KiB
package sqs
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"path"
"reflect"
"strings"
"time"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-lambda-go/lambdacontext"
"github.com/google/uuid"
"gitlab.com/uafrica/go-utils/audit"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/queues"
"gitlab.com/uafrica/go-utils/service"
)
func NewConsumer(requestIDHeaderKey string, routes map[string]interface{}) queues.Consumer {
env := os.Getenv("ENVIRONMENT") //todo: support config loading for local dev and env for lambda in prod
if env == "" {
env = "dev"
}
router, err := queues.NewRouter(routes)
if err != nil {
panic(fmt.Sprintf("cannot create router: %+v", err))
}
//legacy message type - when running SQS instance for one type of messages only
//when defined, make sure handler exists for this type
sqsMessageType := os.Getenv("SQS_MESSAGE_TYPE")
if sqsMessageType != "" {
if _, err := router.Route(sqsMessageType); err != nil {
panic(errors.Errorf("No route defined for SQS_MESSAGE_TYPE=\"%s\"", sqsMessageType))
}
}
return consumer{
Service: service.New(),
env: env,
router: router,
requestIDHeaderKey: requestIDHeaderKey,
ConstantMessageType: sqsMessageType,
producer: NewProducer(requestIDHeaderKey),
checks: map[string]queues.ICheck{},
}
}
type consumer struct {
service.Service
env string
router queues.Router
requestIDHeaderKey string
ConstantMessageType string //from os.Getenv("SQS_MESSAGE_TYPE")
producer service.Producer
checks map[string]queues.ICheck
}
//wrap Service.WithStarter to return cron, else cannot be chained
func (consumer consumer) WithStarter(name string, starter service.IStarter) queues.Consumer {
consumer.Service = consumer.Service.WithStarter(name, starter)
return consumer
}
//wrap Service.WithErrorReporter to return api, else cannot be chained
func (consumer consumer) WithErrorReporter(reporter service.IErrorReporter) queues.Consumer {
consumer.Service = consumer.Service.WithErrorReporter(reporter)
return consumer
}
//wrap else cannot be chained
func (consumer consumer) WithAuditor(auditor audit.Auditor) queues.Consumer {
consumer.Service = consumer.Service.WithAuditor(auditor)
return consumer
}
func (consumer consumer) Run() {
lambda.Start(consumer.Handler)
}
func (consumer consumer) ProcessFile(filename string) error {
f, err := os.Open(filename)
if err != nil {
return errors.Wrapf(err, "failed to open queue event file %s", filename)
}
defer f.Close()
var event events.SQSEvent
if err := json.NewDecoder(f).Decode(&event); err != nil {
return errors.Wrapf(err, "failed to read sqs event from file %s", filename)
}
if consumer.Handler(
lambdacontext.NewContext(
context.Background(),
&lambdacontext.LambdaContext{
AwsRequestID: uuid.New().String(),
InvokedFunctionArn: strings.TrimSuffix(path.Base(filename), ".json"),
// Identity CognitoIdentity
// ClientContext ClientContext
},
),
event,
); err != nil {
return errors.Wrapf(err, "failed to process event from file %s", filename)
}
return nil
}
func (consumer consumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEvent) error {
//todo: create context with logger
rand.Seed(time.Now().Unix())
//report handler crashes
// if consumer.crashReporter != nil {
// defer sqs.crashReporter.Catch(ctx)
// }
if consumer.ConstantMessageType != "" {
//legacy mode for fixed message type as used in shiplogic
//where the whole instance is started for a specific SQS_MESSAGE_TYPE defined in environment
handler, err := consumer.router.Route(consumer.ConstantMessageType)
if err != nil {
return errors.Wrapf(err, "messageType=%s not handled", consumer.ConstantMessageType) //checked on startup - should never get here!!!
}
if msgHandler, ok := handler.(func(events.SQSEvent) error); !ok {
return errors.Wrapf(err, "SQS_MESSAGE_TYPE=%s: handler signature %T not supported", consumer.ConstantMessageType, handler)
} else {
return msgHandler(lambdaEvent)
}
} else {
//support different message types - obtained from the individual event records
//process all message records in this event:
for messageIndex, message := range lambdaEvent.Records {
//get request-id for this message record
requestID := ""
if requestIDAttr, ok := message.MessageAttributes[consumer.requestIDHeaderKey]; ok {
requestID = *requestIDAttr.StringValue
}
messageType := ""
if messageTypeAttr, ok := message.MessageAttributes["type"]; !ok || messageTypeAttr.StringValue == nil {
consumer.Errorf("ignoring message without messageType") //todo: could support generic handler for these... not yet required
continue
} else {
messageType = *messageTypeAttr.StringValue
}
event := service.Event{
//producer: nil,
MessageID: message.MessageId,
QueueName: "N/A", //not sure how to get queue name from lambda Event... would be good to log it, may be in os.Getenv(???)?
TypeName: messageType,
DueTime: time.Now(),
RequestIDValue: requestID,
BodyJSON: message.Body,
}
ctx, err := queues.NewContext(consumer.Service, event)
if err != nil {
return err
}
ctx.WithFields(map[string]interface{}{
"message_index": messageIndex,
"message": message,
}).Infof("Queue(%s) Start SQS Handler Event: %v", ctx.Event().QueueName, ctx.Event())
//routing on messageType
sqsHandler, err := consumer.router.Route(messageType)
if err != nil {
ctx.Errorf("Unhandled sqs messageType(%v): %v", messageType, err)
continue
}
handler, ok := sqsHandler.(queues.Handler)
if !ok {
ctx.Errorf("messageType(%v) unsupported signature: %T", messageType, sqsHandler)
continue
}
args := []reflect.Value{
reflect.ValueOf(ctx),
}
//allocate, populate and validate request struct
var recordStruct interface{}
recordStruct, err = ctx.GetRecord(handler.RecordType)
if err != nil {
ctx.Errorf("invalid message: %+v", err)
continue
}
ctx.Debugf("message (%T) %+v", recordStruct, recordStruct)
args = append(args, reflect.ValueOf(recordStruct))
results := handler.FuncValue.Call(args)
if len(results) > 0 && !results[0].IsNil() {
ctx.Errorf("handler failed: %+v", results[0].Interface().(error))
consumer.Service.ReportError(ctx.Data(), err)
}
}
}
return nil
}