Skip to content

Commit 89c6aee

Browse files
boekkooi-freshbilaljaved
authored andcommitted
Implemented AMPQ listener
In order to use AMQP for event store notification we need to be able to listen to them. This implementation is similar to the pq once meaning it will keep retrying to consume messages until the context is cancelled.
1 parent b1bfa9d commit 89c6aee

File tree

6 files changed

+364
-12
lines changed

6 files changed

+364
-12
lines changed

extension/amqp/amqp.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,20 @@ package amqp
33
import (
44
"io"
55

6+
"github.com/hellofresh/goengine"
67
"github.com/streadway/amqp"
78
)
89

10+
// NotificationChannel represents a channel for notifications
11+
type NotificationChannel interface {
12+
Publish(exchange, queue string, mandatory, immediate bool, msg amqp.Publishing) error
13+
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
14+
Qos(prefetchCount, prefetchSize int, global bool) error
15+
}
16+
917
// setup returns a connection and channel to be used for the Queue setup
1018
func setup(url, queue string) (io.Closer, NotificationChannel, error) {
19+
1120
conn, err := amqp.Dial(url)
1221
if err != nil {
1322
return nil, nil, err
@@ -25,7 +34,29 @@ func setup(url, queue string) (io.Closer, NotificationChannel, error) {
2534
return conn, ch, nil
2635
}
2736

28-
// NotificationChannel represents a channel for notifications
29-
type NotificationChannel interface {
30-
Publish(exchange, queue string, mandatory, immediate bool, msg amqp.Publishing) error
37+
// DirectQueueConsume returns a Consume func that will connect to the provided AMQP server and create a queue for direct message delivery
38+
func DirectQueueConsume(amqpDSN, queue string) (Consume, error) {
39+
switch {
40+
case len(amqpDSN) == 0:
41+
return nil, goengine.InvalidArgumentError("amqpDSN")
42+
case len(queue) == 0:
43+
return nil, goengine.InvalidArgumentError("queue")
44+
}
45+
46+
return func() (io.Closer, <-chan amqp.Delivery, error) {
47+
conn, ch, err := setup(amqpDSN, queue)
48+
if err != nil {
49+
return nil, nil, err
50+
}
51+
52+
// Indicate we only want 1 message to be acknowledge at a time.
53+
if err := ch.Qos(1, 0, false); err != nil {
54+
return nil, nil, err
55+
}
56+
57+
// Exclusive consumer
58+
deliveries, err := ch.Consume(queue, "", true, false, false, false, nil)
59+
60+
return conn, deliveries, err
61+
}, nil
3162
}

extension/amqp/amqp_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,18 @@ func (ch mockChannel) Publish(
2121
) error {
2222
return nil
2323
}
24+
25+
func (ch mockChannel) Consume(
26+
queue,
27+
consumer string,
28+
autoAck,
29+
exclusive,
30+
noLocal,
31+
noWait bool,
32+
args amqp.Table,
33+
) (<-chan amqp.Delivery, error) {
34+
return make(chan amqp.Delivery), nil
35+
}
36+
func (ch mockChannel) Qos(prefetchCount, prefetchSize int, global bool) error {
37+
return nil
38+
}

extension/amqp/listener.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package amqp
2+
3+
import (
4+
"context"
5+
"io"
6+
"time"
7+
8+
"github.com/hellofresh/goengine"
9+
"github.com/hellofresh/goengine/driver/sql"
10+
"github.com/mailru/easyjson"
11+
"github.com/streadway/amqp"
12+
)
13+
14+
// Ensure Listener implements sql.Listener
15+
var _ sql.Listener = &Listener{}
16+
17+
type (
18+
// Consume returns a channel of amqp.Delivery's and a related closer or an error
19+
Consume func() (io.Closer, <-chan amqp.Delivery, error)
20+
21+
// Listener consumes messages from an queue
22+
Listener struct {
23+
consume Consume
24+
minReconnectInterval time.Duration
25+
maxReconnectInterval time.Duration
26+
logger goengine.Logger
27+
}
28+
)
29+
30+
// NewListener returns a new Listener
31+
func NewListener(
32+
consume Consume,
33+
minReconnectInterval time.Duration,
34+
maxReconnectInterval time.Duration,
35+
logger goengine.Logger,
36+
) (*Listener, error) {
37+
switch {
38+
case consume == nil:
39+
return nil, goengine.InvalidArgumentError("consume")
40+
}
41+
42+
if logger == nil {
43+
logger = goengine.NopLogger
44+
}
45+
46+
return &Listener{
47+
consume: consume,
48+
minReconnectInterval: minReconnectInterval,
49+
maxReconnectInterval: maxReconnectInterval,
50+
logger: logger,
51+
}, nil
52+
}
53+
54+
// Listen receives messages from a queue, transforms them into a sql.ProjectionNotification and calls the trigger
55+
func (l *Listener) Listen(ctx context.Context, trigger sql.ProjectionTrigger) error {
56+
var nextReconnect time.Time
57+
reconnectInterval := l.minReconnectInterval
58+
for {
59+
select {
60+
case <-ctx.Done():
61+
return context.Canceled
62+
default:
63+
}
64+
65+
conn, deliveries, err := l.consume()
66+
if err != nil {
67+
l.logger.Error("failed to start consuming amqp messages", func(entry goengine.LoggerEntry) {
68+
entry.Error(err)
69+
entry.String("reconnect_in", reconnectInterval.String())
70+
})
71+
72+
time.Sleep(reconnectInterval)
73+
reconnectInterval *= 2
74+
if reconnectInterval > l.maxReconnectInterval {
75+
reconnectInterval = l.maxReconnectInterval
76+
}
77+
continue
78+
}
79+
reconnectInterval = l.minReconnectInterval
80+
nextReconnect = time.Now().Add(reconnectInterval)
81+
82+
l.consumeMessages(ctx, conn, deliveries, trigger)
83+
84+
select {
85+
case <-ctx.Done():
86+
return context.Canceled
87+
default:
88+
time.Sleep(time.Until(nextReconnect))
89+
}
90+
}
91+
}
92+
93+
func (l *Listener) consumeMessages(ctx context.Context, conn io.Closer, deliveries <-chan amqp.Delivery, trigger sql.ProjectionTrigger) {
94+
defer func() {
95+
if conn == nil {
96+
return
97+
}
98+
99+
if err := conn.Close(); err != nil {
100+
l.logger.Error("failed to close amqp connection", func(entry goengine.LoggerEntry) {
101+
entry.Error(err)
102+
})
103+
}
104+
}()
105+
106+
for {
107+
select {
108+
case <-ctx.Done():
109+
return
110+
case msg, ok := <-deliveries:
111+
if !ok {
112+
return
113+
}
114+
115+
notification := &sql.ProjectionNotification{}
116+
if err := easyjson.Unmarshal(msg.Body, notification); err != nil {
117+
l.logger.Error("failed to unmarshal delivery, dropping message", func(entry goengine.LoggerEntry) {
118+
entry.Error(err)
119+
})
120+
continue
121+
}
122+
123+
if err := trigger(ctx, notification); err != nil {
124+
l.logger.Error("failed to project notification", func(entry goengine.LoggerEntry) {
125+
entry.Error(err)
126+
entry.Int64("notification.no", notification.No)
127+
entry.String("notification.aggregate_id", notification.AggregateID)
128+
})
129+
}
130+
}
131+
}
132+
}

extension/amqp/listener_test.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// +build unit
2+
3+
package amqp_test
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"io"
9+
"testing"
10+
"time"
11+
12+
"github.com/hellofresh/goengine"
13+
"github.com/hellofresh/goengine/driver/sql"
14+
"github.com/hellofresh/goengine/extension/amqp"
15+
goengineLogger "github.com/hellofresh/goengine/extension/logrus"
16+
"github.com/sirupsen/logrus"
17+
"github.com/sirupsen/logrus/hooks/test"
18+
libamqp "github.com/streadway/amqp"
19+
"github.com/stretchr/testify/assert"
20+
"github.com/stretchr/testify/require"
21+
)
22+
23+
func TestListener_Listen(t *testing.T) {
24+
t.Run("Listen, consume and stop", func(t *testing.T) {
25+
ensure := require.New(t)
26+
27+
ctx, ctxCancel := context.WithTimeout(context.Background(), time.Second)
28+
defer ctxCancel()
29+
30+
consumeCalls := 0
31+
consume := func() (io.Closer, <-chan libamqp.Delivery, error) {
32+
consumeCalls++
33+
ch := make(chan libamqp.Delivery, 2)
34+
ch <- libamqp.Delivery{
35+
Body: []byte(`{"no": 1, "aggregate_id": "8150276e-34fe-49d9-aeae-a35af0040a4f"}`),
36+
}
37+
ch <- libamqp.Delivery{
38+
Body: []byte(`{"no": 2, "aggregate_id": "8150276e-34fe-49d9-aeae-a35af0040a4f"}`),
39+
}
40+
return nil, ch, nil
41+
}
42+
triggerCalls := 0
43+
trigger := func(ctx context.Context, notification *sql.ProjectionNotification) error {
44+
triggerCalls++
45+
switch triggerCalls {
46+
case 1:
47+
ensure.Equal(&sql.ProjectionNotification{No: 1, AggregateID: "8150276e-34fe-49d9-aeae-a35af0040a4f"}, notification)
48+
case 2:
49+
ensure.Equal(&sql.ProjectionNotification{No: 2, AggregateID: "8150276e-34fe-49d9-aeae-a35af0040a4f"}, notification)
50+
ctxCancel()
51+
default:
52+
ensure.Fail("Only 2 calls to trigger where expected")
53+
}
54+
return nil
55+
}
56+
logger, loggerHook := getLogger()
57+
58+
listener, err := amqp.NewListener(consume, time.Millisecond, time.Millisecond, logger)
59+
ensure.NoError(err)
60+
61+
err = listener.Listen(ctx, trigger)
62+
63+
ensure.Equal(context.Canceled, err)
64+
ensure.Equal(1, consumeCalls)
65+
ensure.Equal(2, triggerCalls)
66+
ensure.Len(loggerHook.Entries, 0)
67+
})
68+
69+
t.Run("Reconnect with exponential back-off", func(t *testing.T) {
70+
ensure := require.New(t)
71+
72+
ctx, ctxCancel := context.WithTimeout(context.Background(), time.Second)
73+
defer ctxCancel()
74+
75+
var consumeCalls []time.Time
76+
consume := func() (io.Closer, <-chan libamqp.Delivery, error) {
77+
consumeCalls = append(consumeCalls, time.Now())
78+
if len(consumeCalls) == 5 {
79+
ctxCancel()
80+
}
81+
82+
return nil, nil, fmt.Errorf("failure %d", len(consumeCalls))
83+
}
84+
85+
logger, loggerHook := getLogger()
86+
87+
listener, err := amqp.NewListener(consume, time.Millisecond, 6*time.Millisecond, logger)
88+
ensure.NoError(err)
89+
90+
err = listener.Listen(ctx, func(ctx context.Context, notification *sql.ProjectionNotification) error {
91+
ensure.Fail("Trigger should ever be called")
92+
return nil
93+
})
94+
95+
ensure.Equal(context.Canceled, err)
96+
97+
reconnectIntervals := []time.Duration{time.Millisecond, time.Millisecond * 2, time.Millisecond * 4, time.Millisecond * 6, time.Millisecond * 6}
98+
ensure.Len(consumeCalls, len(reconnectIntervals))
99+
for i := 1; i < len(reconnectIntervals); i++ {
100+
expectedInterval := reconnectIntervals[i-1]
101+
interval := consumeCalls[i].Sub(consumeCalls[i-1])
102+
103+
if expectedInterval > interval || interval > (expectedInterval+time.Millisecond) {
104+
assert.Fail(t, fmt.Sprintf("Invalid interval after consume %d (got %s expected between %s and %s)", i, interval, expectedInterval, (expectedInterval+time.Millisecond)))
105+
}
106+
}
107+
108+
// Ensure we get log output
109+
logEntries := loggerHook.AllEntries()
110+
ensure.Len(logEntries, len(reconnectIntervals))
111+
for i, log := range logEntries {
112+
assert.Equal(t, log.Level, logrus.ErrorLevel)
113+
assert.Equal(t, log.Message, "failed to start consuming amqp messages")
114+
assert.Equal(t, fmt.Errorf("failure %d", i+1), log.Data["error"])
115+
assert.Equal(t, reconnectIntervals[i].String(), log.Data["reconnect_in"])
116+
}
117+
})
118+
119+
t.Run("Listen, consume and reconnect", func(t *testing.T) {
120+
ensure := require.New(t)
121+
122+
ctx, ctxCancel := context.WithTimeout(context.Background(), time.Second)
123+
defer ctxCancel()
124+
125+
consumeCalls := 0
126+
consume := func() (io.Closer, <-chan libamqp.Delivery, error) {
127+
consumeCalls++
128+
ch := make(chan libamqp.Delivery, 2)
129+
ch <- libamqp.Delivery{
130+
Body: []byte(`{"no": 1, "aggregate_id": "8150276e-34fe-49d9-aeae-a35af0040a4f"}`),
131+
}
132+
ch <- libamqp.Delivery{
133+
Body: []byte(`{"no": 2, "aggregate_id": "8150276e-34fe-49d9-aeae-a35af0040a4f"}`),
134+
}
135+
close(ch)
136+
return nil, ch, nil
137+
}
138+
triggerCalls := 0
139+
trigger := func(ctx context.Context, notification *sql.ProjectionNotification) error {
140+
triggerCalls++
141+
switch triggerCalls {
142+
case 1, 3:
143+
ensure.Equal(&sql.ProjectionNotification{No: 1, AggregateID: "8150276e-34fe-49d9-aeae-a35af0040a4f"}, notification)
144+
case 2, 4:
145+
ensure.Equal(&sql.ProjectionNotification{No: 2, AggregateID: "8150276e-34fe-49d9-aeae-a35af0040a4f"}, notification)
146+
default:
147+
ensure.Fail("Only 2 calls to trigger where expected")
148+
}
149+
if triggerCalls == 4 {
150+
ctxCancel()
151+
}
152+
return nil
153+
}
154+
155+
logger, loggerHook := getLogger()
156+
157+
listener, err := amqp.NewListener(consume, time.Millisecond, time.Millisecond, logger)
158+
ensure.NoError(err)
159+
160+
err = listener.Listen(ctx, trigger)
161+
162+
ensure.Equal(context.Canceled, err)
163+
ensure.Equal(2, consumeCalls)
164+
ensure.Equal(4, triggerCalls)
165+
ensure.Len(loggerHook.Entries, 0)
166+
})
167+
}
168+
169+
func getLogger() (goengine.Logger, *test.Hook) {
170+
logger, loggerHook := test.NewNullLogger()
171+
logger.SetLevel(logrus.DebugLevel)
172+
173+
return goengineLogger.Wrap(logger), loggerHook
174+
}

0 commit comments

Comments
 (0)