Skip to content
Snippets Groups Projects
Select Git revision
  • 4cc281196a517ec7eac3ec83c124957fd5bf05b1
  • main default protected
  • v1.298.0
  • v1.297.0
  • v1.296.0
  • v1.295.0
  • v1.294.0
  • v1.293.0
  • v1.292.0
  • v1.291.0
  • v1.290.0
  • v1.289.0
  • v1.288.0
  • v1.287.0
  • v1.286.0
  • v1.285.0
  • v1.284.0
  • v1.283.0
  • v1.282.0
  • v1.281.0
  • v1.280.0
  • v1.279.0
22 results

consumer.go

Blame
  • user avatar
    Jan Semmelink authored
    4cc28119
    History
    consumer.go 6.45 KiB
    package sqs
    
    import (
    	"context"
    	"encoding/json"
    	"fmt"
    	"math/rand"
    	"os"
    	"path"
    	"reflect"
    	"strings"
    	"time"
    
    	"github.com/aws/aws-lambda-go/events"
    	"github.com/aws/aws-lambda-go/lambda"
    	"github.com/aws/aws-lambda-go/lambdacontext"
    	"github.com/google/uuid"
    	"gitlab.com/uafrica/go-utils/audit"
    	"gitlab.com/uafrica/go-utils/errors"
    	"gitlab.com/uafrica/go-utils/queues"
    	"gitlab.com/uafrica/go-utils/service"
    )
    
    // NewConsumer creates an SQS consumer that dispatches messages using routes.
    //
    // requestIDHeaderKey is the name of the message attribute from which the
    // request id of each message is read; it is also passed to NewProducer.
    //
    // Environment variables read:
    //   - ENVIRONMENT: environment name, "dev" when empty.
    //   - SQS_MESSAGE_TYPE: optional legacy fixed message type. When set, a route
    //     for that type must exist.
    //
    // NewConsumer panics if the router cannot be created from routes, or if
    // SQS_MESSAGE_TYPE is set and has no route.
    func NewConsumer(requestIDHeaderKey string, routes map[string]interface{}) queues.Consumer {
    	//todo: support config loading for local dev and env for lambda in prod
    	environment := os.Getenv("ENVIRONMENT")
    	if len(environment) == 0 {
    		environment = "dev"
    	}
    
    	messageRouter, routerErr := queues.NewRouter(routes)
    	if routerErr != nil {
    		panic(fmt.Sprintf("cannot create router: %+v", routerErr))
    	}
    
    	// Legacy mode: the instance serves a single message type only. Fail at
    	// start-up, rather than on the first event, when that type has no handler.
    	fixedType := os.Getenv("SQS_MESSAGE_TYPE")
    	if fixedType != "" {
    		_, routeErr := messageRouter.Route(fixedType)
    		if routeErr != nil {
    			panic(errors.Errorf("No route defined for SQS_MESSAGE_TYPE=\"%s\"", fixedType))
    		}
    	}
    
    	created := consumer{
    		Service:             service.New(),
    		env:                 environment,
    		router:              messageRouter,
    		requestIDHeaderKey:  requestIDHeaderKey,
    		ConstantMessageType: fixedType,
    		producer:            NewProducer(requestIDHeaderKey),
    		checks:              map[string]queues.ICheck{},
    	}
    	return created
    }
    
    // consumer is the SQS implementation returned by NewConsumer as a
    // queues.Consumer. Its methods use value receivers, so the With... methods
    // return a modified copy rather than changing the consumer they are called on.
    type consumer struct {
    	// Embedded service; replaced by WithStarter, WithErrorReporter and WithAuditor.
    	service.Service
    	// Value of the ENVIRONMENT variable, "dev" when that is empty.
    	env                 string
    	// Resolves a message type name to its registered handler.
    	router              queues.Router
    	// Name of the message attribute that Handler reads the request id from.
    	requestIDHeaderKey  string
    	// Legacy fixed message type; when non-empty, Handler passes the whole
    	// event to the handler routed for this type.
    	ConstantMessageType string //from os.Getenv("SQS_MESSAGE_TYPE")
    	// Created with NewProducer(requestIDHeaderKey) in NewConsumer.
    	producer            service.Producer
    	// Initialised to an empty map in NewConsumer.
    	checks              map[string]queues.ICheck
    }
    
    // WithStarter wraps Service.WithStarter so that the result is still a
    // queues.Consumer and calls can be chained. It returns a copy of the
    // consumer whose embedded Service has the named starter applied.
    func (c consumer) WithStarter(name string, starter service.IStarter) queues.Consumer {
    	c.Service = c.Service.WithStarter(name, starter)
    	return c
    }
    
    // WithErrorReporter wraps Service.WithErrorReporter so that the result is
    // still a queues.Consumer and calls can be chained. It returns a copy of the
    // consumer whose embedded Service has the reporter applied.
    func (c consumer) WithErrorReporter(reporter service.IErrorReporter) queues.Consumer {
    	c.Service = c.Service.WithErrorReporter(reporter)
    	return c
    }
    
    // WithAuditor wraps Service.WithAuditor so that the result is still a
    // queues.Consumer and calls can be chained. It returns a copy of the
    // consumer whose embedded Service has the auditor applied.
    func (c consumer) WithAuditor(auditor audit.Auditor) queues.Consumer {
    	c.Service = c.Service.WithAuditor(auditor)
    	return c
    }
    
    // Run registers Handler with the AWS Lambda runtime via lambda.Start.
    func (c consumer) Run() {
    	lambda.Start(c.Handler)
    }
    
    // ProcessFile decodes a JSON-encoded SQS event from the named file and runs
    // it through Handler, as if it had been delivered by Lambda.
    //
    // The Lambda context is synthesised: AwsRequestID is a fresh UUID and
    // InvokedFunctionArn is the file's base name without its ".json" suffix.
    //
    // It returns a wrapped error when the file cannot be opened, when its
    // content cannot be decoded as an SQS event, or when Handler returns an error.
    func (c consumer) ProcessFile(filename string) error {
    	f, err := os.Open(filename)
    	if err != nil {
    		return errors.Wrapf(err, "failed to open queue event file %s", filename)
    	}
    	defer f.Close()
    
    	var event events.SQSEvent
    	if err := json.NewDecoder(f).Decode(&event); err != nil {
    		return errors.Wrapf(err, "failed to read sqs event from file %s", filename)
    	}
    
    	lambdaCtx := lambdacontext.NewContext(
    		context.Background(),
    		&lambdacontext.LambdaContext{
    			AwsRequestID:       uuid.New().String(),
    			InvokedFunctionArn: strings.TrimSuffix(path.Base(filename), ".json"),
    			// Identity           CognitoIdentity
    			// ClientContext      ClientContext
    		},
    	)
    
    	// Capture Handler's own result. Previously the call's return value was
    	// discarded and the already-nil error from os.Open was tested instead, so
    	// handler failures were never returned to the caller.
    	if err := c.Handler(lambdaCtx, event); err != nil {
    		return errors.Wrapf(err, "failed to process event from file %s", filename)
    	}
    	return nil
    }
    
    // Handler is the Lambda entry point for SQS events.
    //
    // Legacy mode (ConstantMessageType is set): the whole event is passed to the
    // handler routed for that type, which must have the signature
    // func(events.SQSEvent) error, and that handler's result is returned.
    //
    // Otherwise every record is routed individually on its "type" message
    // attribute. Records that have no type, no route, an unsupported handler
    // signature or a body that cannot be parsed into the handler's record type
    // are logged and skipped. An error returned by a record handler is logged
    // and reported through Service.ReportError, after which processing continues
    // with the next record. In this mode the only error returned is a failure to
    // create the per-message context.
    //
    // baseCtx is currently not used.
    func (c consumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEvent) error {
    	//todo: create context with logger
    	rand.Seed(time.Now().Unix())
    
    	//report handler crashes
    	// if consumer.crashReporter != nil {
    	// 	defer sqs.crashReporter.Catch(ctx)
    	// }
    
    	if c.ConstantMessageType != "" {
    		//legacy mode for fixed message type as used in shiplogic
    		//where the whole instance is started for a specific SQS_MESSAGE_TYPE defined in environment
    		handler, err := c.router.Route(c.ConstantMessageType)
    		if err != nil {
    			return errors.Wrapf(err, "messageType=%s not handled", c.ConstantMessageType) //checked on startup - should never get here!!!
    		}
    
    		msgHandler, ok := handler.(func(events.SQSEvent) error)
    		if !ok {
    			// There is no underlying error to wrap here (err is nil), so
    			// create a new error instead of wrapping nil.
    			return errors.Errorf("SQS_MESSAGE_TYPE=%s: handler signature %T not supported", c.ConstantMessageType, handler)
    		}
    		return msgHandler(lambdaEvent)
    	}
    
    	//support different message types - obtained from the individual event records
    	//process all message records in this event:
    	for messageIndex, message := range lambdaEvent.Records {
    		//get request-id for this message record
    		//(left empty when the attribute is absent or carries no string value)
    		requestID := ""
    		if requestIDAttr, ok := message.MessageAttributes[c.requestIDHeaderKey]; ok && requestIDAttr.StringValue != nil {
    			requestID = *requestIDAttr.StringValue
    		}
    
    		messageTypeAttr, ok := message.MessageAttributes["type"]
    		if !ok || messageTypeAttr.StringValue == nil {
    			c.Errorf("ignoring message without messageType") //todo: could support generic handler for these... not yet required
    			continue
    		}
    		messageType := *messageTypeAttr.StringValue
    
    		event := service.Event{
    			//producer:  nil,
    			MessageID:      message.MessageId,
    			QueueName:      "N/A", //not sure how to get queue name from lambda Event... would be good to log it, may be in os.Getenv(???)?
    			TypeName:       messageType,
    			DueTime:        time.Now(),
    			RequestIDValue: requestID,
    			BodyJSON:       message.Body,
    		}
    
    		ctx, err := queues.NewContext(c.Service, event)
    		if err != nil {
    			return err
    		}
    
    		ctx.WithFields(map[string]interface{}{
    			"message_index": messageIndex,
    			"message":       message,
    		}).Infof("Queue(%s) Start SQS Handler Event: %v", ctx.Event().QueueName, ctx.Event())
    
    		//routing on messageType
    		sqsHandler, err := c.router.Route(messageType)
    		if err != nil {
    			ctx.Errorf("Unhandled sqs messageType(%v): %v", messageType, err)
    			continue
    		}
    		handler, ok := sqsHandler.(queues.Handler)
    		if !ok {
    			ctx.Errorf("messageType(%v) unsupported signature: %T", messageType, sqsHandler)
    			continue
    		}
    
    		args := []reflect.Value{
    			reflect.ValueOf(ctx),
    		}
    
    		//allocate, populate and validate request struct
    		var recordStruct interface{}
    		recordStruct, err = ctx.GetRecord(handler.RecordType)
    		if err != nil {
    			ctx.Errorf("invalid message: %+v", err)
    			continue
    		}
    
    		ctx.Debugf("message (%T) %+v", recordStruct, recordStruct)
    		args = append(args, reflect.ValueOf(recordStruct))
    
    		results := handler.FuncValue.Call(args)
    		if len(results) > 0 && !results[0].IsNil() {
    			// Report the error the handler returned; the local err is nil at
    			// this point because GetRecord succeeded.
    			handlerErr := results[0].Interface().(error)
    			ctx.Errorf("handler failed: %+v", handlerErr)
    			c.Service.ReportError(ctx.Data(), handlerErr)
    		}
    	}
    	return nil
    }