Select Git revision
consumer.go
consumer.go 6.23 KiB
package sqs_consumer
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"path"
"reflect"
"strings"
"time"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-lambda-go/lambdacontext"
"github.com/google/uuid"
"gitlab.com/uafrica/go-utils/audit"
"gitlab.com/uafrica/go-utils/consumer"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logs"
"gitlab.com/uafrica/go-utils/queues"
"gitlab.com/uafrica/go-utils/queues/sqs_producer"
"gitlab.com/uafrica/go-utils/service"
)
// New builds the SQS consumer for the given routes.
//
// requestIDHeaderKey is stored on the returned consumer and is also handed to
// the SQS producer that New creates. routes is passed to consumer.NewRouter
// unchanged.
//
// Environment variables read:
//   - ENVIRONMENT: stored as the consumer's environment name; "dev" is used
//     when it is empty or unset.
//   - SQS_MESSAGE_TYPE: optional legacy setting for an SQS instance that
//     handles a single message type only. When set, the router must be able
//     to resolve a route for it.
//
// New panics if the router cannot be created, or if SQS_MESSAGE_TYPE is set
// and router.Route returns an error for it. As a side effect it calls
// audit.Init and logs.Init with the newly created producer.
func New(requestIDHeaderKey string, routes map[string]interface{}) consumer.Consumer {
	//todo: support config loading for local dev and env for lambda in prod
	environment := "dev"
	if fromEnv := os.Getenv("ENVIRONMENT"); fromEnv != "" {
		environment = fromEnv
	}

	messageRouter, err := consumer.NewRouter(routes)
	if err != nil {
		panic(fmt.Sprintf("cannot create router: %+v", err))
	}

	// Legacy single-type mode: fail fast at start-up if the configured
	// message type has no handler.
	fixedMessageType := os.Getenv("SQS_MESSAGE_TYPE")
	if fixedMessageType != "" {
		_, routeErr := messageRouter.Route(fixedMessageType)
		if routeErr != nil {
			panic(errors.Errorf("No route defined for SQS_MESSAGE_TYPE=\"%s\"", fixedMessageType))
		}
	}

	// The same producer backs the service and both audit and log output.
	sqsProducer := sqs_producer.New(requestIDHeaderKey)
	svc := service.New().WithProducer(sqsProducer)
	audit.Init(sqsProducer)
	logs.Init(sqsProducer)

	built := sqsConsumer{
		Service:             svc,
		env:                 environment,
		router:              messageRouter,
		requestIDHeaderKey:  requestIDHeaderKey,
		ConstantMessageType: fixedMessageType,
		checks:              make(map[string]consumer.Checker),
	}
	return built
}
// sqsConsumer is the consumer value that New constructs and returns as a
// consumer.Consumer. It embeds service.Service, so the service's methods are
// promoted onto the consumer.
type sqsConsumer struct {
	service.Service

	// env is the environment name; New fills it from ENVIRONMENT and falls
	// back to "dev" when that variable is empty.
	env string

	// router is built by New from the routes map it is given.
	router consumer.Router

	// requestIDHeaderKey is the key passed to New (also given to the SQS
	// producer there).
	requestIDHeaderKey string

	// ConstantMessageType holds os.Getenv("SQS_MESSAGE_TYPE"); it is empty
	// when that variable is not set.
	ConstantMessageType string

	// checks maps a string key to a consumer.Checker. New initialises it as
	// an empty map; how entries are added is not visible in this part of the
	// file.
	checks map[string]consumer.Checker
}