Skip to content

Commit 7e59792

Browse files
authored
fix: retry logic with dlx feature on rabbitmq (#17)
1 parent c736a57 commit 7e59792

File tree

12 files changed

+285
-31
lines changed

12 files changed

+285
-31
lines changed

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
# goqueue
22

3-
GoQueue - one library to rule them all. A golang wrapper that handles all the complexity of every Queue platforms. Extensible and easy to learn
3+
GoQueue - One library to rule them all. A Golang wrapper that handles all the complexity of various Queue platforms. Extensible and easy to learn.
44

55
## Index
66

77
- [Support](#support)
88
- [Getting Started](#getting-started)
99
- [Example](#example)
10+
- [Advance Setups](#advance-setups)
1011
- [Contribution](#contribution)
1112

1213
## Support
@@ -58,12 +59,15 @@ func initExchange(ch *amqp.Channel, exchangeName string) error {
5859
}
5960

6061
func main() {
62+
63+
// Initialize the RMQ connection
6164
rmqDSN := "amqp://rabbitmq:rabbitmq@localhost:5672/"
6265
rmqConn, err := amqp.Dial(rmqDSN)
6366
if err != nil {
6467
panic(err)
6568
}
6669

70+
// Initialize the Publisher
6771
rmqPub := publisher.NewPublisher(
6872
publisherOpts.PublisherPlatformRabbitMQ,
6973
publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
@@ -151,6 +155,13 @@ func handler() interfaces.InboundMessageHandlerFunc {
151155

152156
```
153157

158+
## Advance Setups
159+
160+
### RabbitMQ -- Retry Concept
161+
162+
![Goqueue Retry Architecture RabbitMQ](misc/images/rabbitmq-retry.png)
163+
Src: [Excalidraw Link](https://link.excalidraw.com/readonly/9sphJpzXzQIAVov3z8G7)
164+
154165
## Contribution
155166

156167
---
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
version: "3.7"
2+
services:
3+
rabbitmq-test:
4+
image: rabbitmq:3.13.3-management-alpine
5+
container_name: "goqueue-rabbitmq-example-basic"
6+
hostname: rabbitmq
7+
ports:
8+
- "15671:15672"
9+
- "5672:5672"
10+
volumes:
11+
- ../../../tests/localconf/rabbitmq/rabbitmq.definition.json:/opt/definitions.json:ro
12+
- ../../../tests/localconf/rabbitmq/rabbitmq.config:/etc/rabbitmq/rabbitmq.config:ro
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
version: "3.7"
2+
services:
3+
rabbitmq-test:
4+
image: rabbitmq:3.13.3-management-alpine
5+
container_name: "goqueue-rabbitmq-example-with-retries"
6+
hostname: rabbitmq
7+
ports:
8+
- "15671:15672"
9+
- "5672:5672"
10+
volumes:
11+
- ../../../tests/localconf/rabbitmq/rabbitmq.definition.json:/opt/definitions.json:ro
12+
- ../../../tests/localconf/rabbitmq/rabbitmq.config:/etc/rabbitmq/rabbitmq.config:ro
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
8+
amqp "github.com/rabbitmq/amqp091-go"
9+
10+
"github.com/bxcodec/goqueue"
11+
"github.com/bxcodec/goqueue/consumer"
12+
"github.com/bxcodec/goqueue/interfaces"
13+
"github.com/bxcodec/goqueue/middleware"
14+
"github.com/bxcodec/goqueue/options"
15+
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
16+
publisherOpts "github.com/bxcodec/goqueue/options/publisher"
17+
"github.com/bxcodec/goqueue/publisher"
18+
)
19+
20+
func initExchange(ch *amqp.Channel, exchangeName string) error {
21+
return ch.ExchangeDeclare(
22+
exchangeName,
23+
"topic",
24+
true,
25+
false,
26+
false,
27+
false,
28+
nil,
29+
)
30+
}
31+
32+
func main() {
33+
rmqDSN := "amqp://rabbitmq:rabbitmq@localhost:5672/"
34+
rmqConn, err := amqp.Dial(rmqDSN)
35+
if err != nil {
36+
panic(err)
37+
}
38+
39+
rmqPub := publisher.NewPublisher(
40+
publisherOpts.PublisherPlatformRabbitMQ,
41+
publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
42+
Conn: rmqConn,
43+
PublisherChannelPoolSize: 5,
44+
}),
45+
publisherOpts.WithPublisherID("publisher_id"),
46+
publisherOpts.WithMiddlewares(
47+
middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
48+
middleware.HelloWorldMiddlewareExecuteAfterPublisher(),
49+
),
50+
)
51+
52+
requeueChannel, err := rmqConn.Channel()
53+
if err != nil {
54+
panic(err)
55+
}
56+
57+
defer requeueChannel.Close()
58+
initExchange(requeueChannel, "goqueue")
59+
60+
consumerChannel, err := rmqConn.Channel()
61+
if err != nil {
62+
panic(err)
63+
}
64+
defer consumerChannel.Close()
65+
rmqConsumer := consumer.NewConsumer(
66+
consumerOpts.ConsumerPlatformRabbitMQ,
67+
consumerOpts.WithRabbitMQConsumerConfig(consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern(
68+
consumerChannel,
69+
requeueChannel,
70+
"goqueue", // exchange name
71+
[]string{"goqueue.payments.#"}, // routing keys pattern
72+
)),
73+
consumerOpts.WithConsumerID("consumer_id"),
74+
consumerOpts.WithMiddlewares(
75+
middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
76+
middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(),
77+
),
78+
consumerOpts.WithMaxRetryFailedMessage(5),
79+
consumerOpts.WithBatchMessageSize(1),
80+
consumerOpts.WithQueueName("consumer_queue"),
81+
)
82+
83+
queueSvc := goqueue.NewQueueService(
84+
options.WithConsumer(rmqConsumer),
85+
options.WithPublisher(rmqPub),
86+
options.WithMessageHandler(handler()),
87+
)
88+
go func() {
89+
for i := 0; i < 10; i++ {
90+
data := map[string]interface{}{
91+
"message": fmt.Sprintf("Hello World %d", i),
92+
}
93+
jbyt, _ := json.Marshal(data)
94+
err := queueSvc.Publish(context.Background(), interfaces.Message{
95+
Data: data,
96+
Action: "goqueue.payments.create",
97+
Topic: "goqueue",
98+
})
99+
if err != nil {
100+
panic(err)
101+
}
102+
fmt.Println("Message Sent: ", string(jbyt))
103+
}
104+
}()
105+
106+
// change to context.Background() if you want to run it forever
107+
// ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
108+
// defer cancel()
109+
err = queueSvc.Start(context.Background())
110+
if err != nil {
111+
panic(err)
112+
}
113+
}
114+
115+
func handler() interfaces.InboundMessageHandlerFunc {
116+
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
117+
fmt.Printf("Message: %+v\n", m)
118+
// something happend, we need to requeue the message
119+
return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn)
120+
}
121+
}

headers/key/const.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,6 @@ const (
99
ContentType = "goqueue-content-type"
1010
QueueServiceAgent = "goqueue-queue-service-agent"
1111
MessageID = "goqueue-message-id"
12+
OriginalTopicName = "goqueue-original-topic-name"
13+
OriginalActionName = "goqueue-original-action-name"
1214
)

interfaces/delayfn.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
package interfaces
22

3-
type DelayFn func(retries int64) (delay int64)
3+
type DelayFn func(currenRetries int64) (delay int64)
44

55
var (
66
// ExponentialBackoffDelayFn is a delay function that implements exponential backoff.
7-
// It takes the number of retries as input and returns the delay in milliseconds.
8-
ExponentialBackoffDelayFn DelayFn = func(retries int64) (delay int64) {
9-
return 2 << retries
7+
// It takes the number of retries as input and returns the delay in seconds.
8+
ExponentialBackoffDelayFn DelayFn = func(currenRetries int64) (delay int64) {
9+
return 2 << (currenRetries - 1)
1010
}
11+
12+
// LinearDelayFn is a delay function that implements linear delay.
13+
// It takes the number of retries as input and returns the delay in seconds.
14+
LinearDelayFn DelayFn = func(currenRetries int64) (delay int64) {
15+
return currenRetries
16+
}
17+
1118
// NoDelayFn is a DelayFn implementation that returns 0 delay for retries.
12-
NoDelayFn DelayFn = func(retries int64) (delay int64) {
19+
NoDelayFn DelayFn = func(currenRetries int64) (delay int64) {
1320
return 0
1421
}
15-
DefaultDelayFn DelayFn = NoDelayFn
22+
DefaultDelayFn DelayFn = LinearDelayFn
1623
)

interfaces/inboundmessagehandler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ type InboundMessage struct {
2828
// eg RabbitMQ: https://www.rabbitmq.com/docs/dlx
2929
MoveToDeadLetterQueue func(ctx context.Context) (err error) `json:"-"`
3030
// Requeue is used to put the message back to the tail of the queue after a delay.
31-
PutToBackOfQueueWithDelay func(ctx context.Context, delayFn DelayFn) (err error) `json:"-"`
31+
RetryWithDelayFn func(ctx context.Context, delayFn DelayFn) (err error) `json:"-"`
3232
}

internal/consumer/rabbitmq/blackbox_consumer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func handlerRequeue(t *testing.T) interfaces.InboundMessageHandlerFunc {
273273
return m.RetryCount
274274
}
275275

276-
err = m.PutToBackOfQueueWithDelay(ctx, delayFn)
276+
err = m.RetryWithDelayFn(ctx, delayFn)
277277
assert.NoError(t, err)
278278
return
279279
}

0 commit comments

Comments
 (0)