Skip to content

Commit 3c8ebb7

Browse files
authored
Merge pull request #67 from rabbitmq/37-implement-topology-recovery
Add Topology Recovery record
2 parents 4e73e82 + e18f1b6 commit 3c8ebb7

File tree

10 files changed

+1686
-40
lines changed

10 files changed

+1686
-40
lines changed

.ci/ubuntu/gha-setup.sh

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ set -o xtrace
77
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
88
readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
10-
readonly rabbitmq_image=rabbitmq:4.2-rc-management-alpine
11-
10+
readonly rabbitmq_image=${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management-alpine}
1211

1312
readonly docker_name_prefix='rabbitmq-amqp-go-client'
1413
readonly docker_network_name="$docker_name_prefix-network"
1514

15+
declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
16+
declare -r toxiproxy_docker_name="$docker_name_prefix-toxiproxy"
17+
1618
if [[ ! -v GITHUB_ACTIONS ]]
1719
then
1820
GITHUB_ACTIONS='false'
@@ -49,9 +51,6 @@ fi
4951

5052
set -o nounset
5153

52-
declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
53-
declare -r toxiproxy_docker_name="$docker_name_prefix-toxiproxy"
54-
5554
function start_toxiproxy
5655
{
5756
if [[ $run_toxiproxy == 'true' ]]

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ go.work.sum
2323

2424
# env file
2525
.env
26+
.envrc
2627
.idea/
2728
coverage.txt
2829
.DS_Store

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 81 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,20 @@ func (o OAuth2Options) Clone() *OAuth2Options {
4545

4646
}
4747

48+
// TopologyRecoveryOptions is used to configure the topology recovery behavior of the connection.
49+
// See [TopologyRecoveryDisabled], [TopologyRecoveryOnlyTransient], and [TopologyRecoveryAllEnabled] for more information.
50+
type TopologyRecoveryOptions byte
51+
52+
const (
53+
// TopologyRecoveryOnlyTransient recovers only queues declared as exclusive and/or auto delete, and
54+
// related bindings. Exchanges are not recovered.
55+
TopologyRecoveryOnlyTransient TopologyRecoveryOptions = iota
56+
// TopologyRecoveryDisabled disables the topology recovery.
57+
TopologyRecoveryDisabled
58+
// TopologyRecoveryAllEnabled recovers all the topology. All exchanges, queues, and bindings are recovered.
59+
TopologyRecoveryAllEnabled
60+
)
61+
4862
type AmqpConnOptions struct {
4963
// wrapper for amqp.ConnOptions
5064
ContainerID string
@@ -76,6 +90,9 @@ type AmqpConnOptions struct {
7690
// when the connection is closed unexpectedly.
7791
RecoveryConfiguration *RecoveryConfiguration
7892

93+
// TopologyRecoveryOptions is used to configure the topology recovery behavior of the connection.
94+
TopologyRecoveryOptions TopologyRecoveryOptions
95+
7996
// The OAuth2Options is used to configure the connection with OAuth2 token.
8097
OAuth2Options *OAuth2Options
8198

@@ -91,15 +108,16 @@ func (a *AmqpConnOptions) isOAuth2() bool {
91108
func (a *AmqpConnOptions) Clone() *AmqpConnOptions {
92109

93110
cloned := &AmqpConnOptions{
94-
ContainerID: a.ContainerID,
95-
IdleTimeout: a.IdleTimeout,
96-
MaxFrameSize: a.MaxFrameSize,
97-
MaxSessions: a.MaxSessions,
98-
Properties: a.Properties,
99-
SASLType: a.SASLType,
100-
TLSConfig: a.TLSConfig,
101-
WriteTimeout: a.WriteTimeout,
102-
Id: a.Id,
111+
ContainerID: a.ContainerID,
112+
IdleTimeout: a.IdleTimeout,
113+
MaxFrameSize: a.MaxFrameSize,
114+
MaxSessions: a.MaxSessions,
115+
Properties: a.Properties,
116+
SASLType: a.SASLType,
117+
TLSConfig: a.TLSConfig,
118+
WriteTimeout: a.WriteTimeout,
119+
Id: a.Id,
120+
TopologyRecoveryOptions: a.TopologyRecoveryOptions,
103121
}
104122
if a.OAuth2Options != nil {
105123
cloned.OAuth2Options = a.OAuth2Options.Clone()
@@ -109,23 +127,23 @@ func (a *AmqpConnOptions) Clone() *AmqpConnOptions {
109127
}
110128

111129
return cloned
112-
113130
}
114131

115132
type AmqpConnection struct {
116133
properties map[string]any
117134
featuresAvailable *featuresAvailable
118135

119-
azureConnection *amqp.Conn
120-
management *AmqpManagement
121-
lifeCycle *LifeCycle
122-
amqpConnOptions *AmqpConnOptions
123-
address string
124-
session *amqp.Session
125-
refMap *sync.Map
126-
entitiesTracker *entitiesTracker
127-
mutex sync.RWMutex
128-
closed bool
136+
azureConnection *amqp.Conn
137+
management *AmqpManagement
138+
lifeCycle *LifeCycle
139+
amqpConnOptions *AmqpConnOptions
140+
address string
141+
session *amqp.Session
142+
refMap *sync.Map
143+
entitiesTracker *entitiesTracker
144+
topologyRecoveryRecords *topologyRecoveryRecords
145+
mutex sync.RWMutex
146+
closed bool
129147
}
130148

131149
func (a *AmqpConnection) Properties() map[string]any {
@@ -321,16 +339,19 @@ func Dial(ctx context.Context, address string, connOptions *AmqpConnOptions) (*A
321339
if err != nil {
322340
return nil, err
323341
}
324-
325342
// create the connection
326343
conn := &AmqpConnection{
327-
management: newAmqpManagement(),
328-
lifeCycle: NewLifeCycle(),
329-
amqpConnOptions: connOptions,
330-
entitiesTracker: newEntitiesTracker(),
331-
featuresAvailable: newFeaturesAvailable(),
344+
management: newAmqpManagement(connOptions.TopologyRecoveryOptions),
345+
lifeCycle: NewLifeCycle(),
346+
amqpConnOptions: connOptions,
347+
entitiesTracker: newEntitiesTracker(),
348+
topologyRecoveryRecords: newTopologyRecoveryRecords(),
349+
featuresAvailable: newFeaturesAvailable(),
332350
}
333351

352+
// management needs to access the connection to manage the recovery records
353+
conn.management.topologyRecoveryRecords = conn.topologyRecoveryRecords
354+
334355
err = conn.open(ctx, address, connOptions)
335356
if err != nil {
336357
return nil, err
@@ -496,6 +517,7 @@ func (a *AmqpConnection) maybeReconnect() {
496517
cancel()
497518

498519
if err == nil {
520+
a.recoverTopology()
499521
a.restartEntities()
500522
a.lifeCycle.SetState(&StateOpen{})
501523
return
@@ -551,6 +573,39 @@ func (a *AmqpConnection) restartEntities() {
551573
"consumerFails", consumerFails)
552574
}
553575

576+
func (a *AmqpConnection) recoverTopology() {
577+
Debug("Recovering topology")
578+
// Set the isRecovering flag to prevent duplicate recovery records.
579+
// Using atomic operations since this runs in the recovery goroutine
580+
// while public API methods can be called from user goroutines.
581+
a.management.isRecovering.Store(true)
582+
defer func() {
583+
a.management.isRecovering.Store(false)
584+
}()
585+
586+
for _, exchange := range a.topologyRecoveryRecords.exchanges {
587+
Debug("Recovering exchange", "exchange", exchange.exchangeName)
588+
_, err := a.Management().DeclareExchange(context.Background(), exchange.toIExchangeSpecification())
589+
if err != nil {
590+
Error("Failed to recover exchange", "error", err, "ID", a.Id(), "exchange", exchange.exchangeName)
591+
}
592+
}
593+
for _, queue := range a.topologyRecoveryRecords.queues {
594+
Debug("Recovering queue", "queue", queue.queueName)
595+
_, err := a.Management().DeclareQueue(context.Background(), queue.toIQueueSpecification())
596+
if err != nil {
597+
Error("Failed to recover queue", "error", err, "ID", a.Id(), "queue", queue.queueName)
598+
}
599+
}
600+
for _, binding := range a.topologyRecoveryRecords.bindings {
601+
Debug("Recovering binding", "bind source", binding.sourceExchange, "bind destination", binding.destination)
602+
_, err := a.Management().Bind(context.Background(), binding.toIBindingSpecification())
603+
if err != nil {
604+
Error("Failed to recover binding", "error", err, "ID", a.Id(), "bind source", binding.sourceExchange, "bind destination", binding.destination)
605+
}
606+
}
607+
}
608+
554609
func (a *AmqpConnection) close() {
555610
if a.refMap != nil {
556611
a.refMap.Delete(a.Id())

pkg/rabbitmqamqp/amqp_connection_recovery.go

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rabbitmqamqp
22

33
import (
44
"errors"
5+
"slices"
56
"sync"
67
"time"
78
)
@@ -90,3 +91,159 @@ func (e *entitiesTracker) CleanUp() {
9091
return true
9192
})
9293
}
94+
95+
type queueRecoveryRecord struct {
96+
queueName string
97+
queueType TQueueType
98+
autoDelete *bool
99+
exclusive *bool
100+
arguments map[string]any
101+
}
102+
103+
func (q *queueRecoveryRecord) toIQueueSpecification() IQueueSpecification {
104+
switch q.queueType {
105+
case Quorum:
106+
return &QuorumQueueSpecification{
107+
Name: q.queueName,
108+
Arguments: q.arguments,
109+
}
110+
case Classic:
111+
return &ClassicQueueSpecification{
112+
Name: q.queueName,
113+
IsAutoDelete: *q.autoDelete,
114+
IsExclusive: *q.exclusive,
115+
Arguments: q.arguments,
116+
}
117+
case Stream:
118+
return &StreamQueueSpecification{
119+
Name: q.queueName,
120+
Arguments: q.arguments,
121+
}
122+
}
123+
return nil
124+
}
125+
126+
type exchangeRecoveryRecord struct {
127+
exchangeName string
128+
exchangeType TExchangeType
129+
autoDelete bool
130+
arguments map[string]any
131+
}
132+
133+
func (e *exchangeRecoveryRecord) toIExchangeSpecification() IExchangeSpecification {
134+
switch e.exchangeType {
135+
case Direct:
136+
return &DirectExchangeSpecification{
137+
Name: e.exchangeName,
138+
IsAutoDelete: e.autoDelete,
139+
Arguments: e.arguments,
140+
}
141+
case Topic:
142+
return &TopicExchangeSpecification{
143+
Name: e.exchangeName,
144+
IsAutoDelete: e.autoDelete,
145+
Arguments: e.arguments,
146+
}
147+
case FanOut:
148+
return &FanOutExchangeSpecification{
149+
Name: e.exchangeName,
150+
IsAutoDelete: e.autoDelete,
151+
Arguments: e.arguments,
152+
}
153+
case Headers:
154+
return &HeadersExchangeSpecification{
155+
Name: e.exchangeName,
156+
IsAutoDelete: e.autoDelete,
157+
Arguments: e.arguments,
158+
}
159+
default:
160+
return &CustomExchangeSpecification{
161+
Name: e.exchangeName,
162+
IsAutoDelete: e.autoDelete,
163+
ExchangeTypeName: string(e.exchangeType),
164+
Arguments: e.arguments,
165+
}
166+
}
167+
}
168+
169+
type bindingRecoveryRecord struct {
170+
sourceExchange string
171+
destination string
172+
isDestinationQueue bool
173+
bindingKey string
174+
arguments map[string]any
175+
path string
176+
}
177+
178+
func (b *bindingRecoveryRecord) toIBindingSpecification() IBindingSpecification {
179+
if b.isDestinationQueue {
180+
return &ExchangeToQueueBindingSpecification{
181+
SourceExchange: b.sourceExchange,
182+
DestinationQueue: b.destination,
183+
BindingKey: b.bindingKey,
184+
Arguments: b.arguments,
185+
}
186+
}
187+
return &ExchangeToExchangeBindingSpecification{
188+
SourceExchange: b.sourceExchange,
189+
DestinationExchange: b.destination,
190+
BindingKey: b.bindingKey,
191+
Arguments: b.arguments,
192+
}
193+
}
194+
195+
type topologyRecoveryRecords struct {
196+
queues []queueRecoveryRecord
197+
exchanges []exchangeRecoveryRecord
198+
bindings []bindingRecoveryRecord
199+
}
200+
201+
func newTopologyRecoveryRecords() *topologyRecoveryRecords {
202+
return &topologyRecoveryRecords{
203+
queues: make([]queueRecoveryRecord, 0),
204+
exchanges: make([]exchangeRecoveryRecord, 0),
205+
bindings: make([]bindingRecoveryRecord, 0),
206+
}
207+
}
208+
209+
func (t *topologyRecoveryRecords) addQueueRecord(record queueRecoveryRecord) {
210+
t.queues = append(t.queues, record)
211+
}
212+
213+
func (t *topologyRecoveryRecords) removeQueueRecord(queueName string) {
214+
t.queues = slices.DeleteFunc(t.queues, func(r queueRecoveryRecord) bool {
215+
return r.queueName == queueName
216+
})
217+
}
218+
219+
func (t *topologyRecoveryRecords) addExchangeRecord(record exchangeRecoveryRecord) {
220+
t.exchanges = append(t.exchanges, record)
221+
}
222+
223+
func (t *topologyRecoveryRecords) removeExchangeRecord(exchangeName string) {
224+
t.exchanges = slices.DeleteFunc(t.exchanges, func(r exchangeRecoveryRecord) bool {
225+
return r.exchangeName == exchangeName
226+
})
227+
}
228+
229+
func (t *topologyRecoveryRecords) addBindingRecord(record bindingRecoveryRecord) {
230+
t.bindings = append(t.bindings, record)
231+
}
232+
233+
func (t *topologyRecoveryRecords) removeBindingRecord(bindingPath string) {
234+
t.bindings = slices.DeleteFunc(t.bindings, func(r bindingRecoveryRecord) bool {
235+
return r.path == bindingPath
236+
})
237+
}
238+
239+
func (t *topologyRecoveryRecords) removeBindingRecordBySourceExchange(sourceExchange string) {
240+
t.bindings = slices.DeleteFunc(t.bindings, func(r bindingRecoveryRecord) bool {
241+
return r.sourceExchange == sourceExchange
242+
})
243+
}
244+
245+
func (t *topologyRecoveryRecords) removeBindingRecordByDestinationQueue(destinationQueue string) {
246+
t.bindings = slices.DeleteFunc(t.bindings, func(r bindingRecoveryRecord) bool {
247+
return r.destination == destinationQueue
248+
})
249+
}

0 commit comments

Comments
 (0)