diff --git a/queues/mem/consumer.go b/queues/mem/consumer.go index edc0dc8582846763f2ed2cb930960d4face60468..e4b1756e537854e476441d5c408a39f8006aec86 100644 --- a/queues/mem/consumer.go +++ b/queues/mem/consumer.go @@ -119,12 +119,20 @@ type queue struct { } func (q *queue) run() { + // logger.Debugf("Q(%s) Start", q.name) for event := range q.ch { - err := q.process(event) - if err != nil { - q.consumer.Errorf("processing failed: %+v", err) - } - } + //process in background because some event processing sends to itself then wait for some responses on new events on the same queue!!! + go func() { + // logger.Debugf("Q(%s) process start: %+v", q.name, event) + err := q.process(event) + if err != nil { + q.consumer.Errorf("Q(%s) process failed: %+v", q.name, err) + // } else { + // q.consumer.Debugf("Q(%s) process success: %+v", q.name, err) + } + }() + } + // logger.Debugf("Q(%s) STOPPED", q.name) } func (q *queue) process(event service.Event) error { diff --git a/queues/mem/producer.go b/queues/mem/producer.go index 5b289fbb0b9c30a29d2d98ac2e246ce763906fd5..b32c4c44057d4e135c1e1c741ac94d8da2266237 100644 --- a/queues/mem/producer.go +++ b/queues/mem/producer.go @@ -35,5 +35,6 @@ func (producer *producer) Send(event service.Event) (string, error) { return "", errors.Wrapf(err, "failed to send to queue(%s)", event.QueueName) } + logger.Debugf("MEM producer.queue(%s) SENT event %+v", event.QueueName, event) return msgID, nil }