@@ -95,7 +95,7 @@ private async Task ReconnectConsumers()
9595 }
9696 }
9797
98- private readonly ConnectionSettings _connectionSettings ;
98+ private readonly IConnectionSettings _connectionSettings ;
9999 internal readonly AmqpSessionManagement _nativePubSubSessions ;
100100
101101 // TODO: Implement the semaphore to avoid multiple connections
@@ -116,6 +116,13 @@ public ReadOnlyCollection<IPublisher> GetPublishers()
116116 return Publishers . Values . ToList ( ) . AsReadOnly ( ) ;
117117 }
118118
119+ public ReadOnlyCollection < IConsumer > GetConsumers ( )
120+ {
121+ return Consumers . Values . ToList ( ) . AsReadOnly ( ) ;
122+ }
123+
124+ public long Id { get ; set ; }
125+
119126 /// <summary>
120127 /// Creates a new instance of <see cref="AmqpConnection"/>
121128 /// Through the Connection is possible to create:
@@ -124,7 +131,7 @@ public ReadOnlyCollection<IPublisher> GetPublishers()
124131 /// </summary>
125132 /// <param name="connectionSettings"></param>
126133 /// <returns></returns>
127- public static async Task < IConnection > CreateAsync ( ConnectionSettings connectionSettings )
134+ public static async Task < IConnection > CreateAsync ( IConnectionSettings connectionSettings )
128135 {
129136 var connection = new AmqpConnection ( connectionSettings ) ;
130137 await connection . OpenAsync ( )
@@ -158,7 +165,7 @@ await consumer.CloseAsync()
158165 }
159166 }
160167
161- private AmqpConnection ( ConnectionSettings connectionSettings )
168+ private AmqpConnection ( IConnectionSettings connectionSettings )
162169 {
163170 _connectionSettings = connectionSettings ;
164171 _nativePubSubSessions = new AmqpSessionManagement ( this , 1 ) ;
@@ -198,10 +205,7 @@ private async Task EnsureConnection()
198205 var open = new Open
199206 {
200207 HostName = $ "vhost:{ _connectionSettings . VirtualHost } ",
201- Properties = new Fields ( )
202- {
203- [ new Symbol ( "connection_name" ) ] = _connectionSettings . ConnectionName ,
204- }
208+ Properties = new Fields ( ) { [ new Symbol ( "connection_name" ) ] = _connectionSettings . ConnectionName , }
205209 } ;
206210
207211 if ( _connectionSettings . MaxFrameSize > uint . MinValue )
@@ -230,12 +234,14 @@ void onOpened(Amqp.IConnection connection, Open open1)
230234
231235 if ( _connectionSettings . TlsSettings . LocalCertificateSelectionCallback is not null )
232236 {
233- cf . SSL . LocalCertificateSelectionCallback = _connectionSettings . TlsSettings . LocalCertificateSelectionCallback ;
237+ cf . SSL . LocalCertificateSelectionCallback =
238+ _connectionSettings . TlsSettings . LocalCertificateSelectionCallback ;
234239 }
235240
236241 if ( _connectionSettings . TlsSettings . RemoteCertificateValidationCallback is not null )
237242 {
238- cf . SSL . RemoteCertificateValidationCallback = _connectionSettings . TlsSettings . RemoteCertificateValidationCallback ;
243+ cf . SSL . RemoteCertificateValidationCallback =
244+ _connectionSettings . TlsSettings . RemoteCertificateValidationCallback ;
239245 }
240246 }
241247
@@ -246,7 +252,7 @@ void onOpened(Amqp.IConnection connection, Open open1)
246252
247253 try
248254 {
249- _nativeConnection = await cf . CreateAsync ( _connectionSettings . Address , open : open , onOpened : onOpened )
255+ _nativeConnection = await cf . CreateAsync ( ( _connectionSettings as ConnectionSettings ) ? . Address , open : open , onOpened : onOpened )
250256 . ConfigureAwait ( false ) ;
251257 }
252258 catch ( Exception ex )
@@ -300,7 +306,7 @@ private ClosedCallback MaybeRecoverConnection()
300306 // we have to check if the recovery is active.
301307 // The user may want to disable the recovery mechanism
302308 // the user can use the lifecycle callback to handle the error
303- if ( ! _connectionSettings . RecoveryConfiguration . IsActivate ( ) )
309+ if ( ! _connectionSettings . Recovery . IsActivate ( ) )
304310 {
305311 OnNewStatus ( State . Closed , Utils . ConvertError ( error ) ) ;
306312 ChangeEntitiesStatus ( State . Closed , Utils . ConvertError ( error ) ) ;
@@ -323,19 +329,19 @@ await Task.Run(async () =>
323329 // the user may want to disable the backoff policy or
324330 // the backoff policy is not active due of some condition
325331 // for example: Reaching the maximum number of retries and avoid the forever loop
326- _connectionSettings . RecoveryConfiguration . GetBackOffDelayPolicy ( ) . IsActive ( ) &&
332+ _connectionSettings . Recovery . GetBackOffDelayPolicy ( ) . IsActive ( ) &&
327333
328334 // even we set the status to reconnecting up, we need to check if the connection is still in the
329335 // reconnecting status. The user may close the connection in the meanwhile
330336 State == State . Reconnecting )
331337 {
332338 try
333339 {
334- int next = _connectionSettings . RecoveryConfiguration . GetBackOffDelayPolicy ( ) . Delay ( ) ;
340+ int next = _connectionSettings . Recovery . GetBackOffDelayPolicy ( ) . Delay ( ) ;
335341
336342 Trace . WriteLine ( TraceLevel . Information ,
337343 $ "Trying Recovering connection in { next } milliseconds, " +
338- $ "attempt: { _connectionSettings . RecoveryConfiguration . GetBackOffDelayPolicy ( ) . CurrentAttempt } . " +
344+ $ "attempt: { _connectionSettings . Recovery . GetBackOffDelayPolicy ( ) . CurrentAttempt } . " +
339345 $ "Info: { ToString ( ) } )") ;
340346
341347 await Task . Delay ( TimeSpan . FromMilliseconds ( next ) )
@@ -352,7 +358,7 @@ await EnsureConnection()
352358 }
353359 }
354360
355- _connectionSettings . RecoveryConfiguration . GetBackOffDelayPolicy ( ) . Reset ( ) ;
361+ _connectionSettings . Recovery . GetBackOffDelayPolicy ( ) . Reset ( ) ;
356362 string connectionDescription = connected ? "recovered" : "not recovered" ;
357363 Trace . WriteLine ( TraceLevel . Information ,
358364 $ "Connection { connectionDescription } . Info: { ToString ( ) } ") ;
@@ -362,15 +368,15 @@ await EnsureConnection()
362368 Trace . WriteLine ( TraceLevel . Verbose , $ "connection is closed. Info: { ToString ( ) } ") ;
363369 OnNewStatus ( State . Closed ,
364370 new Error ( ConnectionNotRecoveredCode ,
365- $ "{ ConnectionNotRecoveredMessage } , recover status: { _connectionSettings . RecoveryConfiguration } ") ) ;
371+ $ "{ ConnectionNotRecoveredMessage } , recover status: { _connectionSettings . Recovery } ") ) ;
366372
367373 ChangeEntitiesStatus ( State . Closed , new Error ( ConnectionNotRecoveredCode ,
368- $ "{ ConnectionNotRecoveredMessage } , recover status: { _connectionSettings . RecoveryConfiguration } ") ) ;
374+ $ "{ ConnectionNotRecoveredMessage } , recover status: { _connectionSettings . Recovery } ") ) ;
369375
370376 return ;
371377 }
372378
373- if ( _connectionSettings . RecoveryConfiguration . IsTopologyActive ( ) )
379+ if ( _connectionSettings . Recovery . IsTopologyActive ( ) )
374380 {
375381 Trace . WriteLine ( TraceLevel . Information , $ "Recovering topology. Info: { ToString ( ) } ") ;
376382 var visitor = new Visitor ( _management ) ;
0 commit comments