From 745750a44bac2d280523cd068ae6f9ea83b8a598 Mon Sep 17 00:00:00 2001 From: Jan Semmelink <jan@uafrica.com> Date: Fri, 15 Oct 2021 09:07:52 +0200 Subject: [PATCH] Process local queue events concurrently on the same queue because some of them queue more events on the same queue and then seem to wait for them --- queues/mem/consumer.go | 18 +++++++++++++----- queues/mem/producer.go | 1 + 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/queues/mem/consumer.go b/queues/mem/consumer.go index edc0dc8..e4b1756 100644 --- a/queues/mem/consumer.go +++ b/queues/mem/consumer.go @@ -119,12 +119,20 @@ type queue struct { } func (q *queue) run() { + // logger.Debugf("Q(%s) Start", q.name) for event := range q.ch { - err := q.process(event) - if err != nil { - q.consumer.Errorf("processing failed: %+v", err) - } - } + //process in background because some event processing sends to itself then wait for some responses on new events on the same queue!!! + go func() { + // logger.Debugf("Q(%s) process start: %+v", q.name, event) + err := q.process(event) + if err != nil { + q.consumer.Errorf("Q(%s) process failed: %+v", q.name, err) + // } else { + // q.consumer.Debugf("Q(%s) process success: %+v", q.name, err) + } + }() + } + // logger.Debugf("Q(%s) STOPPED", q.name) } func (q *queue) process(event service.Event) error { diff --git a/queues/mem/producer.go b/queues/mem/producer.go index 5b289fb..b32c4c4 100644 --- a/queues/mem/producer.go +++ b/queues/mem/producer.go @@ -35,5 +35,6 @@ func (producer *producer) Send(event service.Event) (string, error) { return "", errors.Wrapf(err, "failed to send to queue(%s)", event.QueueName) } + logger.Debugf("MEM producer.queue(%s) SENT event %+v", event.QueueName, event) return msgID, nil } -- GitLab