Select Git revision
consumer.go
consumer.go 4.51 KiB
package mem
import (
"encoding/json"
"fmt"
"math/rand"
"os"
"reflect"
"sync"
"time"
"github.com/google/uuid"
"gitlab.com/uafrica/go-utils/audit"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/queues"
"gitlab.com/uafrica/go-utils/service"
)
// NewConsumer builds an in-memory consumer that dispatches events using a
// router created from routes. It panics when the router cannot be created.
// The returned consumer starts with a fresh service and no queues.
func NewConsumer(routes map[string]interface{}) *consumer {
	r, routerErr := queues.NewRouter(routes)
	if routerErr != nil {
		panic(fmt.Sprintf("cannot create router: %+v", routerErr))
	}

	//l := logger.New().WithFields(map[string]interface{}{"env": "dev"})
	//l.IFormatter = l.IFormatter.NextColor()
	c := &consumer{
		Service: service.New(),
		router:  r,
		queues:  map[string]*queue{},
	}
	return c
}
// consumer is the in-memory queue consumer: it owns a router used to look up
// event handlers and a set of named queues created on demand by Queue().
type consumer struct {
// Mutex is locked by Queue() while it reads and updates the queues map.
sync.Mutex
// Service is embedded and replaced by the With...() wrappers.
service.Service
// router resolves an event type name to its handler (see queue.process).
router queues.Router
// queues holds the queues created so far, keyed by queue name.
queues map[string]*queue
}
// WithStarter wraps Service.WithStarter and returns the consumer itself, so
// that the call can be chained with the other With...() methods.
func (c *consumer) WithStarter(name string, starter service.IStarter) queues.Consumer {
	updated := c.Service.WithStarter(name, starter)
	c.Service = updated
	return c
}
// WithErrorReporter wraps Service.WithErrorReporter and returns the consumer
// itself, so that the call can be chained with the other With...() methods.
func (c *consumer) WithErrorReporter(reporter service.IErrorReporter) queues.Consumer {
	updated := c.Service.WithErrorReporter(reporter)
	c.Service = updated
	return c
}
// WithAuditor wraps Service.WithAuditor and returns the consumer itself, so
// that the call can be chained with the other With...() methods.
func (c *consumer) WithAuditor(auditor audit.Auditor) queues.Consumer {
	updated := c.Service.WithAuditor(auditor)
	c.Service = updated
	return c
}
// Queue returns the queue registered under name. On first use of a name the
// queue is created with an unbuffered event channel, its run() loop is started
// in a new goroutine, and it is stored for later lookups. The consumer's mutex
// is held for the whole call. The returned error is always nil.
func (c *consumer) Queue(name string) (*queue, error) {
	c.Lock()
	defer c.Unlock()

	if existing, found := c.queues[name]; found {
		return existing, nil
	}

	created := &queue{
		consumer: c,
		name:     name,
		ch:       make(chan service.Event),
	}
	go created.run()
	c.queues[name] = created
	return created, nil
}
// Run must not be called on the local consumer: when the local producer is
// used, a consumer loop is already running for every queue that is sent to
// (see queue.run). Calling Run always panics.
func (c *consumer) Run() {
	misuse := errors.Errorf("DO NOT RUN LOCAL CONSUMER")
	panic(misuse)
}
// ProcessFile reads a single JSON-encoded service.Event from filename and
// processes it synchronously through a temporary queue named "NoName" that
// has no channel.
//
// It returns a wrapped error when the file cannot be opened, when the event
// cannot be decoded, or when processing the event fails.
func (c *consumer) ProcessFile(filename string) error {
	f, err := os.Open(filename)
	if err != nil {
		return errors.Wrapf(err, "failed to open queue event file %s", filename)
	}
	defer f.Close()

	var event service.Event
	if err := json.NewDecoder(f).Decode(&event); err != nil {
		return errors.Wrapf(err, "failed to read service.Event from file %s", filename)
	}

	// The queue is only used to reach process(); nothing is sent on ch.
	q := queue{
		consumer: c,
		name:     "NoName",
		ch:       nil,
	}

	// Bind the result of process() here: testing the outer err (from os.Open,
	// always nil at this point) would silently drop handler failures.
	if err := q.process(event); err != nil {
		return errors.Wrapf(err, "failed to process event from file %s", filename)
	}
	return nil
}
// queue is a single named in-memory queue belonging to a consumer.
type queue struct {
// consumer is the owner; process() uses its Service and router.
consumer *consumer
// name identifies the queue and is included in the receive log line.
name string
// ch carries events from Send() to the run() loop.
ch chan service.Event
}
// run receives events from the queue's channel until the channel is closed,
// processing each one in turn. A processing failure is logged through the
// consumer and does not stop the loop.
func (q *queue) run() {
	for next := range q.ch {
		if procErr := q.process(next); procErr != nil {
			q.consumer.Errorf("processing failed: %+v", procErr)
		}
	}
}
// processSeedOnce ensures the global math/rand source is seeded only once by
// process(), instead of being reset on every event.
var processSeedOnce sync.Once

