Skip to content

Commit d425ec4

Browse files
committed
Explicitly acknowledge message delivery
1 parent 08825aa commit d425ec4

File tree

3 files changed

+49
-13
lines changed

3 files changed

+49
-13
lines changed

extension/amqp/amqp_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,21 @@ import (
1111
"github.com/stretchr/testify/assert"
1212
)
1313

14+
type mockAcknowledger struct {
15+
}
16+
17+
func (ack mockAcknowledger) Ack(tag uint64, multiple bool) error {
18+
return nil
19+
}
20+
21+
func (ack mockAcknowledger) Nack(tag uint64, multiple bool, requeue bool) error {
22+
return nil
23+
}
24+
25+
func (ack mockAcknowledger) Reject(tag uint64, requeue bool) error {
26+
return nil
27+
}
28+
1429
type mockConnection struct {
1530
}
1631

extension/amqp/listener.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,23 @@ func (l *Listener) consumeMessages(ctx context.Context, conn io.Closer, deliveri
120120
continue
121121
}
122122

123+
if err := msg.Ack(false); err != nil {
124+
l.logger.Error("failed to acknowledge notification delivery", func(entry goengine.LoggerEntry) {
125+
entry.Error(err)
126+
entry.Int64("notification.no", notification.No)
127+
entry.String("notification.aggregate_id", notification.AggregateID)
128+
})
129+
continue
130+
}
131+
123132
if err := trigger(ctx, notification); err != nil {
124133
l.logger.Error("failed to project notification", func(entry goengine.LoggerEntry) {
125134
entry.Error(err)
126135
entry.Int64("notification.no", notification.No)
127136
entry.String("notification.aggregate_id", notification.AggregateID)
128137
})
129138
}
139+
130140
}
131141
}
132142
}

extension/amqp/listener_test.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,22 @@ func TestListener_Listen(t *testing.T) {
2727
ctx, ctxCancel := context.WithTimeout(context.Background(), time.Second)
2828
defer ctxCancel()
2929

30+
delivery1 := libamqp.Delivery{
31+
Body: []byte(`{"no": 1, "aggregate_id": "8150276e-34fe-49d9-aeae-a35af0040a4f"}`),
32+
}
33+
delivery1.Acknowledger = mockAcknowledger{}
34+
35+
delivery2 := libamqp.Delivery{
36+
Body: []byte(`{"no": 2, "aggregate_id": "8150276e-34fe-49d9-aeae-a35af0040a4f"}`),
37+
}
38+
delivery2.Acknowledger = mockAcknowledger{}
39+
3040
consumeCalls := 0
3141
consume := func() (io.Closer, <-chan libamqp.Delivery, error) {
3242
consumeCalls++
3343
ch := make(chan libamqp.Delivery, 2)
34-
ch <- libamqp.Delivery{
35-
Body: []byte(`{"no": 1, "aggregate_id": "8150276e-34fe-49d9-aeae-a35af0040a4f"}`),
36-
}
37-
ch <- libamqp.Delivery{
38-
Body: []byte(`{"no": 2, "aggregate_id": "8150276e-34fe-49d9-aeae-a35af0040a4f"}`),
39-
}
44+
ch <- delivery1
45+
ch <- delivery2
4046
return nil, ch, nil
4147
}
4248
triggerCalls := 0
@@ -100,7 +106,7 @@ func TestListener_Listen(t *testing.T) {
100106
expectedInterval := reconnectIntervals[i-1]
101107
interval := consumeCalls[i].Sub(consumeCalls[i-1])
102108

103-
if expectedInterval > interval || interval > (expectedInterval+time.Millisecond) {
109+
if expectedInterval > interval || interval > (expectedInterval+time.Millisecond*2) {
104110
assert.Fail(t, fmt.Sprintf("Invalid interval after consume %d (got %s expected between %s and %s)", i, interval, expectedInterval, (expectedInterval+time.Millisecond)))
105111
}
106112
}
@@ -123,15 +129,20 @@ func TestListener_Listen(t *testing.T) {
123129
defer ctxCancel()
124130

125131
consumeCalls := 0
132+
delivery1 := libamqp.Delivery{
133+
Body: []byte(`{"no": 1, "aggregate_id": "8150276e-34fe-49d9-aeae-a35af0040a4f"}`),
134+
}
135+
delivery1.Acknowledger = mockAcknowledger{}
136+
137+
delivery2 := libamqp.Delivery{
138+
Body: []byte(`{"no": 2, "aggregate_id": "8150276e-34fe-49d9-aeae-a35af0040a4f"}`),
139+
}
140+
delivery2.Acknowledger = mockAcknowledger{}
126141
consume := func() (io.Closer, <-chan libamqp.Delivery, error) {
127142
consumeCalls++
128143
ch := make(chan libamqp.Delivery, 2)
129-
ch <- libamqp.Delivery{
130-
Body: []byte(`{"no": 1, "aggregate_id": "8150276e-34fe-49d9-aeae-a35af0040a4f"}`),
131-
}
132-
ch <- libamqp.Delivery{
133-
Body: []byte(`{"no": 2, "aggregate_id": "8150276e-34fe-49d9-aeae-a35af0040a4f"}`),
134-
}
144+
ch <- delivery1
145+
ch <- delivery2
135146
close(ch)
136147
return nil, ch, nil
137148
}

0 commit comments

Comments
 (0)