package mem import ( "context" "encoding/json" "fmt" "math/rand" "os" "reflect" "sync" "time" "github.com/google/uuid" "github.com/uptrace/bun" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/service" ) func NewConsumer(routes map[string]interface{}) *Consumer { router, err := queues.NewRouter(routes) if err != nil { panic(fmt.Sprintf("cannot create router: %+v", err)) } return &Consumer{ ILogger: logger.New().WithFields(map[string]interface{}{"env": "dev"}).NextColor(), router: router, queues: map[string]*queue{}, } } type Consumer struct { sync.Mutex logger.ILogger router queues.Router dbConn service.IDatabaseConnector queues map[string]*queue } func (consumer *Consumer) WithDb(dbConn service.IDatabaseConnector) queues.IConsumer { consumer.dbConn = dbConn return consumer } func (consumer *Consumer) Queue(name string) (*queue, error) { consumer.Lock() defer consumer.Unlock() q, ok := consumer.queues[name] if !ok { q = &queue{ consumer: consumer, name: name, ch: make(chan queues.Event), } go q.run() consumer.queues[name] = q } return q, nil } //do not call this - when using local producer, the consumer is automatically running //for each queue you send to, and processing from q.run() func (consumer *Consumer) Run() { panic(errors.Errorf("DO NOT RUN LOCAL CONSUMER")) } func (consumer *Consumer) ProcessFile(filename string) error { f, err := os.Open(filename) if err != nil { return errors.Wrapf(err, "failed to open queue event file %s", filename) } defer f.Close() var event queues.Event if err := json.NewDecoder(f).Decode(&event); err != nil { return errors.Wrapf(err, "failed to read queues.Event from file %s", filename) } q := queue{ consumer: consumer, name: "NoName", ch: nil, } if q.process( event, ); err != nil { return errors.Wrapf(err, "failed to process event from file %s", filename) } return nil } type queue struct { consumer *Consumer name string ch chan queues.Event } func (q *queue) run() { for event := range q.ch { err := q.process(event) if err != nil { q.consumer.Errorf("processing failed: %+v", err) } } } func (q *queue) process(event queues.Event) error { //todo: create context with logger rand.Seed(time.Now().Unix()) //report handler crashes //defer sqs.crashReporter.Catch(ctx) var db *bun.DB if q.consumer.dbConn != nil { var err error db, err = q.consumer.dbConn.Connect() if err != nil { return errors.Wrapf(err, "failed to connect to db") } } baseCtx := context.Background() ctx := queues.Context{ Context: service.NewContext(baseCtx, map[string]interface{}{ "env": "dev", "request_id": event.RequestID, "message_type": event.TypeName, }), IProducer: q, //todo: q can only send back into this queue... may need to send to other queues! Event: event, RequestID: event.RequestIDValue, DB: db, } ctx.WithFields(map[string]interface{}{ "params": event.ParamValues, "body": event.BodyJSON, }).Infof("Queue(%s) Recv SQS Event: %v", q.name, event) //routing on messageType sqsHandler, err := q.consumer.router.Route(event.TypeName) if err != nil { return errors.Wrapf(err, "unhandled event type(%v)", event.TypeName) } handler, ok := sqsHandler.(queues.Handler) if !ok { return errors.Errorf("messageType(%v) unsupported signature: %T", event.TypeName, sqsHandler) } args := []reflect.Value{ reflect.ValueOf(ctx), } //allocate, populate and validate request struct var recordStruct interface{} recordStruct, err = ctx.GetRecord(handler.RecordType) if err != nil { return errors.Wrapf(err, "invalid message body") } ctx.Debugf("message (%T) %+v", recordStruct, recordStruct) args = append(args, reflect.ValueOf(recordStruct)) results := handler.FuncValue.Call(args) if len(results) > 0 && !results[0].IsNil() { return errors.Wrapf(results[0].Interface().(error), "handler failed") } ctx.Debugf("handler success") return nil } //queue.process() func (q *queue) Send(event queues.Event) (msgID string, err error) { event.MessageID = uuid.New().String() q.ch <- event return event.MessageID, nil }