Skip to content

Commit e5c0204

Browse files
committed
Topology recovery implementation
Related to #37
1 parent 6599545 commit e5c0204

File tree

7 files changed

+535
-44
lines changed

7 files changed

+535
-44
lines changed

.ci/ubuntu/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ set -o xtrace
77
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
88
readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
10-
readonly rabbitmq_image=rabbitmq:4.2-rc-management-alpine
10+
readonly rabbitmq_image=${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management-alpine}
1111

1212

1313
readonly docker_name_prefix='rabbitmq-amqp-go-client'

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ go.work.sum
2323

2424
# env file
2525
.env
26+
.envrc
2627
.idea/
2728
coverage.txt
2829
.DS_Store

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,16 @@ func (a *AmqpConnOptions) isOAuth2() bool {
108108
func (a *AmqpConnOptions) Clone() *AmqpConnOptions {
109109

110110
cloned := &AmqpConnOptions{
111-
ContainerID: a.ContainerID,
112-
IdleTimeout: a.IdleTimeout,
113-
MaxFrameSize: a.MaxFrameSize,
114-
MaxSessions: a.MaxSessions,
115-
Properties: a.Properties,
116-
SASLType: a.SASLType,
117-
TLSConfig: a.TLSConfig,
118-
WriteTimeout: a.WriteTimeout,
119-
Id: a.Id,
111+
ContainerID: a.ContainerID,
112+
IdleTimeout: a.IdleTimeout,
113+
MaxFrameSize: a.MaxFrameSize,
114+
MaxSessions: a.MaxSessions,
115+
Properties: a.Properties,
116+
SASLType: a.SASLType,
117+
TLSConfig: a.TLSConfig,
118+
WriteTimeout: a.WriteTimeout,
119+
Id: a.Id,
120+
TopologyRecoveryOptions: a.TopologyRecoveryOptions,
120121
}
121122
if a.OAuth2Options != nil {
122123
cloned.OAuth2Options = a.OAuth2Options.Clone()
@@ -126,7 +127,6 @@ func (a *AmqpConnOptions) Clone() *AmqpConnOptions {
126127
}
127128

128129
return cloned
129-
130130
}
131131

132132
type AmqpConnection struct {
@@ -517,8 +517,8 @@ func (a *AmqpConnection) maybeReconnect() {
517517
cancel()
518518

519519
if err == nil {
520+
a.recoverTopology()
520521
a.restartEntities()
521-
// TODO: integration point for topology recovery
522522
a.lifeCycle.SetState(&StateOpen{})
523523
return
524524
}
@@ -573,6 +573,39 @@ func (a *AmqpConnection) restartEntities() {
573573
"consumerFails", consumerFails)
574574
}
575575

576+
func (a *AmqpConnection) recoverTopology() {
577+
Debug("Recovering topology")
578+
// Set the isRecovering flag to prevent duplicate recovery records.
579+
// Using atomic operations since this runs in the recovery goroutine
580+
// while public API methods can be called from user goroutines.
581+
a.management.isRecovering.Store(true)
582+
defer func() {
583+
a.management.isRecovering.Store(false)
584+
}()
585+
586+
for _, exchange := range a.topologyRecoveryRecords.exchanges {
587+
Debug("Recovering exchange", "exchange", exchange.exchangeName)
588+
_, err := a.Management().DeclareExchange(context.Background(), exchange.toIExchangeSpecification())
589+
if err != nil {
590+
Error("Failed to recover exchange", "error", err, "ID", a.Id(), "exchange", exchange.exchangeName)
591+
}
592+
}
593+
for _, queue := range a.topologyRecoveryRecords.queues {
594+
Debug("Recovering queue", "queue", queue.queueName)
595+
_, err := a.Management().DeclareQueue(context.Background(), queue.toIQueueSpecification())
596+
if err != nil {
597+
Error("Failed to recover queue", "error", err, "ID", a.Id(), "queue", queue.queueName)
598+
}
599+
}
600+
for _, binding := range a.topologyRecoveryRecords.bindings {
601+
Debug("Recovering binding", "bind source", binding.sourceExchange, "bind destination", binding.destination)
602+
_, err := a.Management().Bind(context.Background(), binding.toIBindingSpecification())
603+
if err != nil {
604+
Error("Failed to recover binding", "error", err, "ID", a.Id(), "bind source", binding.sourceExchange, "bind destination", binding.destination)
605+
}
606+
}
607+
}
608+
576609
func (a *AmqpConnection) close() {
577610
if a.refMap != nil {
578611
a.refMap.Delete(a.Id())

pkg/rabbitmqamqp/amqp_connection_recovery.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,19 +210,19 @@ func (t *topologyRecoveryRecords) addQueueRecord(record queueRecoveryRecord) {
210210
t.queues = append(t.queues, record)
211211
}
212212

213-
func (t *topologyRecoveryRecords) removeQueueRecord(record queueRecoveryRecord) {
213+
func (t *topologyRecoveryRecords) removeQueueRecord(queueName string) {
214214
t.queues = slices.DeleteFunc(t.queues, func(r queueRecoveryRecord) bool {
215-
return r.queueName == record.queueName
215+
return r.queueName == queueName
216216
})
217217
}
218218

219219
func (t *topologyRecoveryRecords) addExchangeRecord(record exchangeRecoveryRecord) {
220220
t.exchanges = append(t.exchanges, record)
221221
}
222222

223-
func (t *topologyRecoveryRecords) removeExchangeRecord(record exchangeRecoveryRecord) {
223+
func (t *topologyRecoveryRecords) removeExchangeRecord(exchangeName string) {
224224
t.exchanges = slices.DeleteFunc(t.exchanges, func(r exchangeRecoveryRecord) bool {
225-
return r.exchangeName == record.exchangeName
225+
return r.exchangeName == exchangeName
226226
})
227227
}
228228

0 commit comments

Comments
 (0)