@@ -15,7 +15,7 @@ namespace RabbitMQ.Client.Core.DependencyInjection
1515 /// <summary>
1616 /// Implementation of the custom RabbitMQ queue service.
1717 /// </summary>
18- internal class QueueService : IQueueService
18+ internal class QueueService : IQueueService , IDisposable
1919 {
2020 /// <summary>
2121 /// RabbitMQ connection.
@@ -56,7 +56,9 @@ public QueueService(
5656 IOptions < RabbitMqClientOptions > options )
5757 {
5858 if ( options is null )
59+ {
5960 throw new ArgumentException ( $ "Argument { nameof ( options ) } is null.", nameof ( options ) ) ;
61+ }
6062
6163 _exchanges = exchanges ;
6264
@@ -103,17 +105,22 @@ public void Dispose()
103105 _connection . CallbackException -= HandleConnectionCallbackException ;
104106 _connection . ConnectionRecoveryError -= HandleConnectionRecoveryError ;
105107 }
108+
106109 if ( _channel != null )
107110 {
108111 _channel . CallbackException -= HandleChannelCallbackException ;
109112 _channel . BasicRecoverOk -= HandleChannelBasicRecoverOk ;
110113 }
111114
112115 if ( _channel ? . IsOpen == true )
116+ {
113117 _channel . Close ( ( int ) HttpStatusCode . OK , "Channel closed" ) ;
118+ }
114119
115120 if ( _connection ? . IsOpen == true )
121+ {
116122 _connection . Close ( ) ;
123+ }
117124 }
118125
119126 /// <summary>
@@ -122,7 +129,9 @@ public void Dispose()
122129 public void StartConsuming ( )
123130 {
124131 if ( _consumingStarted )
132+ {
125133 return ;
134+ }
126135
127136 _consumer . Received += _receivedMessage ;
128137 _consumingStarted = true ;
@@ -246,7 +255,9 @@ IBasicProperties CreateJsonProperties()
246255 void HandleConnectionCallbackException ( object sender , CallbackExceptionEventArgs @event )
247256 {
248257 if ( @event is null )
258+ {
249259 return ;
260+ }
250261
251262 _logger . LogError ( new EventId ( ) , @event . Exception , @event . Exception . Message , @event ) ;
252263 throw @event . Exception ;
@@ -255,7 +266,9 @@ void HandleConnectionCallbackException(object sender, CallbackExceptionEventArgs
255266 void HandleConnectionRecoveryError ( object sender , ConnectionRecoveryErrorEventArgs @event )
256267 {
257268 if ( @event is null )
269+ {
258270 return ;
271+ }
259272
260273 _logger . LogError ( new EventId ( ) , @event . Exception , @event . Exception . Message , @event ) ;
261274 throw @event . Exception ;
@@ -264,15 +277,19 @@ void HandleConnectionRecoveryError(object sender, ConnectionRecoveryErrorEventAr
264277 void HandleChannelBasicRecoverOk ( object sender , EventArgs @event )
265278 {
266279 if ( @event is null )
280+ {
267281 return ;
282+ }
268283
269284 _logger . LogInformation ( "Connection has been reestablished." ) ;
270285 }
271286
272287 void HandleChannelCallbackException ( object sender , CallbackExceptionEventArgs @event )
273288 {
274289 if ( @event is null )
290+ {
275291 return ;
292+ }
276293
277294 _logger . LogError ( new EventId ( ) , @event . Exception , @event . Exception . Message , @event ) ;
278295 }
@@ -311,7 +328,7 @@ IDictionary<string, IList<T>> TransformMessageHandlersCollection<T>(IEnumerable<
311328 }
312329 else
313330 {
314- dictionary . Add ( routingKey , new List < T > ( ) { handler } ) ;
331+ dictionary . Add ( routingKey , new List < T > { handler } ) ;
315332 }
316333 }
317334 }
@@ -375,7 +392,9 @@ void StartClient()
375392 Channel . BasicAck ( @event . DeliveryTag , false ) ;
376393
377394 if ( @event . BasicProperties . Headers is null )
395+ {
378396 @event . BasicProperties . Headers = new Dictionary < string , object > ( ) ;
397+ }
379398
380399 var exchange = _exchanges . FirstOrDefault ( x => x . Name == @event . Exchange ) ;
381400 if ( exchange is null )
@@ -393,7 +412,9 @@ void StartClient()
393412 _logger . LogInformation ( "The failed message has been requeued." ) ;
394413 }
395414 else
415+ {
396416 _logger . LogInformation ( "The failed message would not be requeued." ) ;
417+ }
397418 }
398419 } ;
399420
@@ -462,10 +483,12 @@ void StartQueue(RabbitMqQueueOptions queue, string exchangeName)
462483 {
463484 // If there are not any routing keys then make a bind with a queue name.
464485 foreach ( var route in queue . RoutingKeys )
486+ {
465487 _channel . QueueBind (
466488 queue : queue . Name ,
467489 exchange : exchangeName ,
468490 routingKey : route ) ;
491+ }
469492 }
470493 else
471494 {
@@ -479,20 +502,28 @@ void StartQueue(RabbitMqQueueOptions queue, string exchangeName)
479502 void ValidateArguments ( string exchangeName , string routingKey )
480503 {
481504 if ( string . IsNullOrEmpty ( exchangeName ) )
505+ {
482506 throw new ArgumentException ( $ "Argument { nameof ( exchangeName ) } is null or empty.", nameof ( exchangeName ) ) ;
507+ }
483508 if ( string . IsNullOrEmpty ( routingKey ) )
509+ {
484510 throw new ArgumentException ( $ "Argument { nameof ( routingKey ) } is null or empty.", nameof ( routingKey ) ) ;
511+ }
485512
486513 var deadLetterExchanges = _exchanges . Select ( x => x . Options . DeadLetterExchange ) . Distinct ( ) ;
487514 if ( ! _exchanges . Any ( x => x . Name == exchangeName ) && ! deadLetterExchanges . Any ( x => x == exchangeName ) )
515+ {
488516 throw new ArgumentException ( $ "Exchange { nameof ( exchangeName ) } has not been deaclared yet.", nameof ( exchangeName ) ) ;
517+ }
489518 }
490519
491520 string GetDeadLetterExchange ( string exchangeName )
492521 {
493522 var exchange = _exchanges . FirstOrDefault ( x => x . Name == exchangeName ) ;
494523 if ( string . IsNullOrEmpty ( exchange . Options . DeadLetterExchange ) )
524+ {
495525 throw new ArgumentException ( $ "Exchange { nameof ( exchangeName ) } has not been configured with a dead letter exchange.", nameof ( exchangeName ) ) ;
526+ }
496527
497528 return exchange . Options . DeadLetterExchange ;
498529 }
0 commit comments