Select Git revision
consumer.go
consumer.go 4.74 KiB
package mem_consumer
import (
"encoding/json"
"fmt"
"math/rand"
"os"
"reflect"
"sync"
"time"
"github.com/google/uuid"
"gitlab.com/uafrica/go-utils/consumer"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/queues"
"gitlab.com/uafrica/go-utils/service"
)
// New builds an in-memory consumer around the service s, routing events
// according to routes.
//
// The returned consumer owns a memProducer that points back at it, and the
// wrapped service is replaced by s.WithProducer(...) so that the service
// produces into this consumer.
//
// New panics when s is nil or when consumer.NewRouter rejects routes.
func New(s service.Service, routes map[string]interface{}) consumer.Consumer {
	if s == nil {
		panic("NewConsumer(service==nil)")
	}

	router, routerErr := consumer.NewRouter(routes)
	if routerErr != nil {
		panic(fmt.Sprintf("cannot create router: %+v", routerErr))
	}

	mc := &memConsumer{
		Service: s,
		router:  router,
		queues:  make(map[string]*queue),
	}

	// The producer needs the consumer and the service needs the producer, so
	// the consumer is created first and the two are wired together afterwards.
	mc.producer = &memProducer{consumer: mc}
	mc.Service = s.WithProducer(mc.producer)
	return mc
}
// memConsumer is the consumer.Consumer implementation returned by New.
// It embeds the service it was created with and keeps its queues in a
// name-indexed map.
type memConsumer struct {
// Mutex is held by Queue while it looks up or creates an entry in queues.
sync.Mutex
// Service is the wrapped service; New and WithStarter replace it with the
// value returned by WithProducer / WithStarter.
service.Service
// router is built by New from the routes map via consumer.NewRouter.
router consumer.Router
// producer is the memProducer created in New that refers back to this consumer.
producer queues.Producer
// queues holds the queues created so far, keyed by queue name.
queues map[string]*queue
}
// WithStarter wraps Service.WithStarter so that the call yields the consumer
// itself instead of the embedded service; without this wrapper the call
// could not be chained on the consumer.
//
// The embedded Service is replaced by the value Service.WithStarter returns.
func (c *memConsumer) WithStarter(name string, starter service.Starter) consumer.Consumer {
	updated := c.Service.WithStarter(name, starter)
	c.Service = updated
	return c
}
// Queue returns the queue registered under name, creating it on first use.
//
// A newly created queue gets an unbuffered event channel, has its run method
// started on a new goroutine, and is then stored in the consumer's queue map.
// The whole lookup-or-create step happens with the consumer's mutex held.
// The returned error is always nil.
func (c *memConsumer) Queue(name string) (*queue, error) {
	c.Lock()
	defer c.Unlock()

	if existing, found := c.queues[name]; found {
		return existing, nil
	}

	created := &queue{
		consumer: c,
		name:     name,
		ch:       make(chan queues.Event),
	}
	go created.run()
	c.queues[name] = created
	return created, nil
}