Skip to content

Commit 7141a08

Browse files
authored
Merge pull request #108 from 0x4b53/update-ack-fn
Add `OnErrFunc` which is more generic in how ack failures are handled
2 parents a0f116d + 0fb00d3 commit 7141a08

File tree

3 files changed

+88
-15
lines changed

3 files changed

+88
-15
lines changed

middleware/ack.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,46 @@ package middleware
33
import (
44
"context"
55

6-
amqp "github.com/rabbitmq/amqp091-go"
7-
86
amqprpc "github.com/0x4b53/amqp-rpc/v3"
7+
amqp "github.com/rabbitmq/amqp091-go"
98
)
109

10+
// OnErrFunc is the function that will be called when the middleware get an
11+
// error from `Ack`. The error and the delivery will be passed.
12+
type OnErrFunc func(err error, delivery amqp.Delivery)
13+
14+
// OnAckErrorLog is a built-in function that will log the error if any is
15+
// returned from `Ack`.
16+
//
17+
// middleware := AckDelivery(OnAckErrorLog(log.Printf))
18+
func OnAckErrorLog(logFn amqprpc.LogFunc) OnErrFunc {
19+
return func(err error, delivery amqp.Delivery) {
20+
logFn("could not ack delivery (%s): %v\n", delivery.CorrelationId, err)
21+
}
22+
}
23+
24+
// OnAckErrorSendOnChannel will first log the error and correlation ID and then
25+
// try to send on the passed channel. If no one is consuming on the passed
26+
// channel the middleware will not block but instead log a message about missing
27+
// channel consumers.
28+
func OnAckErrorSendOnChannel(logFn amqprpc.LogFunc, ch chan struct{}) OnErrFunc {
29+
logErr := OnAckErrorLog(logFn)
30+
31+
return func(err error, delivery amqp.Delivery) {
32+
logErr(err, delivery)
33+
34+
select {
35+
case ch <- struct{}{}:
36+
default:
37+
logFn("ack middleware: could not send on channel, no one is consuming\n")
38+
}
39+
}
40+
}
41+
1142
// AckDelivery is a middleware that will acknowledge the delivery after the
12-
// handler has been executed. Any error returned from d.Ack will be passed
13-
// to the provided logFunc.
14-
func AckDelivery(logFunc amqprpc.LogFunc) amqprpc.ServerMiddlewareFunc {
43+
// handler has been executed. If the Ack fails the error and the `amqp.Delivery`
44+
// will be passed to the `OnErrFunc`.
45+
func AckDelivery(onErrFn OnErrFunc) amqprpc.ServerMiddlewareFunc {
1546
return func(next amqprpc.HandlerFunc) amqprpc.HandlerFunc {
1647
return func(ctx context.Context, rw *amqprpc.ResponseWriter, d amqp.Delivery) {
1748
acknowledger := amqprpc.NewAwareAcknowledger(d.Acknowledger)
@@ -23,9 +54,8 @@ func AckDelivery(logFunc amqprpc.LogFunc) amqprpc.ServerMiddlewareFunc {
2354
return
2455
}
2556

26-
err := d.Ack(false)
27-
if err != nil {
28-
logFunc("could not Ack delivery (%s): %v", d.CorrelationId, err)
57+
if err := d.Ack(false); err != nil {
58+
onErrFn(err, d)
2959
}
3060
}
3161
}

middleware/ack_test.go

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package middleware
22

33
import (
44
"context"
5+
"errors"
56
"log"
7+
"sync/atomic"
68
"testing"
9+
"time"
710

811
amqp "github.com/rabbitmq/amqp091-go"
912
"github.com/stretchr/testify/assert"
@@ -13,32 +16,66 @@ import (
1316

1417
func TestAckDelivery(t *testing.T) {
1518
tests := []struct {
16-
handler amqprpc.HandlerFunc
17-
name string
19+
handler amqprpc.HandlerFunc
20+
name string
21+
ackReturn error
22+
didSendOnChannel bool
1823
}{
1924
{
20-
name: "handler doesn't ack",
21-
handler: func(_ context.Context, _ *amqprpc.ResponseWriter, _ amqp.Delivery) {},
25+
name: "handler doesn't ack",
26+
handler: func(_ context.Context, _ *amqprpc.ResponseWriter, _ amqp.Delivery) {},
27+
ackReturn: nil,
2228
},
2329
{
2430
name: "handler does ack",
2531
handler: func(_ context.Context, _ *amqprpc.ResponseWriter, d amqp.Delivery) {
2632
_ = d.Ack(false)
2733
},
34+
ackReturn: nil,
35+
},
36+
{
37+
name: "handler fails to ack",
38+
handler: func(_ context.Context, _ *amqprpc.ResponseWriter, _ amqp.Delivery) {},
39+
ackReturn: errors.New("issue in the multiplexer"), //nolint:err113 // Just a test
40+
didSendOnChannel: true,
2841
},
2942
}
3043
for _, tt := range tests {
3144
t.Run(tt.name, func(t *testing.T) {
32-
acknowledger := &amqprpc.MockAcknowledger{}
45+
acknowledger := &amqprpc.MockAcknowledger{
46+
OnAckFn: func() error {
47+
return tt.ackReturn
48+
},
49+
}
50+
51+
didSendOnCh := atomic.Bool{}
52+
didSendOnCh.Store(false)
53+
54+
// We setup a channel to ensure we don't proceed until we started
55+
// the go routine that will listen to the signal.
56+
isListening := make(chan struct{})
57+
58+
ch := make(chan struct{})
59+
go func() {
60+
close(isListening)
61+
<-ch
62+
didSendOnCh.Store(true)
63+
}()
64+
65+
// Block until ready.
66+
<-isListening
3367

34-
handler := AckDelivery(log.Printf)(tt.handler)
68+
handler := AckDelivery(OnAckErrorSendOnChannel(log.Printf, ch))(tt.handler)
3569

3670
rw := amqprpc.NewResponseWriter(&amqp.Publishing{})
37-
d := amqp.Delivery{Acknowledger: acknowledger}
71+
d := amqp.Delivery{Acknowledger: acknowledger, CorrelationId: "id-1234"}
3872

3973
handler(context.Background(), rw, d)
4074

4175
assert.Equal(t, 1, acknowledger.Acks)
76+
assert.Eventually(t, func() bool {
77+
return didSendOnCh.Load() == tt.didSendOnChannel
78+
}, 2*time.Second, 100*time.Millisecond)
4279
})
4380
}
4481
}

testing.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,17 @@ type MockAcknowledger struct {
2525
Acks int
2626
Nacks int
2727
Rejects int
28+
OnAckFn func() error
2829
}
2930

3031
// Ack increases Acks.
3132
func (ma *MockAcknowledger) Ack(_ uint64, _ bool) error {
3233
ma.Acks++
34+
35+
if ma.OnAckFn != nil {
36+
return ma.OnAckFn()
37+
}
38+
3339
return nil
3440
}
3541

0 commit comments

Comments
 (0)