Skip to content

Commit 503580a

Browse files
Merge pull request #1081 from bollhals/feature/backportRecoverFix
fix topology recovery
2 parents 607321d + e5535b9 commit 503580a

File tree

9 files changed

+176
-172
lines changed

9 files changed

+176
-172
lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -413,15 +413,29 @@ private bool TryPerformAutomaticRecovery()
413413
{
414414
if (TryRecoverConnectionDelegate())
415415
{
416-
RecoverConnectionShutdownHandlers();
417-
RecoverConnectionBlockedHandlers();
418-
RecoverConnectionUnblockedHandlers();
419-
420-
RecoverModels();
421-
if (_factory.TopologyRecoveryEnabled)
416+
lock (_recordedEntitiesLock)
422417
{
423-
RecoverEntities();
424-
RecoverConsumers();
418+
RecoverConnectionShutdownHandlers();
419+
RecoverConnectionBlockedHandlers();
420+
RecoverConnectionUnblockedHandlers();
421+
422+
if (_factory.TopologyRecoveryEnabled)
423+
{
424+
// The recovery sequence is the following:
425+
//
426+
// 1. Recover exchanges
427+
// 2. Recover queues
428+
// 3. Recover bindings
429+
// 4. Recover consumers
430+
using (var recoveryModel = _delegate.CreateModel())
431+
{
432+
RecoverExchanges(recoveryModel);
433+
RecoverQueues(recoveryModel);
434+
RecoverBindings(recoveryModel);
435+
}
436+
}
437+
438+
RecoverModelsAndItsConsumers();
425439
}
426440

427441
ESLog.Info("Connection recovery completed");
@@ -932,7 +946,7 @@ private void PropagateQueueNameChangeToConsumers(string oldName, string newName)
932946
}
933947
}
934948

