Skip to content
Snippets Groups Projects
Commit 7bdfe7d3 authored by Jan Semmelink's avatar Jan Semmelink
Browse files

Update mem queues to work in local API

parent 78129222
No related branches found
No related tags found
No related merge requests found
......@@ -5,13 +5,10 @@ import (
"net/http"
"os"
"runtime/debug"
"sync"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"gitlab.com/uafrica/go-utils/audit"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logger"
queues_mem "gitlab.com/uafrica/go-utils/queues/mem"
queues_sqs "gitlab.com/uafrica/go-utils/queues/sqs"
"gitlab.com/uafrica/go-utils/service"
......@@ -151,24 +148,8 @@ func (api Api) Run() {
if (api.localPort > 0 || os.Getenv("LOG_LEVEL") == "debug") && api.localQueueEventHandlers != nil {
//use in-memory channels for async events
api.Debugf("Using in-memory channels for async events ...")
memConsumer := queues_mem.NewConsumer(api.localQueueEventHandlers)
memConsumer := queues_mem.NewConsumer(api.Service, api.localQueueEventHandlers)
api = api.WithProducer(queues_mem.NewProducer(memConsumer))
sqsEventChan := make(chan events.SQSEvent)
sqsWaitGroup := sync.WaitGroup{}
sqsWaitGroup.Add(1)
go func() {
for event := range sqsEventChan {
logger.Debugf("NOT YET PROCESSING SQS Event: %+v", event)
}
sqsWaitGroup.Done()
}()
//when we terminate, close the sqs chan and wait for it to complete processing
defer func() {
close(sqsEventChan)
sqsWaitGroup.Wait()
}()
} else {
//use SQS for async events
api.Debugf("Using SQS queue producer for async events ...")
......
......@@ -16,24 +16,33 @@ import (
"gitlab.com/uafrica/go-utils/service"
)
func NewConsumer(routes map[string]interface{}) *consumer {
func NewConsumer(s service.Service, routes map[string]interface{}) queues.Consumer {
if s == nil {
panic("NewConsumer(service==nil)")
}
router, err := queues.NewRouter(routes)
if err != nil {
panic(fmt.Sprintf("cannot create router: %+v", err))
}
//l := logger.New().WithFields(map[string]interface{}{"env": "dev"})
//l.IFormatter = l.IFormatter.NextColor()
return &consumer{
Service: service.New(),
c := &consumer{
Service: s,
router: router,
queues: map[string]*queue{},
}
//create a producer that will produce into this consumer
c.producer = &producer{
consumer: c,
}
c.Service = c.Service.WithProducer(c.producer)
return c
}
type consumer struct {
sync.Mutex
service.Service
router queues.Router
producer *producer
queues map[string]*queue
}
......
......@@ -2,17 +2,21 @@ package mem
import (
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logger"
"gitlab.com/uafrica/go-utils/queues"
"gitlab.com/uafrica/go-utils/service"
)
//can only produce locally if also consuming local
func NewProducer(consumer *consumer) service.Producer {
if consumer == nil {
func NewProducer(memConsumer queues.Consumer) service.Producer {
if memConsumer == nil {
panic(errors.Errorf("cannot product locally without consumer"))
}
return &producer{
consumer: consumer,
mc, ok := memConsumer.(*consumer)
if !ok {
panic(errors.Errorf("NewProducer(%T) is not a mem consumer", memConsumer))
}
return mc.producer
}
type producer struct {
......@@ -20,6 +24,7 @@ type producer struct {
}
func (producer *producer) Send(event service.Event) (string, error) {
logger.Debugf("MEM producer.queue(%s) Sending event %+v", event.QueueName, event)
q, err := producer.consumer.Queue(event.QueueName)
if err != nil {
return "", errors.Wrapf(err, "failed to get/create queue(%s)", event.QueueName)
......
......@@ -22,12 +22,13 @@ func NewProducer(requestIDHeaderKey string) service.Producer {
if requestIDHeaderKey == "" {
requestIDHeaderKey = "request-id"
}
return &producer{
p := &producer{
region: region,
requestIDHeaderKey: requestIDHeaderKey,
session: nil,
queues: map[string]*QueueProducer{},
}
return p
}
type producer struct {
......@@ -40,7 +41,7 @@ type producer struct {
// Note: Calling code needs SQS IAM permissions
func (producer *producer) Send(event service.Event) (string, error) {
logger.Debugf("producer=%T=%v", producer, producer)
logger.Debugf("SQS producer.Send(%+v)", event)
messenger, ok := producer.queues[event.QueueName]
if !ok {
producer.Lock()
......@@ -89,7 +90,7 @@ type QueueProducer struct {
}
func (m *QueueProducer) Send(event service.Event) (string, error) {
//logger.Debugf("Sending event %+v", event)
logger.Debugf("SQS producer.queue(%s) Sending event %+v", m.queueURL, event)
//add params as message attributes
msgAttrs := make(map[string]*sqs.MessageAttributeValue)
......
......@@ -13,6 +13,7 @@ import (
type Service interface {
logger.Logger
IErrorReporter
Producer
audit.Auditor
WithStarter(name string, starter IStarter) Service
WithProducer(producer Producer) Service
......@@ -79,7 +80,6 @@ func (s service) WithStarter(name string, starter IStarter) Service {
func (s service) WithProducer(producer Producer) Service {
if producer != nil {
s.Producer = producer
s.Infof("Producer = (%T)%v", producer, producer)
}
return s
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment