Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ set -o xtrace
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
readonly script_dir
echo "[INFO] script_dir: '$script_dir'"
readonly rabbitmq_image=rabbitmq:4.2-rc-management-alpine

readonly rabbitmq_image=${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management-alpine}

readonly docker_name_prefix='rabbitmq-amqp-go-client'
readonly docker_network_name="$docker_name_prefix-network"

declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
declare -r toxiproxy_docker_name="$docker_name_prefix-toxiproxy"

if [[ ! -v GITHUB_ACTIONS ]]
then
GITHUB_ACTIONS='false'
Expand Down Expand Up @@ -49,9 +51,6 @@ fi

set -o nounset

declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
declare -r toxiproxy_docker_name="$docker_name_prefix-toxiproxy"

function start_toxiproxy
{
if [[ $run_toxiproxy == 'true' ]]
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go.work.sum

# env file
.env
.envrc
.idea/
coverage.txt
.DS_Store
Expand Down
107 changes: 81 additions & 26 deletions pkg/rabbitmqamqp/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ func (o OAuth2Options) Clone() *OAuth2Options {

}

// TopologyRecoveryOptions is used to configure the topology recovery behavior of the connection.
// See [TopologyRecoveryDisabled], [TopologyRecoveryOnlyTransient], and [TopologyRecoveryAllEnabled] for more information.
type TopologyRecoveryOptions byte

const (
// TopologyRecoveryOnlyTransient recovers only queues declared as exclusive and/or auto delete, and
// related bindings. Exchanges are not recovered.
TopologyRecoveryOnlyTransient TopologyRecoveryOptions = iota
// TopologyRecoveryDisabled disables the topology recovery.
TopologyRecoveryDisabled
// TopologyRecoveryAllEnabled recovers all the topology. All exchanges, queues, and bindings are recovered.
TopologyRecoveryAllEnabled
)

type AmqpConnOptions struct {
// wrapper for amqp.ConnOptions
ContainerID string
Expand Down Expand Up @@ -76,6 +90,9 @@ type AmqpConnOptions struct {
// when the connection is closed unexpectedly.
RecoveryConfiguration *RecoveryConfiguration

// TopologyRecoveryOptions is used to configure the topology recovery behavior of the connection.
TopologyRecoveryOptions TopologyRecoveryOptions

// The OAuth2Options is used to configure the connection with OAuth2 token.
OAuth2Options *OAuth2Options

Expand All @@ -91,15 +108,16 @@ func (a *AmqpConnOptions) isOAuth2() bool {
func (a *AmqpConnOptions) Clone() *AmqpConnOptions {

cloned := &AmqpConnOptions{
ContainerID: a.ContainerID,
IdleTimeout: a.IdleTimeout,
MaxFrameSize: a.MaxFrameSize,
MaxSessions: a.MaxSessions,
Properties: a.Properties,
SASLType: a.SASLType,
TLSConfig: a.TLSConfig,
WriteTimeout: a.WriteTimeout,
Id: a.Id,
ContainerID: a.ContainerID,
IdleTimeout: a.IdleTimeout,
MaxFrameSize: a.MaxFrameSize,
MaxSessions: a.MaxSessions,
Properties: a.Properties,
SASLType: a.SASLType,
TLSConfig: a.TLSConfig,
WriteTimeout: a.WriteTimeout,
Id: a.Id,
TopologyRecoveryOptions: a.TopologyRecoveryOptions,
}
if a.OAuth2Options != nil {
cloned.OAuth2Options = a.OAuth2Options.Clone()
Expand All @@ -109,23 +127,23 @@ func (a *AmqpConnOptions) Clone() *AmqpConnOptions {
}

return cloned

}

type AmqpConnection struct {
properties map[string]any
featuresAvailable *featuresAvailable

azureConnection *amqp.Conn
management *AmqpManagement
lifeCycle *LifeCycle
amqpConnOptions *AmqpConnOptions
address string
session *amqp.Session
refMap *sync.Map
entitiesTracker *entitiesTracker
mutex sync.RWMutex
closed bool
azureConnection *amqp.Conn
management *AmqpManagement
lifeCycle *LifeCycle
amqpConnOptions *AmqpConnOptions
address string
session *amqp.Session
refMap *sync.Map
entitiesTracker *entitiesTracker
topologyRecoveryRecords *topologyRecoveryRecords
mutex sync.RWMutex
closed bool
}

func (a *AmqpConnection) Properties() map[string]any {
Expand Down Expand Up @@ -321,16 +339,19 @@ func Dial(ctx context.Context, address string, connOptions *AmqpConnOptions) (*A
if err != nil {
return nil, err
}

// create the connection
conn := &AmqpConnection{
management: newAmqpManagement(),
lifeCycle: NewLifeCycle(),
amqpConnOptions: connOptions,
entitiesTracker: newEntitiesTracker(),
featuresAvailable: newFeaturesAvailable(),
management: newAmqpManagement(connOptions.TopologyRecoveryOptions),
lifeCycle: NewLifeCycle(),
amqpConnOptions: connOptions,
entitiesTracker: newEntitiesTracker(),
topologyRecoveryRecords: newTopologyRecoveryRecords(),
featuresAvailable: newFeaturesAvailable(),
}

// management needs to access the connection to manage the recovery records
conn.management.topologyRecoveryRecords = conn.topologyRecoveryRecords

err = conn.open(ctx, address, connOptions)
if err != nil {
return nil, err
Expand Down Expand Up @@ -496,6 +517,7 @@ func (a *AmqpConnection) maybeReconnect() {
cancel()

if err == nil {
a.recoverTopology()
a.restartEntities()
a.lifeCycle.SetState(&StateOpen{})
return
Expand Down Expand Up @@ -551,6 +573,39 @@ func (a *AmqpConnection) restartEntities() {
"consumerFails", consumerFails)
}

func (a *AmqpConnection) recoverTopology() {
Debug("Recovering topology")
// Set the isRecovering flag to prevent duplicate recovery records.
// Using atomic operations since this runs in the recovery goroutine
// while public API methods can be called from user goroutines.
a.management.isRecovering.Store(true)
defer func() {
a.management.isRecovering.Store(false)
}()

for _, exchange := range a.topologyRecoveryRecords.exchanges {
Debug("Recovering exchange", "exchange", exchange.exchangeName)
_, err := a.Management().DeclareExchange(context.Background(), exchange.toIExchangeSpecification())
if err != nil {
Error("Failed to recover exchange", "error", err, "ID", a.Id(), "exchange", exchange.exchangeName)
}
}
for _, queue := range a.topologyRecoveryRecords.queues {
Debug("Recovering queue", "queue", queue.queueName)
_, err := a.Management().DeclareQueue(context.Background(), queue.toIQueueSpecification())
if err != nil {
Error("Failed to recover queue", "error", err, "ID", a.Id(), "queue", queue.queueName)
}
}
for _, binding := range a.topologyRecoveryRecords.bindings {
Debug("Recovering binding", "bind source", binding.sourceExchange, "bind destination", binding.destination)
_, err := a.Management().Bind(context.Background(), binding.toIBindingSpecification())
if err != nil {
Error("Failed to recover binding", "error", err, "ID", a.Id(), "bind source", binding.sourceExchange, "bind destination", binding.destination)
}
}
}

func (a *AmqpConnection) close() {
if a.refMap != nil {
a.refMap.Delete(a.Id())
Expand Down
157 changes: 157 additions & 0 deletions pkg/rabbitmqamqp/amqp_connection_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rabbitmqamqp

import (
"errors"
"slices"
"sync"
"time"
)
Expand Down Expand Up @@ -90,3 +91,159 @@ func (e *entitiesTracker) CleanUp() {
return true
})
}

type queueRecoveryRecord struct {
queueName string
queueType TQueueType
autoDelete *bool
exclusive *bool
arguments map[string]any
}

func (q *queueRecoveryRecord) toIQueueSpecification() IQueueSpecification {
switch q.queueType {
case Quorum:
return &QuorumQueueSpecification{
Name: q.queueName,
Arguments: q.arguments,
}
case Classic:
return &ClassicQueueSpecification{
Name: q.queueName,
IsAutoDelete: *q.autoDelete,
IsExclusive: *q.exclusive,
Arguments: q.arguments,
}
case Stream:
return &StreamQueueSpecification{
Name: q.queueName,
Arguments: q.arguments,
}
}
return nil
}

type exchangeRecoveryRecord struct {
exchangeName string
exchangeType TExchangeType
autoDelete bool
arguments map[string]any
}

func (e *exchangeRecoveryRecord) toIExchangeSpecification() IExchangeSpecification {
switch e.exchangeType {
case Direct:
return &DirectExchangeSpecification{
Name: e.exchangeName,
IsAutoDelete: e.autoDelete,
Arguments: e.arguments,
}
case Topic:
return &TopicExchangeSpecification{
Name: e.exchangeName,
IsAutoDelete: e.autoDelete,
Arguments: e.arguments,
}
case FanOut:
return &FanOutExchangeSpecification{
Name: e.exchangeName,
IsAutoDelete: e.autoDelete,
Arguments: e.arguments,
}
case Headers:
return &HeadersExchangeSpecification{
Name: e.exchangeName,
IsAutoDelete: e.autoDelete,
Arguments: e.arguments,
}
default:
return &CustomExchangeSpecification{
Name: e.exchangeName,
IsAutoDelete: e.autoDelete,
ExchangeTypeName: string(e.exchangeType),
Arguments: e.arguments,
}
}
}

type bindingRecoveryRecord struct {
sourceExchange string
destination string
isDestinationQueue bool
bindingKey string
arguments map[string]any
path string
}

func (b *bindingRecoveryRecord) toIBindingSpecification() IBindingSpecification {
if b.isDestinationQueue {
return &ExchangeToQueueBindingSpecification{
SourceExchange: b.sourceExchange,
DestinationQueue: b.destination,
BindingKey: b.bindingKey,
Arguments: b.arguments,
}
}
return &ExchangeToExchangeBindingSpecification{
SourceExchange: b.sourceExchange,
DestinationExchange: b.destination,
BindingKey: b.bindingKey,
Arguments: b.arguments,
}
}

type topologyRecoveryRecords struct {
queues []queueRecoveryRecord
exchanges []exchangeRecoveryRecord
bindings []bindingRecoveryRecord
}

func newTopologyRecoveryRecords() *topologyRecoveryRecords {
return &topologyRecoveryRecords{
queues: make([]queueRecoveryRecord, 0),
exchanges: make([]exchangeRecoveryRecord, 0),
bindings: make([]bindingRecoveryRecord, 0),
}
}

func (t *topologyRecoveryRecords) addQueueRecord(record queueRecoveryRecord) {
t.queues = append(t.queues, record)
}

func (t *topologyRecoveryRecords) removeQueueRecord(queueName string) {
t.queues = slices.DeleteFunc(t.queues, func(r queueRecoveryRecord) bool {
return r.queueName == queueName
})
}

func (t *topologyRecoveryRecords) addExchangeRecord(record exchangeRecoveryRecord) {
t.exchanges = append(t.exchanges, record)
}

func (t *topologyRecoveryRecords) removeExchangeRecord(exchangeName string) {
t.exchanges = slices.DeleteFunc(t.exchanges, func(r exchangeRecoveryRecord) bool {
return r.exchangeName == exchangeName
})
}

func (t *topologyRecoveryRecords) addBindingRecord(record bindingRecoveryRecord) {
t.bindings = append(t.bindings, record)
}

func (t *topologyRecoveryRecords) removeBindingRecord(bindingPath string) {
t.bindings = slices.DeleteFunc(t.bindings, func(r bindingRecoveryRecord) bool {
return r.path == bindingPath
})
}

func (t *topologyRecoveryRecords) removeBindingRecordBySourceExchange(sourceExchange string) {
t.bindings = slices.DeleteFunc(t.bindings, func(r bindingRecoveryRecord) bool {
return r.sourceExchange == sourceExchange
})
}

func (t *topologyRecoveryRecords) removeBindingRecordByDestinationQueue(destinationQueue string) {
t.bindings = slices.DeleteFunc(t.bindings, func(r bindingRecoveryRecord) bool {
return r.destination == destinationQueue
})
}
Loading