935-
private void RecoverBindings()
949+
private void RecoverBindings(IModel model)
936950
{
937951
Dictionary<RecordedBinding, byte> recordedBindingsCopy;
938952
lock (_recordedBindings)
@@ -944,7 +958,7 @@ private void RecoverBindings()
944958
{
945959
try
946960
{
947-
b.Recover();
961+
b.Recover(model);
948962
}
949963
catch (Exception cause)
950964
{
@@ -1023,7 +1037,7 @@ private void RecoverConnectionUnblockedHandlers()
10231037
_delegate.ConnectionUnblocked += _recordedUnblockedEventHandlers;
10241038
}
10251039

1026-
private void RecoverConsumers()
1040+
internal void RecoverConsumers(AutorecoveringModel modelToRecover, IModel channelToUse)
10271041
{
10281042
if (_disposed)
10291043
{
@@ -1038,12 +1052,16 @@ private void RecoverConsumers()
10381052

10391053
foreach (KeyValuePair<string, RecordedConsumer> pair in recordedConsumersCopy)
10401054
{
1041-
string tag = pair.Key;
10421055
RecordedConsumer cons = pair.Value;
1056+
if (cons.Model != modelToRecover)
1057+
{
1058+
continue;
1059+
}
10431060

1061+
string tag = pair.Key;
10441062
try
10451063
{
1046-
string newTag = cons.Recover();
1064+
string newTag = cons.Recover(channelToUse);
10471065
lock (_recordedConsumers)
10481066
{
10491067
// make sure server-generated tags are re-added
@@ -1075,20 +1093,7 @@ private void RecoverConsumers()
10751093
}
10761094
}
10771095

1078-
private void RecoverEntities()
1079-
{
1080-
// The recovery sequence is the following:
1081-
//
1082-
// 1. Recover exchanges
1083-
// 2. Recover queues
1084-
// 3. Recover bindings
1085-
// 4. Recover consumers
1086-
RecoverExchanges();
1087-
RecoverQueues();
1088-
RecoverBindings();
1089-
}
1090-
1091-
private void RecoverExchanges()
1096+
private void RecoverExchanges(IModel model)
10921097
{
10931098
Dictionary<string, RecordedExchange> recordedExchangesCopy;
10941099
lock (_recordedEntitiesLock)
@@ -1100,7 +1105,7 @@ private void RecoverExchanges()
11001105
{
11011106
try
11021107
{
1103-
rx.Recover();
1108+
rx.Recover(model);
11041109
}
11051110
catch (Exception cause)
11061111
{
@@ -1111,18 +1116,18 @@ private void RecoverExchanges()
11111116
}
11121117
}
11131118

1114-
private void RecoverModels()
1119+
private void RecoverModelsAndItsConsumers()
11151120
{
11161121
lock (_models)
11171122
{
11181123
foreach (AutorecoveringModel m in _models)
11191124
{
1120-
m.AutomaticallyRecover(this);
1125+
m.AutomaticallyRecover(this, _factory.TopologyRecoveryEnabled);
11211126
}
11221127
}
11231128
}
11241129

1125-
private void RecoverQueues()
1130+
private void RecoverQueues(IModel model)
11261131
{
11271132
Dictionary<string, RecordedQueue> recordedQueuesCopy;
11281133
lock (_recordedEntitiesLock)
@@ -1137,7 +1142,7 @@ private void RecoverQueues()
11371142

11381143
try
11391144
{
1140-
rq.Recover();
1145+
rq.Recover(model);
11411146
string newName = rq.Name;
11421147

11431148
if (!oldName.Equals(newName))

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 41 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ public ulong NextPublishSeqNo
399399
}
400400
}
401401

402-
public void AutomaticallyRecover(AutorecoveringConnection conn)
402+
public void AutomaticallyRecover(AutorecoveringConnection conn, bool recoverConsumers)
403403
{
404404
if (_disposed)
405405
{
@@ -409,29 +409,45 @@ public void AutomaticallyRecover(AutorecoveringConnection conn)
409409
_connection = conn;
410410
RecoveryAwareModel defunctModel = _delegate;
411411

412-
_delegate = conn.CreateNonRecoveringModel();
413-
_delegate.InheritOffsetFrom(defunctModel);
412+
var newModel = conn.CreateNonRecoveringModel();
413+
newModel.InheritOffsetFrom(defunctModel);
414414

415-
RecoverModelShutdownHandlers();
416-
RecoverState();
415+
lock (_eventLock)
416+
{
417+
newModel.ModelShutdown += _recordedShutdownEventHandlers;
418+
newModel.BasicReturn += _recordedBasicReturnEventHandlers;
419+
newModel.BasicAcks += _recordedBasicAckEventHandlers;
420+
newModel.BasicNacks += _recordedBasicNackEventHandlers;
421+
newModel.CallbackException += _recordedCallbackExceptionEventHandlers;
422+
}
417423

418-
RecoverBasicReturnHandlers();
419-
RecoverBasicAckHandlers();
420-
RecoverBasicNackHandlers();
421-
RecoverCallbackExceptionHandlers();
424+
if (_prefetchCountConsumer != 0)
425+
{
426+
newModel.BasicQos(0, _prefetchCountConsumer, false);
427+
}
422428

423-
RunRecoveryEventHandlers();
424-
}
429+
if (_prefetchCountGlobal != 0)
430+
{
431+
newModel.BasicQos(0, _prefetchCountGlobal, true);
432+
}
425433

426-
public void BasicQos(ushort prefetchCount,
427-
bool global)
428-
{
429-
if (_disposed)
434+
if (_usesPublisherConfirms)
430435
{
431-
throw new ObjectDisposedException(GetType().FullName);
436+
newModel.ConfirmSelect();
432437
}
433438

434-
_delegate.BasicQos(0, prefetchCount, global);
439+
if (_usesTransactions)
440+
{
441+
newModel.TxSelect();
442+
}
443+
444+
if (recoverConsumers)
445+
{
446+
_connection.RecoverConsumers(this, newModel);
447+
}
448+
449+
_delegate = newModel;
450+
RunRecoveryEventHandlers();
435451
}
436452

437453
public void Close(ushort replyCode, string replyText, bool abort)
@@ -1342,7 +1358,7 @@ public void ExchangeBind(string destination,
13421358
throw new ObjectDisposedException(GetType().FullName);
13431359
}
13441360

1345-
RecordedBinding eb = new RecordedExchangeBinding(this).
1361+
RecordedBinding eb = new RecordedExchangeBinding().
13461362
WithSource(source).
13471363
WithDestination(destination).
13481364
WithRoutingKey(routingKey).
@@ -1372,7 +1388,7 @@ public void ExchangeDeclare(string exchange, string type, bool durable,
13721388
throw new ObjectDisposedException(GetType().FullName);
13731389
}
13741390

1375-
RecordedExchange rx = new RecordedExchange(this, exchange).
1391+
RecordedExchange rx = new RecordedExchange(exchange).
13761392
WithType(type).
13771393
WithDurable(durable).
13781394
WithAutoDelete(autoDelete).
@@ -1393,7 +1409,7 @@ public void ExchangeDeclareNoWait(string exchange,
13931409
throw new ObjectDisposedException(GetType().FullName);
13941410
}
13951411

1396-
RecordedExchange rx = new RecordedExchange(this, exchange).
1412+
RecordedExchange rx = new RecordedExchange(exchange).
13971413
WithType(type).
13981414
WithDurable(durable).
13991415
WithAutoDelete(autoDelete).
@@ -1447,7 +1463,7 @@ public void ExchangeUnbind(string destination,
14471463
throw new ObjectDisposedException(GetType().FullName);
14481464
}
14491465

1450-
RecordedBinding eb = new RecordedExchangeBinding(this).
1466+
RecordedBinding eb = new RecordedExchangeBinding().
14511467
WithSource(source).
14521468
WithDestination(destination).
14531469
WithRoutingKey(routingKey).
@@ -1480,7 +1496,7 @@ public void QueueBind(string queue,
14801496
throw new ObjectDisposedException(GetType().FullName);
14811497
}
14821498

1483-
RecordedBinding qb = new RecordedQueueBinding(this).
1499+
RecordedBinding qb = new RecordedQueueBinding().
14841500
WithSource(exchange).
14851501
WithDestination(queue).
14861502
WithRoutingKey(routingKey).
@@ -1513,7 +1529,7 @@ public QueueDeclareOk QueueDeclare(string queue, bool durable,
15131529

15141530
QueueDeclareOk result = _delegate.QueueDeclare(queue, durable, exclusive,
15151531
autoDelete, arguments);
1516-
RecordedQueue rq = new RecordedQueue(this, result.QueueName).
1532+
RecordedQueue rq = new RecordedQueue(result.QueueName).
15171533
Durable(durable).
15181534
Exclusive(exclusive).
15191535
AutoDelete(autoDelete).
@@ -1534,7 +1550,7 @@ public void QueueDeclareNoWait(string queue, bool durable,
15341550

15351551
_delegate.QueueDeclareNoWait(queue, durable, exclusive,
15361552
autoDelete, arguments);
1537-
RecordedQueue rq = new RecordedQueue(this, queue).
1553+
RecordedQueue rq = new RecordedQueue(queue).
15381554
Durable(durable).
15391555
Exclusive(exclusive).
15401556
AutoDelete(autoDelete).
@@ -1620,7 +1636,7 @@ public void QueueUnbind(string queue,
16201636
throw new ObjectDisposedException(GetType().FullName);
16211637
}
16221638

1623-
RecordedBinding qb = new RecordedQueueBinding(this).
1639+
RecordedBinding qb = new RecordedQueueBinding().
16241640
WithSource(exchange).
16251641
WithDestination(queue).
16261642
WithRoutingKey(routingKey).
@@ -1763,42 +1779,6 @@ private void RecoverCallbackExceptionHandlers()
17631779
}
17641780
}
17651781

1766-
private void RecoverModelShutdownHandlers()
1767-
{
1768-
if (_disposed)
1769-
{
1770-
throw new ObjectDisposedException(GetType().FullName);
1771-
}
1772-
1773-
lock (_eventLock)
1774-
{
1775-
_delegate.ModelShutdown += _recordedShutdownEventHandlers;
1776-
}
1777-
}
1778-
1779-
private void RecoverState()
1780-
{
1781-
if (_prefetchCountConsumer != 0)
1782-
{
1783-
BasicQos(_prefetchCountConsumer, false);
1784-
}
1785-
1786-
if (_prefetchCountGlobal != 0)
1787-
{
1788-
BasicQos(_prefetchCountGlobal, true);
1789-
}
1790-
1791-
if (_usesPublisherConfirms)
1792-
{
1793-
ConfirmSelect();
1794-
}
1795-
1796-
if (_usesTransactions)
1797-
{
1798-
TxSelect();
1799-
}
1800-
}
1801-
18021782
private void RunRecoveryEventHandlers()
18031783
{
18041784
if (_disposed)

projects/RabbitMQ.Client/client/impl/RecordedBinding.cs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,8 @@
3333

3434
namespace RabbitMQ.Client.Impl
3535
{
36-
internal abstract class RecordedBinding : RecordedEntity
36+
internal abstract class RecordedBinding
3737
{
38-
public RecordedBinding(AutorecoveringModel model) : base(model)
39-
{
40-
}
41-
4238
public IDictionary<string, object> Arguments { get; protected set; }
4339
public string Destination { get; set; }
4440
public string RoutingKey { get; protected set; }
@@ -78,7 +74,7 @@ public override int GetHashCode()
7874
(Arguments != null ? Arguments.GetHashCode() : 0);
7975
}
8076

81-
public virtual void Recover()
77+
public virtual void Recover(IModel model)
8278
{
8379
}
8480

@@ -112,29 +108,20 @@ public RecordedBinding WithSource(string value)
112108
}
113109
}
114110

115-
116111
internal sealed class RecordedQueueBinding : RecordedBinding
117112
{
118-
public RecordedQueueBinding(AutorecoveringModel model) : base(model)
119-
{
120-
}
121-
122-
public override void Recover()
113+
public override void Recover(IModel model)
123114
{
124-
ModelDelegate.QueueBind(Destination, Source, RoutingKey, Arguments);
115+
model.QueueBind(Destination, Source, RoutingKey, Arguments);
125116
}
126117
}
127118

128119

129120
internal sealed class RecordedExchangeBinding : RecordedBinding
130121
{
131-
public RecordedExchangeBinding(AutorecoveringModel model) : base(model)
132-
{
133-
}
134-
135-
public override void Recover()
122+
public override void Recover(IModel model)
136123
{
137-
ModelDelegate.ExchangeBind(Destination, Source, RoutingKey, Arguments);
124+
model.ExchangeBind(Destination, Source, RoutingKey, Arguments);
138125
}
139126
}
140127
}

0 commit comments

Comments
 (0)