Skip to content
Snippets Groups Projects
Select Git revision
  • b40201d3d0e55e8244cc9a33480886b7e3f74220
  • main default protected
  • v1.298.0
  • v1.297.0
  • v1.296.0
  • v1.295.0
  • v1.294.0
  • v1.293.0
  • v1.292.0
  • v1.291.0
  • v1.290.0
  • v1.289.0
  • v1.288.0
  • v1.287.0
  • v1.286.0
  • v1.285.0
  • v1.284.0
  • v1.283.0
  • v1.282.0
  • v1.281.0
  • v1.280.0
  • v1.279.0
22 results

consumer.go

Blame
  • consumer.go 4.74 KiB
    package mem_consumer
    
    import (
    	"encoding/json"
    	"fmt"
    	"math/rand"
    	"os"
    	"reflect"
    	"sync"
    	"time"
    
    	"github.com/google/uuid"
    	"gitlab.com/uafrica/go-utils/consumer"
    	"gitlab.com/uafrica/go-utils/errors"
    	"gitlab.com/uafrica/go-utils/queues"
    	"gitlab.com/uafrica/go-utils/service"
    )
    
    func New(s service.Service, routes map[string]interface{}) consumer.Consumer {
    	if s == nil {
    		panic("NewConsumer(service==nil)")
    	}
    	router, err := consumer.NewRouter(routes)
    	if err != nil {
    		panic(fmt.Sprintf("cannot create router: %+v", err))
    	}
    	c := &memConsumer{
    		Service: s,
    		router:  router,
    		queues:  map[string]*queue{},
    	}
    
    	//create a producer that will produce into this consumer
    	c.producer = &memProducer{
    		consumer: c,
    	}
    	c.Service = c.Service.WithProducer(c.producer)
    	return c
    }
    
    type memConsumer struct {
    	sync.Mutex
    	service.Service
    	router   consumer.Router
    	producer queues.Producer
    	queues   map[string]*queue
    }
    
    //wrap Service.WithStarter to return cron, else cannot be chained
    func (consumer *memConsumer) WithStarter(name string, starter service.Starter) consumer.Consumer {
    	consumer.Service = consumer.Service.WithStarter(name, starter)
    	return consumer
    }
    
    func (consumer *memConsumer) Queue(name string) (*queue, error) {
    	consumer.Lock()
    	defer consumer.Unlock()
    	q, ok := consumer.queues[name]
    	if !ok {
    		q = &queue{
    			consumer: consumer,
    			name:     name,
    			ch:       make(chan queues.Event),
    		}
    		go q.run()
    		consumer.queues[name] = q
    	}
    	return q, nil
    }