Skip to content

Commit 4896794

Browse files
authored
Merge pull request #51 from hellofresh/amqp-listen-publish
Add Amqp listener
2 parents b1bfa9d + d425ec4 commit 4896794

File tree

6 files changed

+421
-14
lines changed

6 files changed

+421
-14
lines changed

extension/amqp/amqp.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,20 @@ package amqp
33
import (
44
"io"
55

6+
"github.com/hellofresh/goengine"
67
"github.com/streadway/amqp"
78
)
89

10+
// NotificationChannel represents a channel for notifications
11+
type NotificationChannel interface {
12+
Publish(exchange, queue string, mandatory, immediate bool, msg amqp.Publishing) error
13+
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
14+
Qos(prefetchCount, prefetchSize int, global bool) error
15+
}
16+
917
// setup returns a connection and channel to be used for the Queue setup
1018
func setup(url, queue string) (io.Closer, NotificationChannel, error) {
19+
1120
conn, err := amqp.Dial(url)
1221
if err != nil {
1322
return nil, nil, err
@@ -25,7 +34,29 @@ func setup(url, queue string) (io.Closer, NotificationChannel, error) {
2534
return conn, ch, nil
2635
}
2736

28-
// NotificationChannel represents a channel for notifications
29-
type NotificationChannel interface {
30-
Publish(exchange, queue string, mandatory, immediate bool, msg amqp.Publishing) error
37+
// DirectQueueConsume returns a Consume func that will connect to the provided AMQP server and create a queue for direct message delivery
38+
func DirectQueueConsume(amqpDSN, queue string) (Consume, error) {
39+
if _, err := amqp.ParseURI(amqpDSN); err != nil {
40+
return nil, goengine.InvalidArgumentError("amqpDSN")
41+
}
42+
if len(queue) == 0 {
43+
return nil, goengine.InvalidArgumentError("queue")
44+
}
45+
46+
return func() (io.Closer, <-chan amqp.Delivery, error) {
47+
conn, ch, err := setup(amqpDSN, queue)
48+
if err != nil {
49+
return nil, nil, err
50+
}
51+
52+
// Indicate we only want 1 message to be acknowledge at a time.
53+
if err := ch.Qos(1, 0, false); err != nil {
54+
return nil, nil, err
55+
}
56+
57+
// Since there can be multiple consumers, fair distribution of deliveries is required
58+
deliveries, err := ch.Consume(queue, "", false, false, false, false, nil)
59+
60+
return conn, deliveries, err
61+
}, nil
3162
}

extension/amqp/amqp_test.go

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,30 @@
1+
// +build unit
2+
13
package amqp_test
24

3-
import "github.com/streadway/amqp"
5+
import (
6+
"testing"
7+
8+
"github.com/hellofresh/goengine"
9+
goengineAmqp "github.com/hellofresh/goengine/extension/amqp"
10+
"github.com/streadway/amqp"
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
type mockAcknowledger struct {
15+
}
16+
17+
func (ack mockAcknowledger) Ack(tag uint64, multiple bool) error {
18+
return nil
19+
}
20+
21+
func (ack mockAcknowledger) Nack(tag uint64, multiple bool, requeue bool) error {
22+
return nil
23+
}
24+
25+
func (ack mockAcknowledger) Reject(tag uint64, requeue bool) error {
26+
return nil
27+
}
428

529
type mockConnection struct {
630
}
@@ -21,3 +45,35 @@ func (ch mockChannel) Publish(
2145
) error {
2246
return nil
2347
}
48+
49+
func (ch mockChannel) Consume(
50+
queue,
51+
consumer string,
52+
autoAck,
53+
exclusive,
54+
noLocal,
55+
noWait bool,
56+
args amqp.Table,
57+
) (<-chan amqp.Delivery, error) {
58+
return make(chan amqp.Delivery), nil
59+
}
60+
func (ch mockChannel) Qos(prefetchCount, prefetchSize int, global bool) error {
61+
return nil
62+
}
63+
64+
func TestDirectQueueConsume(t *testing.T) {
65+
t.Run("Invalid arguments", func(t *testing.T) {
66+
_, err := goengineAmqp.DirectQueueConsume("http://localhost:5672/", "my-queue")
67+
assert.Equal(t, goengine.InvalidArgumentError("amqpDSN"), err)
68+
69+
_, err = goengineAmqp.DirectQueueConsume("amqp://localhost:5672/", "")
70+
assert.Equal(t, goengine.InvalidArgumentError("queue"), err)
71+
})
72+
73+
t.Run("Returns amqp.Consume", func(t *testing.T) {
74+
c, err := goengineAmqp.DirectQueueConsume("amqp://localhost:5672/", "my-queue")
75+
assert.NoError(t, err)
76+
assert.NotNil(t, c)
77+
assert.IsType(t, (goengineAmqp.Consume)(nil), c)
78+
})
79+
}

extension/amqp/listener.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package amqp
2+
3+
import (
4+
"context"
5+
"io"
6+
"time"
7+
8+
"github.com/hellofresh/goengine"
9+
"github.com/hellofresh/goengine/driver/sql"
10+
"github.com/mailru/easyjson"
11+
"github.com/streadway/amqp"
12+
)
13+
14+
// Ensure Listener implements sql.Listener
15+
var _ sql.Listener = &Listener{}
16+
17+
type (
18+
// Consume returns a channel of amqp.Delivery's and a related closer or an error
19+
Consume func() (io.Closer, <-chan amqp.Delivery, error)
20+
21+
// Listener consumes messages from an queue
22+
Listener struct {
23+
consume Consume
24+
minReconnectInterval time.Duration
25+
maxReconnectInterval time.Duration
26+
logger goengine.Logger
27+
}
28+
)
29+
30+
// NewListener returns a new Listener
31+
func NewListener(
32+
consume Consume,
33+
minReconnectInterval time.Duration,
34+
maxReconnectInterval time.Duration,
35+
logger goengine.Logger,
36+
) (*Listener, error) {
37+
switch {
38+
case consume == nil:
39+
return nil, goengine.InvalidArgumentError("consume")
40+
}
41+
42+
if logger == nil {
43+
logger = goengine.NopLogger
44+
}
45+
46+
return &Listener{
47+
consume: consume,
48+
minReconnectInterval: minReconnectInterval,
49+
maxReconnectInterval: maxReconnectInterval,
50+
logger: logger,
51+
}, nil
52+
}
53+
54+
// Listen receives messages from a queue, transforms them into a sql.ProjectionNotification and calls the trigger
55+
func (l *Listener) Listen(ctx context.Context, trigger sql.ProjectionTrigger) error {
56+
var nextReconnect time.Time
57+
reconnectInterval := l.minReconnectInterval
58+
for {
59+
select {
60+
case <-ctx.Done():
61+
return context.Canceled
62+
default:
63+
}
64+
65+
conn, deliveries, err := l.consume()
66+
if err != nil {
67+
l.logger.Error("failed to start consuming amqp messages", func(entry goengine.LoggerEntry) {
68+
entry.Error(err)
69+
entry.String("reconnect_in", reconnectInterval.String())
70+
})
71+
72+
time.Sleep(reconnectInterval)
73+
reconnectInterval *= 2
74+
if reconnectInterval > l.maxReconnectInterval {
75+
reconnectInterval = l.maxReconnectInterval
76+
}
77+
continue
78+
}
79+
reconnectInterval = l.minReconnectInterval
80+
nextReconnect = time.Now().Add(reconnectInterval)
81+
82+
l.consumeMessages(ctx, conn, deliveries, trigger)
83+
84+
select {
85+
case <-ctx.Done():
86+
return context.Canceled
87+
default:
88+
time.Sleep(time.Until(nextReconnect))
89+
}
90+
}
91+
}
92+
93+
func (l *Listener) consumeMessages(ctx context.Context, conn io.Closer, deliveries <-chan amqp.Delivery, trigger sql.ProjectionTrigger) {
94+
defer func() {
95+
if conn == nil {
96+
return
97+
}
98+
99+
if err := conn.Close(); err != nil {
100+
l.logger.Error("failed to close amqp connection", func(entry goengine.LoggerEntry) {
101+
entry.Error(err)
102+
})
103+
}
104+
}()
105+
106+
for {
107+
select {
108+
case <-ctx.Done():
109+
return
110+
case msg, ok := <-deliveries:
111+
if !ok {
112+
return
113+
}
114+
115+
notification := &sql.ProjectionNotification{}
116+
if err := easyjson.Unmarshal(msg.Body, notification); err != nil {
117+
l.logger.Error("failed to unmarshal delivery, dropping message", func(entry goengine.LoggerEntry) {
118+
entry.Error(err)
119+
})
120+
continue
121+
}
122+
123+
if err := msg.Ack(false); err != nil {
124+
l.logger.Error("failed to acknowledge notification delivery", func(entry goengine.LoggerEntry) {
125+
entry.Error(err)
126+
entry.Int64("notification.no", notification.No)
127+
entry.String("notification.aggregate_id", notification.AggregateID)
128+
})
129+
continue
130+
}
131+
132+
if err := trigger(ctx, notification); err != nil {
133+
l.logger.Error("failed to project notification", func(entry goengine.LoggerEntry) {
134+
entry.Error(err)
135+
entry.Int64("notification.no", notification.No)
136+
entry.String("notification.aggregate_id", notification.AggregateID)
137+
})
138+
}
139+
140+
}
141+
}
142+
}

0 commit comments

Comments
 (0)