Skip to content

Commit 4f5677a

Browse files
committed
add consumer recovery
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 1d321a7 commit 4f5677a

File tree

8 files changed

+224
-79
lines changed

8 files changed

+224
-79
lines changed

docs/examples/getting_started/main.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,16 @@ func main() {
8686
deliveryContext, err := consumer.Receive(ctx)
8787
if errors.Is(err, context.Canceled) {
8888
// The consumer was closed correctly
89-
rabbitmq_amqp.Info("[Consumer]", "consumer closed. Context", err)
89+
rabbitmq_amqp.Info("[NewConsumer]", "consumer closed. Context", err)
9090
return
9191
}
9292
if err != nil {
9393
// An error occurred receiving the message
94-
rabbitmq_amqp.Error("[Consumer]", "Error receiving message", err)
94+
rabbitmq_amqp.Error("[NewConsumer]", "Error receiving message", err)
9595
return
9696
}
9797

98-
rabbitmq_amqp.Info("[Consumer]", "Received message",
98+
rabbitmq_amqp.Info("[NewConsumer]", "Received message",
9999
fmt.Sprintf("%s", deliveryContext.Message().Data))
100100

101101
err = deliveryContext.Accept(context.Background())
@@ -115,7 +115,7 @@ func main() {
115115
return
116116
}
117117

118-
for i := 0; i < 10_000; i++ {
118+
for i := 0; i < 1_000; i++ {
119119
// Publish a message to the exchange
120120
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
121121
if err != nil {
@@ -125,16 +125,16 @@ func main() {
125125
}
126126
switch publishResult.Outcome.(type) {
127127
case *amqp.StateAccepted:
128-
rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
128+
rabbitmq_amqp.Info("[NewPublisher]", "Message accepted", publishResult.Message.Data[0])
129129
break
130130
case *amqp.StateReleased:
131-
rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
131+
rabbitmq_amqp.Warn("[NewPublisher]", "Message was not routed", publishResult.Message.Data[0])
132132
break
133133
case *amqp.StateRejected:
134-
rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
134+
rabbitmq_amqp.Warn("[NewPublisher]", "Message rejected", publishResult.Message.Data[0])
135135
stateType := publishResult.Outcome.(*amqp.StateRejected)
136136
if stateType.Error != nil {
137-
rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
137+
rabbitmq_amqp.Warn("[NewPublisher]", "Message rejected with error: %v", stateType.Error)
138138
}
139139
break
140140
default:
@@ -153,13 +153,13 @@ func main() {
153153
//Close the consumer
154154
err = consumer.Close(context.Background())
155155
if err != nil {
156-
rabbitmq_amqp.Error("[Consumer]", err)
156+
rabbitmq_amqp.Error("[NewConsumer]", err)
157157
return
158158
}
159159
// Close the publisher
160160
err = publisher.Close(context.Background())
161161
if err != nil {
162-
rabbitmq_amqp.Error("[Publisher]", err)
162+
rabbitmq_amqp.Error("[NewPublisher]", err)
163163
return
164164
}
165165

docs/examples/reliable/reliable.go

Lines changed: 60 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"github.com/Azure/go-amqp"
78
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp"
@@ -14,6 +15,7 @@ func main() {
1415
var stateAccepted int32
1516
var stateReleased int32
1617
var stateRejected int32
18+
1719
var received int32
1820
var failed int32
1921

@@ -40,6 +42,12 @@ func main() {
4042
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), []string{"amqp://"}, &rabbitmq_amqp.AmqpConnOptions{
4143
SASLType: amqp.SASLTypeAnonymous(),
4244
ContainerID: "reliable-amqp10-go",
45+
RecoveryConfiguration: &rabbitmq_amqp.RecoveryConfiguration{
46+
ActiveRecovery: true,
47+
BackOffReconnectInterval: 2 * time.Second, // we reduce the reconnect interval to speed up the test. The default is 5 seconds
48+
// In production, you should avoid BackOffReconnectInterval with low values since it can cause a high number of reconnection attempts
49+
MaxReconnectAttempts: 5,
50+
},
4351
})
4452
if err != nil {
4553
rabbitmq_amqp.Error("Error opening connection", err)
@@ -62,40 +70,45 @@ func main() {
6270
return
6371
}
6472

65-
//consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
66-
// Queue: queueName,
67-
//}, "reliable-consumer")
68-
//if err != nil {
69-
// rabbitmq_amqp.Error("Error creating consumer", err)
70-
// return
71-
//}
72-
73-
//consumerContext, cancel := context.WithCancel(context.Background())
74-
75-
//// Consume messages from the queue
76-
//go func(ctx context.Context) {
77-
// for {
78-
// deliveryContext, err := consumer.Receive(ctx)
79-
// if errors.Is(err, context.Canceled) {
80-
// // The consumer was closed correctly
81-
// return
82-
// }
83-
// if err != nil {
84-
// // An error occurred receiving the message
85-
// rabbitmq_amqp.Error("[Consumer]", "Error receiving message", err)
86-
// return
87-
// }
88-
//
89-
// rabbitmq_amqp.Info("[Consumer]", "Received message",
90-
// fmt.Sprintf("%s", deliveryContext.Message().Data))
91-
//
92-
// err = deliveryContext.Accept(context.Background())
93-
// if err != nil {
94-
// rabbitmq_amqp.Error("Error accepting message", err)
95-
// return
96-
// }
97-
// }
98-
//}(consumerContext)
73+
consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
74+
Queue: queueName,
75+
}, "reliable-consumer")
76+
if err != nil {
77+
rabbitmq_amqp.Error("Error creating consumer", err)
78+
return
79+
}
80+
81+
consumerContext, cancel := context.WithCancel(context.Background())
82+
83+
// Consume messages from the queue
84+
go func(ctx context.Context) {
85+
for {
86+
deliveryContext, err := consumer.Receive(ctx)
87+
if errors.Is(err, context.Canceled) {
88+
// The consumer was closed correctly
89+
return
90+
}
91+
if err != nil {
92+
// An error occurred receiving the message
93+
rabbitmq_amqp.Error("[NewConsumer]", "Error receiving message, retry in 2_500 ms", err, "queue", queueName)
94+
// here the consumer could be disconnected from the server due to a network error
95+
// in this specific case, we just wait for 2_500 ms and try again (2 seconds is the reconnect interval we defined + random 500 random ms)
96+
// while the connection is reestablished
97+
// you can use the stateChanged channel to be notified when the connection is reestablished
98+
time.Sleep(2500 * time.Millisecond)
99+
continue
100+
}
101+
102+
atomic.AddInt32(&received, 1)
103+
err = deliveryContext.Accept(context.Background())
104+
if err != nil {
105+
// same here the delivery could not be accepted due to a network error
106+
// we wait for 2_500 ms and try again
107+
time.Sleep(2500 * time.Millisecond)
108+
continue
109+
}
110+
}
111+
}(consumerContext)
99112

100113
publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmq_amqp.QueueAddress{
101114
Queue: queueName,
@@ -106,12 +119,16 @@ func main() {
106119
}
107120

108121
for i := 0; i < 1_000_000; i++ {
109-
// Publish a message to the exchange
110122
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
111123
if err != nil {
112124
rabbitmq_amqp.Error("Error publishing message", "error", err)
125+
// here you need to deal with the error. You can store the message in a local in memory/persistent storage
126+
// then retry to send the message as soon as the connection is reestablished
127+
// in this specific case, we just wait for 2_500 ms and try again (2 seconds is the reconnect interval we defined + random 500 random ms)
128+
// you can use the stateChanged channel to be notified when the connection is reestablished
129+
// and use some signal to reactivate the message sending
113130
atomic.AddInt32(&failed, 1)
114-
time.Sleep(1 * time.Second)
131+
time.Sleep(2500 * time.Millisecond)
115132
continue
116133
}
117134
switch publishResult.Outcome.(type) {
@@ -136,17 +153,17 @@ func main() {
136153
var input string
137154
_, _ = fmt.Scanln(&input)
138155

139-
//cancel()
156+
cancel()
140157
//Close the consumer
141-
//err = consumer.Close(context.Background())
142-
//if err != nil {
143-
// rabbitmq_amqp.Error("[Consumer]", err)
144-
// return
145-
//}
158+
err = consumer.Close(context.Background())
159+
if err != nil {
160+
rabbitmq_amqp.Error("[NewConsumer]", err)
161+
return
162+
}
146163
// Close the publisher
147164
err = publisher.Close(context.Background())
148165
if err != nil {
149-
rabbitmq_amqp.Error("[Publisher]", err)
166+
rabbitmq_amqp.Error("[NewPublisher]", err)
150167
return
151168
}
152169

pkg/rabbitmq_amqp/amqp_connection.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"github.com/Azure/go-amqp"
88
"github.com/google/uuid"
9+
"math/rand"
910
"sync"
1011
"sync/atomic"
1112
"time"
@@ -93,14 +94,7 @@ func (a *AmqpConnection) NewConsumer(ctx context.Context, destination *QueueAddr
9394
}
9495
err = validateAddress(destinationAdd)
9596

96-
if err != nil {
97-
return nil, err
98-
}
99-
receiver, err := a.session.NewReceiver(ctx, destinationAdd, createReceiverLinkOptions(destinationAdd, linkName, AtLeastOnce))
100-
if err != nil {
101-
return nil, err
102-
}
103-
return newConsumer(receiver), nil
97+
return newConsumer(ctx, a, destinationAdd, linkName)
10498
}
10599

106100
// Dial connect to the AMQP 1.0 server using the provided connectionSettings
@@ -121,6 +115,16 @@ func Dial(ctx context.Context, addresses []string, connOptions *AmqpConnOptions,
121115
connOptions.RecoveryConfiguration = NewRecoveryConfiguration()
122116
}
123117

118+
// validate the RecoveryConfiguration options
119+
if connOptions.RecoveryConfiguration.MaxReconnectAttempts <= 0 && connOptions.RecoveryConfiguration.ActiveRecovery {
120+
return nil, fmt.Errorf("MaxReconnectAttempts should be greater than 0")
121+
}
122+
if connOptions.RecoveryConfiguration.BackOffReconnectInterval <= 1*time.Second && connOptions.RecoveryConfiguration.ActiveRecovery {
123+
return nil, fmt.Errorf("BackOffReconnectInterval should be greater than 1 second")
124+
}
125+
126+
// create the connection
127+
124128
conn := &AmqpConnection{
125129
management: NewAmqpManagement(),
126130
lifeCycle: NewLifeCycle(),
@@ -236,6 +240,10 @@ func (a *AmqpConnection) maybeReconnect() {
236240
reconnected := false
237241
for numberOfAttempts <= a.amqpConnOptions.RecoveryConfiguration.MaxReconnectAttempts {
238242
///wait for before reconnecting
243+
// add some random milliseconds to the wait time to avoid thundering herd
244+
// the random time is between 0 and 500 milliseconds
245+
waitTime = waitTime + time.Duration(rand.Intn(500))*time.Millisecond
246+
239247
Info("Waiting before reconnecting", "in", waitTime, "attempt", numberOfAttempts)
240248
time.Sleep(waitTime)
241249
// context with timeout
@@ -270,6 +278,21 @@ func (a *AmqpConnection) maybeReconnect() {
270278
return true
271279
})
272280
Info("Restarted publishers", "number of fails", fails)
281+
fails = 0
282+
a.entitiesTracker.consumers.Range(func(key, value any) bool {
283+
consumer := value.(*Consumer)
284+
// try to createSender
285+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
286+
err := consumer.createReceiver(ctx)
287+
if err != nil {
288+
atomic.AddInt32(&fails, 1)
289+
Error("Failed to createReceiver consumer", "ID", consumer.Id(), "error", err)
290+
}
291+
cancel()
292+
return true
293+
})
294+
Info("Restarted consumers", "number of fails", fails)
295+
273296
a.lifeCycle.SetState(&StateOpen{})
274297
}
275298

pkg/rabbitmq_amqp/amqp_connection_recovery.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@ type RecoveryConfiguration struct {
1616
BackOffReconnectInterval The time to wait before trying to createSender after a connection is closed.
1717
time will be increased exponentially with each attempt.
1818
Default is 5 seconds, each attempt will double the time.
19+
The minimum value is 1 second. Avoid setting a value low values since it can cause a high
20+
number of reconnection attempts.
1921
*/
2022
BackOffReconnectInterval time.Duration
2123

2224
/*
2325
MaxReconnectAttempts The maximum number of reconnection attempts.
2426
Default is 5.
27+
The minimum value is 1.
2528
*/
2629
MaxReconnectAttempts int
2730
}

0 commit comments

Comments
 (0)