43
43
using RabbitMQ . Client . Impl ;
44
44
using System ;
45
45
using System . Collections . Generic ;
46
+ using System . Collections . Concurrent ;
46
47
using System . Linq ;
47
- using System . Net ;
48
48
using System . Threading ;
49
49
using System . Threading . Tasks ;
50
50
@@ -53,6 +53,8 @@ namespace RabbitMQ.Client.Framing.Impl
53
53
public class AutorecoveringConnection : IConnection , IRecoverable
54
54
{
55
55
public readonly object m_eventLock = new object ( ) ;
56
+
57
+ public readonly object manuallyClosedLock = new object ( ) ;
56
58
protected Connection m_delegate ;
57
59
protected ConnectionFactory m_factory ;
58
60
@@ -71,19 +73,20 @@ public class AutorecoveringConnection : IConnection, IRecoverable
71
73
72
74
protected List < AutorecoveringModel > m_models = new List < AutorecoveringModel > ( ) ;
73
75
74
- protected HashSet < RecordedBinding > m_recordedBindings = new HashSet < RecordedBinding > ( ) ;
76
+ protected IDictionary < RecordedBinding , byte > m_recordedBindings =
77
+ new ConcurrentDictionary < RecordedBinding , byte > ( ) ;
75
78
76
79
protected List < EventHandler < ConnectionBlockedEventArgs > > m_recordedBlockedEventHandlers =
77
80
new List < EventHandler < ConnectionBlockedEventArgs > > ( ) ;
78
81
79
82
protected IDictionary < string , RecordedConsumer > m_recordedConsumers =
80
- new Dictionary < string , RecordedConsumer > ( ) ;
83
+ new ConcurrentDictionary < string , RecordedConsumer > ( ) ;
81
84
82
85
protected IDictionary < string , RecordedExchange > m_recordedExchanges =
83
- new Dictionary < string , RecordedExchange > ( ) ;
86
+ new ConcurrentDictionary < string , RecordedExchange > ( ) ;
84
87
85
88
protected IDictionary < string , RecordedQueue > m_recordedQueues =
86
- new Dictionary < string , RecordedQueue > ( ) ;
89
+ new ConcurrentDictionary < string , RecordedQueue > ( ) ;
87
90
88
91
protected List < EventHandler < ShutdownEventArgs > > m_recordedShutdownEventHandlers =
89
92
new List < EventHandler < ShutdownEventArgs > > ( ) ;
@@ -101,6 +104,23 @@ public AutorecoveringConnection(ConnectionFactory factory, string clientProvided
101
104
this . ClientProvidedName = clientProvidedName ;
102
105
}
103
106
107
+ private bool ManuallyClosed
108
+ {
109
+ get
110
+ {
111
+ lock ( manuallyClosedLock )
112
+ {
113
+ return manuallyClosed ;
114
+ }
115
+ }
116
+ set
117
+ {
118
+ lock ( manuallyClosedLock )
119
+ {
120
+ manuallyClosed = value ; }
121
+ }
122
+ }
123
+
104
124
public event EventHandler < CallbackExceptionEventArgs > CallbackException
105
125
{
106
126
add
@@ -338,7 +358,7 @@ public void BeginAutomaticRecovery()
338
358
339
359
recoveryTaskFactory . StartNew ( ( ) =>
340
360
{
341
- if ( ! self . manuallyClosed )
361
+ if ( ! self . ManuallyClosed )
342
362
{
343
363
try
344
364
{
@@ -396,7 +416,7 @@ public void DeleteRecordedBinding(RecordedBinding rb)
396
416
{
397
417
lock ( m_recordedEntitiesLock )
398
418
{
399
- m_recordedBindings . RemoveWhere ( b => b . Equals ( rb ) ) ;
419
+ m_recordedBindings . Remove ( rb ) ;
400
420
}
401
421
}
402
422
@@ -423,11 +443,10 @@ public void DeleteRecordedExchange(string name)
423
443
424
444
// find bindings that need removal, check if some auto-delete exchanges
425
445
// might need the same
426
- List < RecordedBinding > bs = m_recordedBindings . Where ( b => name . Equals ( b . Destination ) ) .
427
- ToList ( ) ;
428
- m_recordedBindings . RemoveWhere ( b => name . Equals ( b . Destination ) ) ;
446
+ var bs = m_recordedBindings . Keys . Where ( b => name . Equals ( b . Destination ) ) ;
429
447
foreach ( RecordedBinding b in bs )
430
448
{
449
+ m_recordedBindings . Remove ( b ) ;
431
450
MaybeDeleteRecordedAutoDeleteExchange ( b . Source ) ;
432
451
}
433
452
}
@@ -440,11 +459,10 @@ public void DeleteRecordedQueue(string name)
440
459
m_recordedQueues . Remove ( name ) ;
441
460
// find bindings that need removal, check if some auto-delete exchanges
442
461
// might need the same
443
- List < RecordedBinding > bs = m_recordedBindings . Where ( b => name . Equals ( b . Destination ) ) .
444
- ToList ( ) ;
445
- m_recordedBindings . RemoveWhere ( b => name . Equals ( b . Destination ) ) ;
462
+ var bs = m_recordedBindings . Keys . Where ( b => name . Equals ( b . Destination ) ) ;
446
463
foreach ( RecordedBinding b in bs )
447
464
{
465
+ m_recordedBindings . Remove ( b ) ;
448
466
MaybeDeleteRecordedAutoDeleteExchange ( b . Source ) ;
449
467
}
450
468
}
@@ -466,7 +484,7 @@ public void MaybeDeleteRecordedAutoDeleteExchange(string exchange)
466
484
{
467
485
lock ( m_recordedEntitiesLock )
468
486
{
469
- if ( ! HasMoreDestinationsBoundToExchange ( m_recordedBindings , exchange ) )
487
+ if ( ! HasMoreDestinationsBoundToExchange ( m_recordedBindings . Keys , exchange ) )
470
488
{
471
489
RecordedExchange rx ;
472
490
m_recordedExchanges . TryGetValue ( exchange , out rx ) ;
@@ -503,7 +521,7 @@ public void RecordBinding(RecordedBinding rb)
503
521
{
504
522
lock ( m_recordedEntitiesLock )
505
523
{
506
- m_recordedBindings . Add ( rb ) ;
524
+ m_recordedBindings . Add ( rb , 0 ) ;
507
525
}
508
526
}
509
527
@@ -601,56 +619,56 @@ private void Init(IFrameHandler fh)
601
619
///<summary>API-side invocation of connection abort.</summary>
602
620
public void Abort ( )
603
621
{
604
- this . manuallyClosed = true ;
622
+ this . ManuallyClosed = true ;
605
623
m_delegate . Abort ( ) ;
606
624
}
607
625
608
626
///<summary>API-side invocation of connection abort.</summary>
609
627
public void Abort ( ushort reasonCode , string reasonText )
610
628
{
611
- this . manuallyClosed = true ;
629
+ this . ManuallyClosed = true ;
612
630
m_delegate . Abort ( reasonCode , reasonText ) ;
613
631
}
614
632
615
633
///<summary>API-side invocation of connection abort with timeout.</summary>
616
634
public void Abort ( int timeout )
617
635
{
618
- this . manuallyClosed = true ;
636
+ this . ManuallyClosed = true ;
619
637
m_delegate . Abort ( timeout ) ;
620
638
}
621
639
622
640
///<summary>API-side invocation of connection abort with timeout.</summary>
623
641
public void Abort ( ushort reasonCode , string reasonText , int timeout )
624
642
{
625
- this . manuallyClosed = true ;
643
+ this . ManuallyClosed = true ;
626
644
m_delegate . Abort ( reasonCode , reasonText , timeout ) ;
627
645
}
628
646
629
647
///<summary>API-side invocation of connection.close.</summary>
630
648
public void Close ( )
631
649
{
632
- this . manuallyClosed = true ;
650
+ this . ManuallyClosed = true ;
633
651
m_delegate . Close ( ) ;
634
652
}
635
653
636
654
///<summary>API-side invocation of connection.close.</summary>
637
655
public void Close ( ushort reasonCode , string reasonText )
638
656
{
639
- this . manuallyClosed = true ;
657
+ this . ManuallyClosed = true ;
640
658
m_delegate . Close ( reasonCode , reasonText ) ;
641
659
}
642
660
643
661
///<summary>API-side invocation of connection.close with timeout.</summary>
644
662
public void Close ( int timeout )
645
663
{
646
- this . manuallyClosed = true ;
664
+ this . ManuallyClosed = true ;
647
665
m_delegate . Close ( timeout ) ;
648
666
}
649
667
650
668
///<summary>API-side invocation of connection.close with timeout.</summary>
651
669
public void Close ( ushort reasonCode , string reasonText , int timeout )
652
670
{
653
- this . manuallyClosed = true ;
671
+ this . ManuallyClosed = true ;
654
672
m_delegate . Close ( reasonCode , reasonText , timeout ) ;
655
673
}
656
674
@@ -719,8 +737,7 @@ protected void PropagateQueueNameChangeToBindings(string oldName, string newName
719
737
{
720
738
lock ( m_recordedBindings )
721
739
{
722
- IEnumerable < RecordedBinding > bs = m_recordedBindings .
723
- Where ( b => b . Destination . Equals ( oldName ) ) ;
740
+ var bs = m_recordedBindings . Keys . Where ( b => b . Destination . Equals ( oldName ) ) ;
724
741
foreach ( RecordedBinding b in bs )
725
742
{
726
743
b . Destination = newName ;
@@ -743,7 +760,7 @@ protected void PropagateQueueNameChangeToConsumers(string oldName, string newNam
743
760
744
761
protected void RecoverBindings ( )
745
762
{
746
- foreach ( RecordedBinding b in m_recordedBindings )
763
+ foreach ( var b in m_recordedBindings . Keys )
747
764
{
748
765
try
749
766
{
@@ -815,8 +832,7 @@ protected void RecoverConnectionUnblockedHandlers()
815
832
816
833
protected void RecoverConsumers ( )
817
834
{
818
- var dict = new Dictionary < string , RecordedConsumer > ( m_recordedConsumers ) ;
819
- foreach ( KeyValuePair < string , RecordedConsumer > pair in dict )
835
+ foreach ( KeyValuePair < string , RecordedConsumer > pair in m_recordedConsumers )
820
836
{
821
837
string tag = pair . Key ;
822
838
RecordedConsumer cons = pair . Value ;
@@ -903,8 +919,7 @@ protected void RecoverQueues()
903
919
{
904
920
lock ( m_recordedQueues )
905
921
{
906
- var rqs = new Dictionary < string , RecordedQueue > ( m_recordedQueues ) ;
907
- foreach ( KeyValuePair < string , RecordedQueue > pair in rqs )
922
+ foreach ( KeyValuePair < string , RecordedQueue > pair in m_recordedQueues )
908
923
{
909
924
string oldName = pair . Key ;
910
925
RecordedQueue rq = pair . Value ;
0 commit comments