Skip to content

Commit 6599545

Browse files
committed
Add topology recovery infrastructure and configuration options
This commit introduces the foundational infrastructure for topology recovery: - Add TopologyRecoveryOptions configuration with three modes: disabled, transient queues only, and full recovery - Implement topologyRecoveryRecords to track queues, exchanges, and bindings - Add recovery record types (queueRecoveryRecord, exchangeRecoveryRecord, bindingRecoveryRecord) with conversion methods to specifications - Integrate topology recovery tracking into AmqpConnection and AmqpManagement - Add helper method isQueueDestinationForBindingTransient to determine if binding destinations are transient - Include comprehensive unit tests for recovery record conversions and transient queue detection This is a first step to support topology recovery. The actual recovery logic will be implemented in subsequent commits.
1 parent 4e73e82 commit 6599545

File tree

5 files changed

+1126
-25
lines changed

5 files changed

+1126
-25
lines changed

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,20 @@ func (o OAuth2Options) Clone() *OAuth2Options {
4545

4646
}
4747

48+
// TopologyRecoveryOptions is used to configure the topology recovery behavior of the connection.
49+
// See [TopologyRecoveryDisabled], [TopologyRecoveryOnlyTransientQueues], and [TopologyRecoveryAllEnabled] for more information.
50+
type TopologyRecoveryOptions byte
51+
52+
const (
53+
// TopologyRecoveryDisabled disables the topology recovery.
54+
TopologyRecoveryDisabled TopologyRecoveryOptions = iota
55+
// TopologyRecoveryOnlyTransientQueues recovers only queues declared as exclusive and/or auto delete, and
56+
// related bindings. Exchanges are not recovered.
57+
TopologyRecoveryOnlyTransientQueues
58+
// TopologyRecoveryAllEnabled recovers all the topology. All exchanges, queues, and bindings are recovered.
59+
TopologyRecoveryAllEnabled
60+
)
61+
4862
type AmqpConnOptions struct {
4963
// wrapper for amqp.ConnOptions
5064
ContainerID string
@@ -76,6 +90,9 @@ type AmqpConnOptions struct {
7690
// when the connection is closed unexpectedly.
7791
RecoveryConfiguration *RecoveryConfiguration
7892

93+
// TopologyRecoveryOptions is used to configure the topology recovery behavior of the connection.
94+
TopologyRecoveryOptions TopologyRecoveryOptions
95+
7996
// The OAuth2Options is used to configure the connection with OAuth2 token.
8097
OAuth2Options *OAuth2Options
8198

@@ -116,16 +133,17 @@ type AmqpConnection struct {
116133
properties map[string]any
117134
featuresAvailable *featuresAvailable
118135

119-
azureConnection *amqp.Conn
120-
management *AmqpManagement
121-
lifeCycle *LifeCycle
122-
amqpConnOptions *AmqpConnOptions
123-
address string
124-
session *amqp.Session
125-
refMap *sync.Map
126-
entitiesTracker *entitiesTracker
127-
mutex sync.RWMutex
128-
closed bool
136+
azureConnection *amqp.Conn
137+
management *AmqpManagement
138+
lifeCycle *LifeCycle
139+
amqpConnOptions *AmqpConnOptions
140+
address string
141+
session *amqp.Session
142+
refMap *sync.Map
143+
entitiesTracker *entitiesTracker
144+
topologyRecoveryRecords *topologyRecoveryRecords
145+
mutex sync.RWMutex
146+
closed bool
129147
}
130148

131149
func (a *AmqpConnection) Properties() map[string]any {
@@ -321,16 +339,19 @@ func Dial(ctx context.Context, address string, connOptions *AmqpConnOptions) (*A
321339
if err != nil {
322340
return nil, err
323341
}
324-
325342
// create the connection
326343
conn := &AmqpConnection{
327-
management: newAmqpManagement(),
328-
lifeCycle: NewLifeCycle(),
329-
amqpConnOptions: connOptions,
330-
entitiesTracker: newEntitiesTracker(),
331-
featuresAvailable: newFeaturesAvailable(),
344+
management: newAmqpManagement(connOptions.TopologyRecoveryOptions),
345+
lifeCycle: NewLifeCycle(),
346+
amqpConnOptions: connOptions,
347+
entitiesTracker: newEntitiesTracker(),
348+
topologyRecoveryRecords: newTopologyRecoveryRecords(),
349+
featuresAvailable: newFeaturesAvailable(),
332350
}
333351

352+
// management needs to access the connection to manage the recovery records
353+
conn.management.topologyRecoveryRecords = conn.topologyRecoveryRecords
354+
334355
err = conn.open(ctx, address, connOptions)
335356
if err != nil {
336357
return nil, err
@@ -497,6 +518,7 @@ func (a *AmqpConnection) maybeReconnect() {
497518

498519
if err == nil {
499520
a.restartEntities()
521+
// TODO: integration point for topology recovery
500522
a.lifeCycle.SetState(&StateOpen{})
501523
return
502524
}

pkg/rabbitmqamqp/amqp_connection_recovery.go

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rabbitmqamqp
22

33
import (
44
"errors"
5+
"slices"
56
"sync"
67
"time"
78
)
@@ -90,3 +91,159 @@ func (e *entitiesTracker) CleanUp() {
9091
return true
9192
})
9293
}
94+
95+
type queueRecoveryRecord struct {
96+
queueName string
97+
queueType TQueueType
98+
autoDelete *bool
99+
exclusive *bool
100+
arguments map[string]any
101+
}
102+
103+
func (q *queueRecoveryRecord) toIQueueSpecification() IQueueSpecification {
104+
switch q.queueType {
105+
case Quorum:
106+
return &QuorumQueueSpecification{
107+
Name: q.queueName,
108+
Arguments: q.arguments,
109+
}
110+
case Classic:
111+
return &ClassicQueueSpecification{
112+
Name: q.queueName,
113+
IsAutoDelete: *q.autoDelete,
114+
IsExclusive: *q.exclusive,
115+
Arguments: q.arguments,
116+
}
117+
case Stream:
118+
return &StreamQueueSpecification{
119+
Name: q.queueName,
120+
Arguments: q.arguments,
121+
}
122+
}
123+
return nil
124+
}
125+
126+
type exchangeRecoveryRecord struct {
127+
exchangeName string
128+
exchangeType TExchangeType
129+
autoDelete bool
130+
arguments map[string]any
131+
}
132+
133+
func (e *exchangeRecoveryRecord) toIExchangeSpecification() IExchangeSpecification {
134+
switch e.exchangeType {
135+
case Direct:
136+
return &DirectExchangeSpecification{
137+
Name: e.exchangeName,
138+
IsAutoDelete: e.autoDelete,
139+
Arguments: e.arguments,
140+
}
141+
case Topic:
142+
return &TopicExchangeSpecification{
143+
Name: e.exchangeName,
144+
IsAutoDelete: e.autoDelete,
145+
Arguments: e.arguments,
146+
}
147+
case FanOut:
148+
return &FanOutExchangeSpecification{
149+
Name: e.exchangeName,
150+
IsAutoDelete: e.autoDelete,
151+
Arguments: e.arguments,
152+
}
153+
case Headers:
154+
return &HeadersExchangeSpecification{
155+
Name: e.exchangeName,
156+
IsAutoDelete: e.autoDelete,
157+
Arguments: e.arguments,
158+
}
159+
default:
160+
return &CustomExchangeSpecification{
161+
Name: e.exchangeName,
162+
IsAutoDelete: e.autoDelete,
163+
ExchangeTypeName: string(e.exchangeType),
164+
Arguments: e.arguments,
165+
}
166+
}
167+
}
168+
169+
type bindingRecoveryRecord struct {
170+
sourceExchange string
171+
destination string
172+
isDestinationQueue bool
173+
bindingKey string
174+
arguments map[string]any
175+
path string
176+
}
177+
178+
func (b *bindingRecoveryRecord) toIBindingSpecification() IBindingSpecification {
179+
if b.isDestinationQueue {
180+
return &ExchangeToQueueBindingSpecification{
181+
SourceExchange: b.sourceExchange,
182+
DestinationQueue: b.destination,
183+
BindingKey: b.bindingKey,
184+
Arguments: b.arguments,
185+
}
186+
}
187+
return &ExchangeToExchangeBindingSpecification{
188+
SourceExchange: b.sourceExchange,
189+
DestinationExchange: b.destination,
190+
BindingKey: b.bindingKey,
191+
Arguments: b.arguments,
192+
}
193+
}
194+
195+
type topologyRecoveryRecords struct {
196+
queues []queueRecoveryRecord
197+
exchanges []exchangeRecoveryRecord
198+
bindings []bindingRecoveryRecord
199+
}
200+
201+
func newTopologyRecoveryRecords() *topologyRecoveryRecords {
202+
return &topologyRecoveryRecords{
203+
queues: make([]queueRecoveryRecord, 0),
204+
exchanges: make([]exchangeRecoveryRecord, 0),
205+
bindings: make([]bindingRecoveryRecord, 0),
206+
}
207+
}
208+
209+
func (t *topologyRecoveryRecords) addQueueRecord(record queueRecoveryRecord) {
210+
t.queues = append(t.queues, record)
211+
}
212+
213+
func (t *topologyRecoveryRecords) removeQueueRecord(record queueRecoveryRecord) {
214+
t.queues = slices.DeleteFunc(t.queues, func(r queueRecoveryRecord) bool {
215+
return r.queueName == record.queueName
216+
})
217+
}
218+
219+
func (t *topologyRecoveryRecords) addExchangeRecord(record exchangeRecoveryRecord) {
220+
t.exchanges = append(t.exchanges, record)
221+
}
222+
223+
func (t *topologyRecoveryRecords) removeExchangeRecord(record exchangeRecoveryRecord) {
224+
t.exchanges = slices.DeleteFunc(t.exchanges, func(r exchangeRecoveryRecord) bool {
225+
return r.exchangeName == record.exchangeName
226+
})
227+
}
228+
229+
func (t *topologyRecoveryRecords) addBindingRecord(record bindingRecoveryRecord) {
230+
t.bindings = append(t.bindings, record)
231+
}
232+
233+
func (t *topologyRecoveryRecords) removeBindingRecord(bindingPath string) {
234+
t.bindings = slices.DeleteFunc(t.bindings, func(r bindingRecoveryRecord) bool {
235+
return r.path == bindingPath
236+
})
237+
}
238+
239+
func (t *topologyRecoveryRecords) removeBindingRecordBySourceExchange(sourceExchange string) {
240+
t.bindings = slices.DeleteFunc(t.bindings, func(r bindingRecoveryRecord) bool {
241+
return r.sourceExchange == sourceExchange
242+
})
243+
}
244+
245+
func (t *topologyRecoveryRecords) removeBindingRecordByDestinationQueue(destinationQueue string) {
246+
t.bindings = slices.DeleteFunc(t.bindings, func(r bindingRecoveryRecord) bool {
247+
return r.destination == destinationQueue
248+
})
249+
}

0 commit comments

Comments
 (0)