Select Git revision
consumer.go
consumer.go 6.52 KiB
package sqs
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"path"
"reflect"
"strings"
"time"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-lambda-go/lambdacontext"
"github.com/google/uuid"
"gitlab.com/uafrica/go-utils/audit"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/queues"
"gitlab.com/uafrica/go-utils/service"
)
func NewConsumer(requestIDHeaderKey string, routes map[string]interface{}) queues.Consumer {
env := os.Getenv("ENVIRONMENT") //todo: support config loading for local dev and env for lambda in prod
if env == "" {
env = "dev"
}
router, err := queues.NewRouter(routes)
if err != nil {
panic(fmt.Sprintf("cannot create router: %+v", err))
}
//legacy message type - when running SQS instance for one type of messages only
//when defined, make sure handler exists for this type
sqsMessageType := os.Getenv("SQS_MESSAGE_TYPE")
if sqsMessageType != "" {
if _, err := router.Route(sqsMessageType); err != nil {
panic(errors.Errorf("No route defined for SQS_MESSAGE_TYPE=\"%s\"", sqsMessageType))
}
}
return consumer{
Service: service.New(),
//Logger: logger.New().WithFields(map[string]interface{}{"env": env}),
env: env,
router: router,
requestIDHeaderKey: requestIDHeaderKey,
ConstantMessageType: sqsMessageType,
producer: NewProducer(requestIDHeaderKey),
checks: map[string]queues.ICheck{},
}
}
type consumer struct {
service.Service
env string
router queues.Router
requestIDHeaderKey string
ConstantMessageType string //from os.Getenv("SQS_MESSAGE_TYPE")
producer service.Producer
checks map[string]queues.ICheck
}
//wrap Service.WithStarter to return cron, else cannot be chained
func (consumer consumer) WithStarter(name string, starter service.IStarter) queues.Consumer {
consumer.Service = consumer.Service.WithStarter(name, starter)
return consumer
}