Skip to content

Commit ca01bc3

Browse files
Merge pull request #730 from rabbitmq/rabbitmq-dotnet-client-510
Address topology recovery issue
2 parents 7c7c46a + 83171fe commit ca01bc3

File tree

5 files changed

+124
-57
lines changed

5 files changed

+124
-57
lines changed

RabbitMQDotNetClient.sln

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Microsoft Visual Studio Solution File, Format Version 12.00
2-
# Visual Studio Version 16
3-
VisualStudioVersion = 16.0.29709.97
2+
# Visual Studio 15
3+
VisualStudioVersion = 15.0.28307.1022
44
MinimumVisualStudioVersion = 10.0.40219.1
55
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "projects", "projects", "{068D7DC3-8E6E-4951-B9E3-272C641BF0DE}"
66
EndProject

projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
<IncludeSymbols>true</IncludeSymbols>
2525
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
2626
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
27+
<LangVersion>latest</LangVersion>
2728
</PropertyGroup>
2829

2930
<ItemGroup>

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 100 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,17 @@ internal sealed class AutorecoveringConnection : IConnection
6464

6565
private readonly object _recordedEntitiesLock = new object();
6666

67-
private readonly List<AutorecoveringModel> _models = new List<AutorecoveringModel>();
67+
private readonly IDictionary<string, RecordedExchange> _recordedExchanges = new Dictionary<string, RecordedExchange>();
6868

69-
private readonly ConcurrentDictionary<RecordedBinding, byte> _recordedBindings =
70-
new ConcurrentDictionary<RecordedBinding, byte>();
69+
private readonly IDictionary<string, RecordedQueue> _recordedQueues = new Dictionary<string, RecordedQueue>();
7170

72-
private EventHandler<ConnectionBlockedEventArgs> _recordedBlockedEventHandlers;
71+
private readonly IDictionary<RecordedBinding, byte> _recordedBindings = new Dictionary<RecordedBinding, byte>();
72+
73+
private readonly IDictionary<string, RecordedConsumer> _recordedConsumers = new Dictionary<string, RecordedConsumer>();
7374

74-
private readonly IDictionary<string, RecordedConsumer> _recordedConsumers =
75-
new ConcurrentDictionary<string, RecordedConsumer>();
75+
private readonly ICollection<AutorecoveringModel> _models = new List<AutorecoveringModel>();
76+
77+
private EventHandler<ConnectionBlockedEventArgs> _recordedBlockedEventHandlers;
7678
private EventHandler<ShutdownEventArgs> _recordedShutdownEventHandlers;
7779
private EventHandler<EventArgs> _recordedUnblockedEventHandlers;
7880

@@ -301,7 +303,7 @@ public void DeleteRecordedBinding(RecordedBinding rb)
301303
{
302304
lock (_recordedEntitiesLock)
303305
{
304-
((IDictionary<RecordedBinding, byte>)_recordedBindings).Remove(rb);
306+
_recordedBindings.Remove(rb);
305307
}
306308
}
307309

