Skip to content

Commit 707fe72

Browse files
authored
Auto recovery connection publishers and consumers (#22)
* Closes: #4 *. Closes: #5 * Add auto-reconnection for connection, producers and consumers --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 89c4dd7 commit 707fe72

36 files changed

+1257
-247
lines changed

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
all: format vet test
1+
all: test
22

33
format:
44
go fmt ./...
55

66
vet:
7-
go vet ./rabbitmq_amqp
7+
go vet ./pkg/rabbitmq_amqp
88

9-
test:
10-
cd rabbitmq_amqp && go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo \
9+
test: format vet
10+
cd ./pkg/rabbitmq_amqp && go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo \
1111
--randomize-all --randomize-suites \
1212
--cover --coverprofile=coverage.txt --covermode=atomic \
1313
--race

docs/examples/getting_started/main.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"github.com/Azure/go-amqp"
8-
"github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
8+
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp"
99
"time"
1010
)
1111

@@ -20,7 +20,7 @@ func main() {
2020
stateChanged := make(chan *rabbitmq_amqp.StateChanged, 1)
2121
go func(ch chan *rabbitmq_amqp.StateChanged) {
2222
for statusChanged := range ch {
23-
rabbitmq_amqp.Info("[Connection]", "Status changed", statusChanged)
23+
rabbitmq_amqp.Info("[connection]", "Status changed", statusChanged)
2424
}
2525
}(stateChanged)
2626

@@ -33,7 +33,7 @@ func main() {
3333
// Register the channel to receive status change notifications
3434
amqpConnection.NotifyStatusChange(stateChanged)
3535

36-
fmt.Printf("AMQP Connection opened.\n")
36+
fmt.Printf("AMQP connection opened.\n")
3737
// Create the management interface for the connection
3838
// so we can declare exchanges, queues, and bindings
3939
management := amqpConnection.Management()
@@ -86,16 +86,16 @@ func main() {
8686
deliveryContext, err := consumer.Receive(ctx)
8787
if errors.Is(err, context.Canceled) {
8888
// The consumer was closed correctly
89-
rabbitmq_amqp.Info("[Consumer]", "consumer closed. Context", err)
89+
rabbitmq_amqp.Info("[NewConsumer]", "consumer closed. Context", err)
9090
return
9191
}
9292
if err != nil {
9393
// An error occurred receiving the message
94-
rabbitmq_amqp.Error("[Consumer]", "Error receiving message", err)
94+
rabbitmq_amqp.Error("[NewConsumer]", "Error receiving message", err)
9595
return
9696
}
9797

98-
rabbitmq_amqp.Info("[Consumer]", "Received message",
98+
rabbitmq_amqp.Info("[NewConsumer]", "Received message",
9999
fmt.Sprintf("%s", deliveryContext.Message().Data))
100100

101101
err = deliveryContext.Accept(context.Background())
@@ -115,26 +115,26 @@ func main() {
115115
return
116116
}
117117

118-
for i := 0; i < 10; i++ {
119-
118+
for i := 0; i < 1_000; i++ {
120119
// Publish a message to the exchange
121120
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
122121
if err != nil {
123-
rabbitmq_amqp.Error("Error publishing message", err)
124-
return
122+
rabbitmq_amqp.Error("Error publishing message", "error", err)
123+
time.Sleep(1 * time.Second)
124+
continue
125125
}
126126
switch publishResult.Outcome.(type) {
127127
case *amqp.StateAccepted:
128-
rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
128+
rabbitmq_amqp.Info("[NewPublisher]", "Message accepted", publishResult.Message.Data[0])
129129
break
130130
case *amqp.StateReleased:
131-
rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
131+
rabbitmq_amqp.Warn("[NewPublisher]", "Message was not routed", publishResult.Message.Data[0])
132132
break
133133
case *amqp.StateRejected:
134-
rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
134+
rabbitmq_amqp.Warn("[NewPublisher]", "Message rejected", publishResult.Message.Data[0])
135135
stateType := publishResult.Outcome.(*amqp.StateRejected)
136136
if stateType.Error != nil {
137-
rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
137+
rabbitmq_amqp.Warn("[NewPublisher]", "Message rejected with error: %v", stateType.Error)
138138
}
139139
break
140140
default:
@@ -153,13 +153,13 @@ func main() {
153153
//Close the consumer
154154
err = consumer.Close(context.Background())
155155
if err != nil {
156-
rabbitmq_amqp.Error("[Consumer]", err)
156+
rabbitmq_amqp.Error("[NewConsumer]", err)
157157
return
158158
}
159159
// Close the publisher
160160
err = publisher.Close(context.Background())
161161
if err != nil {
162-
rabbitmq_amqp.Error("[Publisher]", err)
162+
rabbitmq_amqp.Error("[NewPublisher]", err)
163163
return
164164
}
165165

@@ -197,7 +197,7 @@ func main() {
197197
return
198198
}
199199

200-
fmt.Printf("AMQP Connection closed.\n")
200+
fmt.Printf("AMQP connection closed.\n")
201201
// not necessary. It waits for the status change to be printed
202202
time.Sleep(100 * time.Millisecond)
203203
close(stateChanged)

docs/examples/reliable/reliable.go

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"github.com/Azure/go-amqp"
8+
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp"
9+
"sync"
10+
"sync/atomic"
11+
"time"
12+
)
13+
14+
func main() {
15+
queueName := "reliable-amqp10-go-queue"
16+
var stateAccepted int32
17+
var stateReleased int32
18+
var stateRejected int32
19+
20+
var received int32
21+
var failed int32
22+
23+
startTime := time.Now()
24+
go func() {
25+
for {
26+
time.Sleep(5 * time.Second)
27+
total := stateAccepted + stateReleased + stateRejected
28+
messagesPerSecond := float64(total) / time.Since(startTime).Seconds()
29+
rabbitmq_amqp.Info("[Stats]", "sent", total, "received", received, "failed", failed, "messagesPerSecond", messagesPerSecond)
30+
31+
}
32+
}()
33+
34+
rabbitmq_amqp.Info("How to deal with network disconnections")
35+
signalBlock := sync.Cond{L: &sync.Mutex{}}
36+
/// Create a channel to receive state change notifications
37+
stateChanged := make(chan *rabbitmq_amqp.StateChanged, 1)
38+
go func(ch chan *rabbitmq_amqp.StateChanged) {
39+
for statusChanged := range ch {
40+
rabbitmq_amqp.Info("[connection]", "Status changed", statusChanged)
41+
switch statusChanged.To.(type) {
42+
case *rabbitmq_amqp.StateOpen:
43+
signalBlock.Broadcast()
44+
}
45+
}
46+
}(stateChanged)
47+
48+
// Open a connection to the AMQP 1.0 server
49+
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), []string{"amqp://"}, &rabbitmq_amqp.AmqpConnOptions{
50+
SASLType: amqp.SASLTypeAnonymous(),
51+
ContainerID: "reliable-amqp10-go",
52+
RecoveryConfiguration: &rabbitmq_amqp.RecoveryConfiguration{
53+
ActiveRecovery: true,
54+
BackOffReconnectInterval: 2 * time.Second, // we reduce the reconnect interval to speed up the test. The default is 5 seconds
55+
// In production, you should avoid BackOffReconnectInterval with low values since it can cause a high number of reconnection attempts
56+
MaxReconnectAttempts: 5,
57+
},
58+
})
59+
if err != nil {
60+
rabbitmq_amqp.Error("Error opening connection", err)
61+
return
62+
}
63+
// Register the channel to receive status change notifications
64+
amqpConnection.NotifyStatusChange(stateChanged)
65+
66+
fmt.Printf("AMQP connection opened.\n")
67+
// Create the management interface for the connection
68+
// so we can declare exchanges, queues, and bindings
69+
management := amqpConnection.Management()
70+
71+
// Declare a Quorum queue
72+
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{
73+
Name: queueName,
74+
})
75+
if err != nil {
76+
rabbitmq_amqp.Error("Error declaring queue", err)
77+
return
78+
}
79+
80+
consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
81+
Queue: queueName,
82+
}, "reliable-consumer")
83+
if err != nil {
84+
rabbitmq_amqp.Error("Error creating consumer", err)
85+
return
86+
}
87+
88+
consumerContext, cancel := context.WithCancel(context.Background())
89+
90+
// Consume messages from the queue
91+
go func(ctx context.Context) {
92+
for {
93+
deliveryContext, err := consumer.Receive(ctx)
94+
if errors.Is(err, context.Canceled) {
95+
// The consumer was closed correctly
96+
return
97+
}
98+
if err != nil {
99+
// An error occurred receiving the message
100+
// here the consumer could be disconnected from the server due to a network error
101+
signalBlock.L.Lock()
102+
rabbitmq_amqp.Info("[Consumer]", "Consumer is blocked, queue", queueName, "error", err)
103+
signalBlock.Wait()
104+
rabbitmq_amqp.Info("[Consumer]", "Consumer is unblocked, queue", queueName)
105+
106+
signalBlock.L.Unlock()
107+
continue
108+
}
109+
110+
atomic.AddInt32(&received, 1)
111+
err = deliveryContext.Accept(context.Background())
112+
if err != nil {
113+
// same here the delivery could not be accepted due to a network error
114+
// we wait for 2_500 ms and try again
115+
time.Sleep(2500 * time.Millisecond)
116+
continue
117+
}
118+
}
119+
}(consumerContext)
120+
121+
publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmq_amqp.QueueAddress{
122+
Queue: queueName,
123+
}, "reliable-publisher")
124+
if err != nil {
125+
rabbitmq_amqp.Error("Error creating publisher", err)
126+
return
127+
}
128+
129+
wg := &sync.WaitGroup{}
130+
for i := 0; i < 1; i++ {
131+
wg.Add(1)
132+
go func() {
133+
defer wg.Done()
134+
for i := 0; i < 500_000; i++ {
135+
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
136+
if err != nil {
137+
// here you need to deal with the error. You can store the message in a local in memory/persistent storage
138+
// then retry to send the message as soon as the connection is reestablished
139+
140+
atomic.AddInt32(&failed, 1)
141+
// block signalBlock until the connection is reestablished
142+
signalBlock.L.Lock()
143+
rabbitmq_amqp.Info("[Publisher]", "Publisher is blocked, queue", queueName, "error", err)
144+
signalBlock.Wait()
145+
rabbitmq_amqp.Info("[Publisher]", "Publisher is unblocked, queue", queueName)
146+
signalBlock.L.Unlock()
147+
148+
} else {
149+
switch publishResult.Outcome.(type) {
150+
case *amqp.StateAccepted:
151+
atomic.AddInt32(&stateAccepted, 1)
152+
break
153+
case *amqp.StateReleased:
154+
atomic.AddInt32(&stateReleased, 1)
155+
break
156+
case *amqp.StateRejected:
157+
atomic.AddInt32(&stateRejected, 1)
158+
break
159+
default:
160+
// these status are not supported. Leave it for AMQP 1.0 compatibility
161+
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes
162+
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
163+
}
164+
}
165+
}
166+
}()
167+
}
168+
wg.Wait()
169+
170+
println("press any key to close the connection")
171+
172+
var input string
173+
_, _ = fmt.Scanln(&input)
174+
175+
cancel()
176+
//Close the consumer
177+
err = consumer.Close(context.Background())
178+
if err != nil {
179+
rabbitmq_amqp.Error("[NewConsumer]", err)
180+
return
181+
}
182+
// Close the publisher
183+
err = publisher.Close(context.Background())
184+
if err != nil {
185+
rabbitmq_amqp.Error("[NewPublisher]", err)
186+
return
187+
}
188+
189+
// Purge the queue
190+
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
191+
if err != nil {
192+
fmt.Printf("Error purging queue: %v\n", err)
193+
return
194+
}
195+
fmt.Printf("Purged %d messages from the queue.\n", purged)
196+
197+
err = management.DeleteQueue(context.TODO(), queueInfo.Name())
198+
if err != nil {
199+
fmt.Printf("Error deleting queue: %v\n", err)
200+
return
201+
}
202+
203+
err = amqpConnection.Close(context.Background())
204+
if err != nil {
205+
fmt.Printf("Error closing connection: %v\n", err)
206+
return
207+
}
208+
209+
fmt.Printf("AMQP connection closed.\n")
210+
// not necessary. It waits for the status change to be printed
211+
time.Sleep(100 * time.Millisecond)
212+
close(stateChanged)
213+
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)