Select Git revision
consumer.go
consumer.go 4.54 KiB
package mem
import (
"encoding/json"
"fmt"
"math/rand"
"os"
"reflect"
"sync"
"time"
"github.com/google/uuid"
"gitlab.com/uafrica/go-utils/audit"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/queues"
"gitlab.com/uafrica/go-utils/service"
)
// NewConsumer builds a consumer for the given routes.
//
// The routes map is handed to queues.NewRouter; if the router cannot be
// created the function panics with the router error. The returned consumer
// starts with a fresh service.New() service and no queues.
func NewConsumer(routes map[string]interface{}) *consumer {
	r, routerErr := queues.NewRouter(routes)
	if routerErr != nil {
		panic(fmt.Sprintf("cannot create router: %+v", routerErr))
	}

	// Custom logger setup is currently disabled:
	//l := logger.New().WithFields(map[string]interface{}{"env": "dev"})
	//l.IFormatter = l.IFormatter.NextColor()
	c := &consumer{
		Service: service.New(),
		//Logger: l,
		router: r,
		queues: make(map[string]*queue),
	}
	return c
}
// consumer is the queue consumer of this package, created by NewConsumer.
// Its With... wrapper methods return it as a queues.Consumer.
type consumer struct {
// Mutex is held by Queue while it looks up a queue by name.
sync.Mutex
// Service is the embedded service; the With... wrappers replace it with the
// value returned by the corresponding Service.With... call.
service.Service
//logger.Logger
// router is the queues.Router built from the routes given to NewConsumer.
router queues.Router
// queues holds the queues known to this consumer, keyed by queue name.
queues map[string]*queue
}
// WithStarter wraps Service.WithStarter so that the call returns the consumer
// (as a queues.Consumer) rather than the embedded service; without this
// wrapper the option calls could not be chained on the consumer.
func (consumer *consumer) WithStarter(name string, starter service.IStarter) queues.Consumer {
	updated := consumer.Service.WithStarter(name, starter)
	consumer.Service = updated
	return consumer
}
// WithErrorReporter wraps Service.WithErrorReporter so that the call returns
// the consumer (as a queues.Consumer) rather than the embedded service;
// without this wrapper the option calls could not be chained on the consumer.
func (consumer *consumer) WithErrorReporter(reporter service.IErrorReporter) queues.Consumer {
	updated := consumer.Service.WithErrorReporter(reporter)
	consumer.Service = updated
	return consumer
}
// WithAuditor wraps Service.WithAuditor so that the call returns the consumer
// (as a queues.Consumer) rather than the embedded service; without this
// wrapper the option calls could not be chained on the consumer.
func (consumer *consumer) WithAuditor(auditor audit.Auditor) queues.Consumer {
	updated := consumer.Service.WithAuditor(auditor)
	consumer.Service = updated
	return consumer
}
func (consumer *consumer) Queue(name string) (*queue, error) {
consumer.Lock()
defer consumer.Unlock()
q, ok := consumer.queues[name]
if !ok {
q = &queue{
consumer: consumer,
name: name,
ch: make(chan service.Event),
}
go q.run()