@@ -324,11 +326,11 @@ public void DeleteRecordedExchange(string name)
324326
{
325327
lock (_recordedEntitiesLock)
326328
{
327-
RecordedExchanges.Remove(name);
329+
_recordedExchanges.Remove(name);
328330

329331
// find bindings that need removal, check if some auto-delete exchanges
330332
// might need the same
331-
IEnumerable<RecordedBinding> bs = _recordedBindings.Keys.Where(b => name.Equals(b.Destination));
333+
IEnumerable<RecordedBinding> bs = _recordedBindings.Keys.Where(b => name.Equals(b.Destination)).ToArray();
332334
foreach (RecordedBinding b in bs)
333335
{
334336
DeleteRecordedBinding(b);
@@ -341,10 +343,10 @@ public void DeleteRecordedQueue(string name)
341343
{
342344
lock (_recordedEntitiesLock)
343345
{
344-
RecordedQueues.Remove(name);
346+
_recordedQueues.Remove(name);
345347
// find bindings that need removal, check if some auto-delete exchanges
346348
// might need the same
347-
IEnumerable<RecordedBinding> bs = _recordedBindings.Keys.Where(b => name.Equals(b.Destination));
349+
IEnumerable<RecordedBinding> bs = _recordedBindings.Keys.Where(b => name.Equals(b.Destination)).ToArray();
348350
foreach (RecordedBinding b in bs)
349351
{
350352
DeleteRecordedBinding(b);
@@ -371,13 +373,13 @@ public void MaybeDeleteRecordedAutoDeleteExchange(string exchange)
371373
{
372374
if (!HasMoreDestinationsBoundToExchange(_recordedBindings.Keys, exchange))
373375
{
374-
RecordedExchanges.TryGetValue(exchange, out RecordedExchange rx);
376+
_recordedExchanges.TryGetValue(exchange, out RecordedExchange rx);
375377
// last binding where this exchange is the source is gone,
376378
// remove recorded exchange
377379
// if it is auto-deleted. See bug 26364.
378380
if ((rx != null) && rx.IsAutoDelete)
379381
{
380-
RecordedExchanges.Remove(exchange);
382+
_recordedExchanges.Remove(exchange);
381383
}
382384
}
383385
}
@@ -389,12 +391,12 @@ public void MaybeDeleteRecordedAutoDeleteQueue(string queue)
389391
{
390392
if (!HasMoreConsumersOnQueue(_recordedConsumers.Values, queue))
391393
{
392-
RecordedQueues.TryGetValue(queue, out RecordedQueue rq);
394+
_recordedQueues.TryGetValue(queue, out RecordedQueue rq);
393395
// last consumer on this connection is gone, remove recorded queue
394396
// if it is auto-deleted. See bug 26364.
395397
if ((rq != null) && rq.IsAutoDelete)
396398
{
397-
RecordedQueues.Remove(queue);
399+
_recordedQueues.Remove(queue);
398400
}
399401
}
400402
}
@@ -404,7 +406,10 @@ public void RecordBinding(RecordedBinding rb)
404406
{
405407
lock (_recordedEntitiesLock)
406408
{
407-
_recordedBindings.TryAdd(rb, 0);
409+
if (!_recordedBindings.ContainsKey(rb))
410+
{
411+
_recordedBindings.Add(rb, 0);
412+
}
408413
}
409414
}
410415

@@ -423,15 +428,15 @@ public void RecordExchange(string name, RecordedExchange x)
423428
{
424429
lock (_recordedEntitiesLock)
425430
{
426-
RecordedExchanges[name] = x;
431+
_recordedExchanges[name] = x;
427432
}
428433
}
429434

430435
public void RecordQueue(string name, RecordedQueue q)
431436
{
432437
lock (_recordedEntitiesLock)
433438
{
434-
RecordedQueues[name] = q;
439+
_recordedQueues[name] = q;
435440
}
436441
}
437442

@@ -497,15 +502,19 @@ public void Abort()
497502
{
498503
StopRecoveryLoop();
499504
if (_delegate.IsOpen)
505+
{
500506
_delegate.Abort();
507+
}
501508
}
502509

503510
///<summary>API-side invocation of connection abort.</summary>
504511
public void Abort(ushort reasonCode, string reasonText)
505512
{
506513
StopRecoveryLoop();
507514
if (_delegate.IsOpen)
515+
{
508516
_delegate.Abort(reasonCode, reasonText);
517+
}
509518
}
510519

511520
///<summary>API-side invocation of connection abort with timeout.</summary>
@@ -523,23 +532,29 @@ public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout)
523532
{
524533
StopRecoveryLoop();
525534
if (_delegate.IsOpen)
535+
{
526536
_delegate.Abort(reasonCode, reasonText, timeout);
537+
}
527538
}
528539

529540
///<summary>API-side invocation of connection.close.</summary>
530541
public void Close()
531542
{
532543
StopRecoveryLoop();
533544
if (_delegate.IsOpen)
545+
{
534546
_delegate.Close();
547+
}
535548
}
536549

537550
///<summary>API-side invocation of connection.close.</summary>
538551
public void Close(ushort reasonCode, string reasonText)
539552
{
540553
StopRecoveryLoop();
541554
if (_delegate.IsOpen)
555+
{
542556
_delegate.Close(reasonCode, reasonText);
557+
}
543558
}
544559

545560
///<summary>API-side invocation of connection.close with timeout.</summary>
@@ -565,16 +580,19 @@ public void Close(ushort reasonCode, string reasonText, TimeSpan timeout)
565580
public IModel CreateModel()
566581
{
567582
EnsureIsOpen();
568-
AutorecoveringModel m;
569-
m = new AutorecoveringModel(this,
570-
CreateNonRecoveringModel());
583+
AutorecoveringModel m = new AutorecoveringModel(this, CreateNonRecoveringModel());
571584
lock (_models)
572585
{
573586
_models.Add(m);
574587
}
575588
return m;
576589
}
577590

591+
void IDisposable.Dispose()
592+
{
593+
Dispose(true);
594+
}
595+
578596
public void HandleConnectionBlocked(string reason)
579597
{
580598
_delegate.HandleConnectionBlocked(reason);
@@ -585,9 +603,26 @@ public void HandleConnectionUnblocked()
585603
_delegate.HandleConnectionUnblocked();
586604
}
587605

588-
void IDisposable.Dispose()
606+
internal int RecordedExchangesCount
589607
{
590-
Dispose(true);
608+
get
609+
{
610+
lock (_recordedEntitiesLock)
611+
{
612+
return _recordedExchanges.Count;
613+
}
614+
}
615+
}
616+
617+
internal int RecordedQueuesCount
618+
{
619+
get
620+
{
621+
lock (_recordedEntitiesLock)
622+
{
623+
return _recordedExchanges.Count;
624+
}
625+
}
591626
}
592627

593628
private void Dispose(bool disposing)
@@ -639,7 +674,7 @@ private void PropagateQueueNameChangeToBindings(string oldName, string newName)
639674

640675
private void PropagateQueueNameChangeToConsumers(string oldName, string newName)
641676
{
642-
lock (_recordedBindings)
677+
lock (_recordedConsumers)
643678
{
644679
IEnumerable<KeyValuePair<string, RecordedConsumer>> cs = _recordedConsumers.
645680
Where(pair => pair.Value.Queue.Equals(oldName));
@@ -652,7 +687,13 @@ private void PropagateQueueNameChangeToConsumers(string oldName, string newName)
652687

653688
private void RecoverBindings()
654689
{
655-
foreach (RecordedBinding b in _recordedBindings.Keys)
690+
IDictionary<RecordedBinding, byte> recordedBindingsCopy = null;
691+
lock (_recordedBindings)
692+
{
693+
recordedBindingsCopy = _recordedBindings.ToDictionary(e => e.Key, e => e.Value);
694+
}
695+
696+
foreach (RecordedBinding b in recordedBindingsCopy.Keys)
656697
{
657698
try
658699
{
@@ -717,7 +758,13 @@ private void RecoverConnectionUnblockedHandlers()
717758

718759
private void RecoverConsumers()
719760
{
720-
foreach (KeyValuePair<string, RecordedConsumer> pair in _recordedConsumers)
761+
IDictionary<string, RecordedConsumer> recordedConsumersCopy = null;
762+
lock (_recordedConsumers)
763+
{
764+
recordedConsumersCopy = _recordedConsumers.ToDictionary(e => e.Key, e => e.Value);
765+
}
766+
767+
foreach (KeyValuePair<string, RecordedConsumer> pair in recordedConsumersCopy)
721768
{
722769
string tag = pair.Key;
723770
RecordedConsumer cons = pair.Value;
@@ -771,7 +818,13 @@ private void RecoverEntities()
771818

772819
private void RecoverExchanges()
773820
{
774-
foreach (RecordedExchange rx in RecordedExchanges.Values)
821+
IDictionary<string, RecordedExchange> recordedExchangesCopy = null;
822+
lock (_recordedEntitiesLock)
823+
{
824+
recordedExchangesCopy = _recordedExchanges.ToDictionary(e => e.Key, e => e.Value);
825+
}
826+
827+
foreach (RecordedExchange rx in recordedExchangesCopy.Values)
775828
{
776829
try
777830
{
@@ -799,18 +852,24 @@ private void RecoverModels()
799852

800853
private void RecoverQueues()
801854
{
802-
lock (RecordedQueues)
855+
IDictionary<string, RecordedQueue> recordedQueuesCopy = null;
856+
lock (_recordedEntitiesLock)
857+
{
858+
recordedQueuesCopy = _recordedQueues.ToDictionary(entry => entry.Key, entry => entry.Value);
859+
}
860+
861+
foreach (KeyValuePair<string, RecordedQueue> pair in recordedQueuesCopy)
803862
{
804-
foreach (KeyValuePair<string, RecordedQueue> pair in RecordedQueues)
863+
string oldName = pair.Key;
864+
RecordedQueue rq = pair.Value;
865+
866+
try
805867
{
806-
string oldName = pair.Key;
807-
RecordedQueue rq = pair.Value;
868+
rq.Recover();
869+
string newName = rq.Name;
808870

809-
try
871+
if (!oldName.Equals(newName))
810872
{
811-
rq.Recover();
812-
string newName = rq.Name;
813-
814873
// Make sure server-named queues are re-added with
815874
// their new names.
816875
// We only remove old name after we've updated the bindings and consumers,
@@ -840,12 +899,12 @@ private void RecoverQueues()
840899
}
841900
}
842901
}
843-
catch (Exception cause)
844-
{
845-
string s = string.Format("Caught an exception while recovering queue {0}: {1}",
846-
oldName, cause.Message);
847-
HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause));
848-
}
902+
}
903+
catch (Exception cause)
904+
{
905+
string s = string.Format("Caught an exception while recovering queue {0}: {1}",
906+
oldName, cause.Message);
907+
HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause));
849908
}
850909
}
851910
}

projects/client/Unit/Unit.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
1212
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
1313
<GenerateAssemblyCopyrightAttribute>false</GenerateAssemblyCopyrightAttribute>
14-
<LangVersion>default</LangVersion>
14+
<LangVersion>latest</LangVersion>
1515
</PropertyGroup>
1616

1717
<ItemGroup>

0 commit comments

Comments
 (0)