Skip to content
Snippets Groups Projects
Select Git revision
  • ba65707801e45f7789bba212b18a9223847c6073
  • main default protected
  • v1.298.0
  • v1.297.0
  • v1.296.0
  • v1.295.0
  • v1.294.0
  • v1.293.0
  • v1.292.0
  • v1.291.0
  • v1.290.0
  • v1.289.0
  • v1.288.0
  • v1.287.0
  • v1.286.0
  • v1.285.0
  • v1.284.0
  • v1.283.0
  • v1.282.0
  • v1.281.0
  • v1.280.0
  • v1.279.0
22 results

consumer.go

Blame
  • consumer.go 4.51 KiB
    package mem
    
    import (
    	"encoding/json"
    	"fmt"
    	"math/rand"
    	"os"
    	"reflect"
    	"sync"
    	"time"
    
    	"github.com/google/uuid"
    	"gitlab.com/uafrica/go-utils/audit"
    	"gitlab.com/uafrica/go-utils/errors"
    	"gitlab.com/uafrica/go-utils/queues"
    	"gitlab.com/uafrica/go-utils/service"
    )
    
    // NewConsumer builds an in-memory consumer that dispatches events using the
    // given routes.
    //
    // It panics when queues.NewRouter rejects the routes. The returned consumer
    // starts with a fresh service.New() and an empty set of queues.
    func NewConsumer(routes map[string]interface{}) *consumer {
    	r, routerErr := queues.NewRouter(routes)
    	if routerErr != nil {
    		panic(fmt.Sprintf("cannot create router: %+v", routerErr))
    	}

    	// Disabled logger customisation, kept for reference:
    	//l := logger.New().WithFields(map[string]interface{}{"env": "dev"})
    	//l.IFormatter = l.IFormatter.NextColor()
    	c := &consumer{
    		queues:  map[string]*queue{},
    		router:  r,
    		Service: service.New(),
    	}
    	return c
    }
    
    // consumer is the in-memory queue consumer. It owns one queue per name and
    // routes every event sent to those queues to its registered handler.
    type consumer struct {
    	// Mutex is held by Queue while it looks up or adds entries in queues.
    	sync.Mutex
    	// Service is the embedded service; the With... wrappers replace it.
    	service.Service
    	// router resolves an event's TypeName to its handler (see queue.process).
    	router queues.Router
    	// queues holds the queues created so far, keyed by queue name.
    	queues map[string]*queue
    }
    
    // WithStarter wraps Service.WithStarter so that the call returns the
    // consumer (as queues.Consumer) and can therefore be chained.
    func (c *consumer) WithStarter(name string, starter service.IStarter) queues.Consumer {
    	updated := c.Service.WithStarter(name, starter)
    	c.Service = updated
    	return c
    }
    
    // WithErrorReporter wraps Service.WithErrorReporter so that the call returns
    // the consumer (as queues.Consumer) and can therefore be chained.
    func (c *consumer) WithErrorReporter(reporter service.IErrorReporter) queues.Consumer {
    	updated := c.Service.WithErrorReporter(reporter)
    	c.Service = updated
    	return c
    }
    
    // WithAuditor wraps Service.WithAuditor so that the call returns the
    // consumer (as queues.Consumer) and can therefore be chained.
    func (c *consumer) WithAuditor(auditor audit.Auditor) queues.Consumer {
    	updated := c.Service.WithAuditor(auditor)
    	c.Service = updated
    	return c
    }
    
    // Queue returns the queue registered under name, creating it on first use.
    //
    // A new queue gets an unbuffered event channel and its own goroutine running
    // queue.run. The lookup and creation happen under the consumer's mutex. The
    // returned error is always nil.
    func (c *consumer) Queue(name string) (*queue, error) {
    	c.Lock()
    	defer c.Unlock()

    	if existing, found := c.queues[name]; found {
    		return existing, nil
    	}

    	created := &queue{
    		consumer: c,
    		name:     name,
    		ch:       make(chan service.Event),
    	}
    	c.queues[name] = created
    	go created.run()
    	return created, nil
    }
    
    // Run must not be called: it always panics. With the local producer a
    // consumer goroutine (queue.run) is already started for every queue that is
    // sent to, so there is nothing for Run to do.
    func (c *consumer) Run() {
    	notSupported := errors.Errorf("DO NOT RUN LOCAL CONSUMER")
    	panic(notSupported)
    }
    
    // ProcessFile reads one JSON-encoded service.Event from filename and
    // processes it synchronously through the consumer's router.
    //
    // The event is handled on a throwaway queue named "NoName" with a nil
    // channel, so no goroutine is started and nothing is sent. A failure to open
    // the file, decode the event or process it is wrapped and returned.
    func (c *consumer) ProcessFile(filename string) error {
    	f, err := os.Open(filename)
    	if err != nil {
    		return errors.Wrapf(err, "failed to open queue event file %s", filename)
    	}
    	defer f.Close()

    	var event service.Event
    	if err := json.NewDecoder(f).Decode(&event); err != nil {
    		return errors.Wrapf(err, "failed to read service.Event from file %s", filename)
    	}

    	q := queue{
    		consumer: c,
    		name:     "NoName",
    		ch:       nil,
    	}

    	// Bind the result of process here: previously the call's error was
    	// discarded and the (always nil) error from os.Open was tested instead,
    	// so processing failures were silently reported as success.
    	if err := q.process(event); err != nil {
    		return errors.Wrapf(err, "failed to process event from file %s", filename)
    	}
    	return nil
    }
    
    // queue is a single named in-memory queue belonging to a consumer.
    type queue struct {
    	// consumer is the owner; process uses its Service and router, and run
    	// logs processing failures through it.
    	consumer *consumer
    	// name identifies the queue and appears in the RECV log line.
    	name     string
    	// ch carries events from Send to run. It is nil for the throwaway queue
    	// built by ProcessFile, which calls process directly.
    	ch       chan service.Event
    }
    
    // run is the queue's worker loop: it receives events from q.ch one at a
    // time and processes each in turn. A processing failure is logged through
    // the consumer and the loop carries on; the loop ends when q.ch is closed.
    func (q *queue) run() {
    	for next := range q.ch {
    		if failure := q.process(next); failure != nil {
    			q.consumer.Errorf("processing failed: %+v", failure)
    		}
    	}
    }
    
    // process handles a single event: it builds a queue context, routes on the
    // event's TypeName, decodes the record the handler expects, logs receipt and
    // then invokes the handler via reflection with (ctx, record).
    //
    // It returns an error when the context cannot be created, the type has no
    // route, the routed value is not a queues.Handler, the record cannot be
    // obtained, or the handler's first result is non-nil.
    func (q *queue) process(event service.Event) error {
    	//todo: create context with logger
    	// NOTE(review): this reseeds the global math/rand source for every event;
    	// confirm whether that is still wanted.
    	rand.Seed(time.Now().Unix())

    	// Crash reporting is currently disabled:
    	// if q.crashReporter != nil {
    	// 	defer q.crashReporter.Catch(ctx)
    	// }
    	ctx, err := queues.NewContext(q.consumer.Service, event)
    	if err != nil {
    		return err
    	}

    	// Route on the message type and make sure we got a usable handler.
    	routed, err := q.consumer.router.Route(event.TypeName)
    	if err != nil {
    		return errors.Wrapf(err, "unhandled event type(%v)", event.TypeName)
    	}
    	handler, isHandler := routed.(queues.Handler)
    	if !isHandler {
    		return errors.Errorf("messageType(%v) unsupported signature: %T", event.TypeName, routed)
    	}

    	// The handler is called with the context first, then the record.
    	callArgs := make([]reflect.Value, 0, 2)
    	callArgs = append(callArgs, reflect.ValueOf(ctx))

    	// Allocate, populate and validate the request struct.
    	var record interface{}
    	record, err = ctx.GetRecord(handler.RecordType)
    	if err != nil {
    		return errors.Wrapf(err, "invalid message body")
    	}

    	logFields := map[string]interface{}{
    		"params": event.ParamValues,
    		"body":   event.BodyJSON,
    	}
    	// The first value is a placeholder: it is not part of the event, so the
    	// local channel would have to be extended to carry it alongside the event.
    	ctx.WithFields(logFields).Infof("RECV(%s) Queue(%s).Type(%s).Due(%s): (%T)%v",
    		"---",
    		q.name,
    		event.TypeName,
    		event.DueTime,
    		record,
    		record)

    	ctx.Debugf("message (%T) %+v", record, record)
    	callArgs = append(callArgs, reflect.ValueOf(record))

    	// A non-nil first result is treated as the handler's error.
    	if out := handler.FuncValue.Call(callArgs); len(out) > 0 && !out[0].IsNil() {
    		return errors.Wrapf(out[0].Interface().(error), "handler failed")
    	}
    	ctx.Debugf("Handler done")
    	return nil
    } //queue.process()
    
    // Send stamps the event with a freshly generated UUID as its MessageID and
    // writes it to the queue's channel, blocking until the event is received.
    // It returns the generated message ID; the error is always nil.
    func (q *queue) Send(event service.Event) (msgID string, err error) {
    	msgID = uuid.New().String()
    	event.MessageID = msgID
    	q.ch <- event
    	return msgID, nil
    }