// process handles one event: it creates a queues context for the event,
// routes on the event type name, builds the record struct for the handler,
// logs the receipt and invokes the handler through reflection with
// (ctx, record) as arguments.
//
// It returns an error when the context cannot be created, when no handler is
// routed for the event type, when the routed value is not a queues.Handler,
// when the record cannot be obtained from the context, or when the handler's
// first result is non-nil (which is wrapped as "handler failed").
func (q *queue) process(event service.Event) error {
	//todo: create context with logger

	// Seed once: re-seeding on every event with second resolution made all
	// events processed within the same second replay the same random sequence.
	processSeedOnce.Do(func() {
		rand.Seed(time.Now().UnixNano())
	})

	//report handler crashes
	// if q.crashReporter != nil {
	// 	defer q.crashReporter.Catch(ctx)
	// }

	ctx, err := queues.NewContext(q.consumer.Service, event)
	if err != nil {
		return err
	}

	//routing on messageType
	sqsHandler, err := q.consumer.router.Route(event.TypeName)
	if err != nil {
		return errors.Wrapf(err, "unhandled event type(%v)", event.TypeName)
	}
	handler, ok := sqsHandler.(queues.Handler)
	if !ok {
		return errors.Errorf("messageType(%v) unsupported signature: %T", event.TypeName, sqsHandler)
	}

	// The handler is always called with the context as its first argument.
	args := []reflect.Value{
		reflect.ValueOf(ctx),
	}

	//allocate, populate and validate request struct
	var recordStruct interface{}
	recordStruct, err = ctx.GetRecord(handler.RecordType)
	if err != nil {
		return errors.Wrapf(err, "invalid message body")
	}

	ctx.WithFields(map[string]interface{}{
		"params": event.ParamValues,
		"body":   event.BodyJSON,
	}).Infof("RECV(%s) Queue(%s).Type(%s).Due(%s): (%T)%v",
		"---", //not yet available here - not part of event, and in SQS I think it is passed in SQS layer, so need to extend local channel to include this along with event
		q.name,
		event.TypeName,
		event.DueTime,
		recordStruct,
		recordStruct)
	ctx.Debugf("message (%T) %+v", recordStruct, recordStruct)

	// The record is the second and last handler argument.
	args = append(args, reflect.ValueOf(recordStruct))

	// A non-nil first result is treated as the handler's error.
	results := handler.FuncValue.Call(args)
	if len(results) > 0 && !results[0].IsNil() {
		return errors.Wrapf(results[0].Interface().(error), "handler failed")
	}
	ctx.Debugf("Handler done")
	return nil
} //queue.process()
// Send assigns a newly generated UUID string as the event's MessageID and
// writes the event to the queue's channel. The channel send blocks until the
// event is received. It returns the generated message ID; the error is
// always nil.
func (q *queue) Send(event service.Event) (msgID string, err error) {
	msgID = uuid.New().String()
	event.MessageID = msgID
	q.ch <- event
	return msgID, nil
}