77
88namespace RabbitMQ . AMQP . Client . Impl ;
99
10- internal class Visitor ( AmqpManagement management ) : IVisitor
11- {
12- private AmqpManagement Management { get ; } = management ;
13-
14- public async Task VisitQueues ( IEnumerable < QueueSpec > queueSpec )
15- {
16- foreach ( QueueSpec spec in queueSpec )
17- {
18- Trace . WriteLine ( TraceLevel . Information , $ "Recovering queue { spec . Name } ") ;
19- try
20- {
21- await Management . Queue ( spec ) . Declare ( )
22- . ConfigureAwait ( false ) ;
23- }
24- catch ( Exception e )
25- {
26- Trace . WriteLine ( TraceLevel . Error ,
27- $ "Error recovering queue { spec . Name } . Error: { e } . Management Status: { Management } ") ;
28- }
29- }
30- }
31- }
32-
3310/// <summary>
3411/// AmqpConnection is the concrete implementation of <see cref="IConnection"/>
3512/// It is a wrapper around the AMQP.Net Lite <see cref="Connection"/> class
@@ -38,7 +15,7 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
3815{
3916 private const string ConnectionNotRecoveredCode = "CONNECTION_NOT_RECOVERED" ;
4017 private const string ConnectionNotRecoveredMessage = "Connection not recovered" ;
41- private readonly SemaphoreSlim _semaphoreClose = new ( 1 , 1 ) ;
18+ private readonly SemaphoreSlim _semaphoreClose = new ( 1 , 1 ) ; // TODO this needs to be Disposed
4219
4320 // The native AMQP.Net Lite connection
4421 private Connection ? _nativeConnection ;
@@ -55,7 +32,7 @@ private void ChangeEntitiesStatus(State state, Error? error)
5532
5633 private void ChangePublishersStatus ( State state , Error ? error )
5734 {
58- foreach ( var publisher1 in Publishers . Values )
35+ foreach ( IPublisher publisher1 in Publishers . Values )
5936 {
6037 var publisher = ( AmqpPublisher ) publisher1 ;
6138 publisher . ChangeStatus ( state , error ) ;
@@ -64,7 +41,7 @@ private void ChangePublishersStatus(State state, Error? error)
6441
6542 private void ChangeConsumersStatus ( State state , Error ? error )
6643 {
67- foreach ( var consumer1 in Consumers . Values )
44+ foreach ( IConsumer consumer1 in Consumers . Values )
6845 {
6946 var consumer = ( AmqpConsumer ) consumer1 ;
7047 consumer . ChangeStatus ( state , error ) ;
@@ -79,7 +56,7 @@ private async Task ReconnectEntities()
7956
8057 private async Task ReconnectPublishers ( )
8158 {
82- foreach ( var publisher1 in Publishers . Values )
59+ foreach ( IPublisher publisher1 in Publishers . Values )
8360 {
8461 var publisher = ( AmqpPublisher ) publisher1 ;
8562 await publisher . Reconnect ( ) . ConfigureAwait ( false ) ;
@@ -88,7 +65,7 @@ private async Task ReconnectPublishers()
8865
8966 private async Task ReconnectConsumers ( )
9067 {
91- foreach ( var consumer1 in Consumers . Values )
68+ foreach ( IConsumer consumer1 in Consumers . Values )
9269 {
9370 var consumer = ( AmqpConsumer ) consumer1 ;
9471 await consumer . Reconnect ( ) . ConfigureAwait ( false ) ;
@@ -419,7 +396,6 @@ public IPublisherBuilder PublisherBuilder()
419396 return publisherBuilder ;
420397 }
421398
422-
423399 public override async Task CloseAsync ( )
424400 {
425401 await _semaphoreClose . WaitAsync ( )
@@ -459,10 +435,32 @@ await ConnectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10
459435 OnNewStatus ( State . Closed , null ) ;
460436 }
461437
462-
463438 public override string ToString ( )
464439 {
465440 string info = $ "AmqpConnection{{ConnectionSettings='{ _connectionSettings } ', Status='{ State . ToString ( ) } '}}";
466441 return info ;
467442 }
468443}
444+
445+ internal class Visitor ( AmqpManagement management ) : IVisitor
446+ {
447+ private AmqpManagement Management { get ; } = management ;
448+
449+ public async Task VisitQueues ( IEnumerable < QueueSpec > queueSpec )
450+ {
451+ foreach ( QueueSpec spec in queueSpec )
452+ {
453+ Trace . WriteLine ( TraceLevel . Information , $ "Recovering queue { spec . Name } ") ;
454+ try
455+ {
456+ await Management . Queue ( spec ) . Declare ( )
457+ . ConfigureAwait ( false ) ;
458+ }
459+ catch ( Exception e )
460+ {
461+ Trace . WriteLine ( TraceLevel . Error ,
462+ $ "Error recovering queue { spec . Name } . Error: { e } . Management Status: { Management } ") ;
463+ }
464+ }
465+ }
466+ }
0 commit comments