99namespace RabbitMQ . Client . Core . DependencyInjection
1010{
1111 /// <summary>
12- /// An implementation of the service that handles message consumption events.
12+ /// An implementation of the service that handles message receiving ( consumption) events.
1313 /// </summary>
1414 public class MessageHandlingService : IMessageHandlingService
1515 {
1616 const int ResendTimeout = 60 ;
1717
1818 readonly IEnumerable < RabbitMqExchange > _exchanges ;
19- readonly IDictionary < Type , List < string > > _messageHandlerRouters ;
2019 readonly IDictionary < string , IList < IMessageHandler > > _messageHandlers ;
2120 readonly IDictionary < string , IList < IAsyncMessageHandler > > _asyncMessageHandlers ;
2221 readonly IDictionary < string , IList < INonCyclicMessageHandler > > _nonCyclicHandlers ;
@@ -33,115 +32,137 @@ public MessageHandlingService(
3332 ILogger < MessageHandlingService > logger )
3433 {
3534 _exchanges = exchanges ;
36- _messageHandlerRouters = TransformMessageHandlerRouters ( routers ) ;
37- _messageHandlers = TransformMessageHandlersCollection ( messageHandlers ) ;
38- _asyncMessageHandlers = TransformMessageHandlersCollection ( asyncMessageHandlers ) ;
39- _nonCyclicHandlers = TransformMessageHandlersCollection ( nonCyclicHandlers ) ;
40- _asyncNonCyclicHandlers = TransformMessageHandlersCollection ( asyncNonCyclicHandlers ) ;
35+ var messageHandlerRouters = TransformMessageHandlerRouters ( routers ) ;
36+ _messageHandlers = TransformMessageHandlersCollection ( messageHandlers , messageHandlerRouters ) ;
37+ _asyncMessageHandlers = TransformMessageHandlersCollection ( asyncMessageHandlers , messageHandlerRouters ) ;
38+ _nonCyclicHandlers = TransformMessageHandlersCollection ( nonCyclicHandlers , messageHandlerRouters ) ;
39+ _asyncNonCyclicHandlers = TransformMessageHandlersCollection ( asyncNonCyclicHandlers , messageHandlerRouters ) ;
4140 _logger = logger ;
4241 }
4342
4443 /// <summary>
45- /// Handle message consumption event.
44+ /// Handle message receiving event.
4645 /// </summary>
47- /// <param name="eventArgs">Arguments of message consumption event.</param>
46+ /// <param name="eventArgs">Arguments of message receiving event.</param>
4847 /// <param name="queueService">An instance of the queue service <see cref="IQueueService"/>.</param>
49- public void HandleMessage ( BasicDeliverEventArgs eventArgs , IQueueService queueService )
48+ public void HandleMessageReceivingEvent ( BasicDeliverEventArgs eventArgs , IQueueService queueService )
5049 {
5150 var message = Encoding . UTF8 . GetString ( eventArgs . Body ) ;
5251
53- _logger . LogInformation ( $ "New message was received with deliveryTag { eventArgs . DeliveryTag } .") ;
54- _logger . LogInformation ( message ) ;
52+ _logger . LogInformation ( $ "A new message was received with deliveryTag { eventArgs . DeliveryTag } .") ;
53+ _logger . LogInformation ( message ) ;
5554
56- try
55+ try
56+ {
57+ if ( _asyncMessageHandlers . ContainsKey ( eventArgs . RoutingKey ) )
5758 {
58- if ( _asyncMessageHandlers . ContainsKey ( eventArgs . RoutingKey ) )
59+ var tasks = new List < Task > ( ) ;
60+ foreach ( var handler in _asyncMessageHandlers [ eventArgs . RoutingKey ] )
5961 {
60- var tasks = new List < Task > ( ) ;
61- foreach ( var handler in _asyncMessageHandlers [ eventArgs . RoutingKey ] )
62- {
63- tasks . Add ( RunAsyncHandler ( handler , message , eventArgs . RoutingKey ) ) ;
64- }
65- Task . WaitAll ( tasks . ToArray ( ) ) ;
62+ tasks . Add ( RunAsyncMessageHandler ( handler , message , eventArgs . RoutingKey ) ) ;
6663 }
67- if ( _messageHandlers . ContainsKey ( eventArgs . RoutingKey ) )
64+ Task . WaitAll ( tasks . ToArray ( ) ) ;
65+ }
66+ if ( _messageHandlers . ContainsKey ( eventArgs . RoutingKey ) )
67+ {
68+ foreach ( var handler in _messageHandlers [ eventArgs . RoutingKey ] )
6869 {
69- foreach ( var handler in _messageHandlers [ eventArgs . RoutingKey ] )
70- {
71- _logger . LogDebug ( $ "Starting processing the message by message handler { handler ? . GetType ( ) . Name } .") ;
72- handler . Handle ( message , eventArgs . RoutingKey ) ;
73- _logger . LogDebug ( $ "The message has been processed by message handler { handler ? . GetType ( ) . Name } .") ;
74- }
70+ RunMessageHandler ( handler , message , eventArgs . RoutingKey ) ;
7571 }
76- if ( _asyncNonCyclicHandlers . ContainsKey ( eventArgs . RoutingKey ) )
72+ }
73+ if ( _asyncNonCyclicHandlers . ContainsKey ( eventArgs . RoutingKey ) )
74+ {
75+ var tasks = new List < Task > ( ) ;
76+ foreach ( var handler in _asyncNonCyclicHandlers [ eventArgs . RoutingKey ] )
7777 {
78- var tasks = new List < Task > ( ) ;
79- foreach ( var handler in _asyncNonCyclicHandlers [ eventArgs . RoutingKey ] )
80- {
81- tasks . Add ( RunAsyncNonCyclicHandler ( handler , message , eventArgs . RoutingKey , queueService ) ) ;
82- }
83- Task . WaitAll ( tasks . ToArray ( ) ) ;
78+ tasks . Add ( RunAsyncNonCyclicMessageHandler ( handler , message , eventArgs . RoutingKey , queueService ) ) ;
8479 }
85- if ( _nonCyclicHandlers . ContainsKey ( eventArgs . RoutingKey ) )
80+ Task . WaitAll ( tasks . ToArray ( ) ) ;
81+ }
82+ if ( _nonCyclicHandlers . ContainsKey ( eventArgs . RoutingKey ) )
83+ {
84+ foreach ( var handler in _nonCyclicHandlers [ eventArgs . RoutingKey ] )
8685 {
87- foreach ( var handler in _nonCyclicHandlers [ eventArgs . RoutingKey ] )
88- {
89- _logger . LogDebug ( $ "Starting processing the message by non-cyclic message handler { handler ? . GetType ( ) . Name } .") ;
90- handler . Handle ( message , eventArgs . RoutingKey , queueService ) ;
91- _logger . LogDebug ( $ "The message has been processed by non-cyclic message handler { handler ? . GetType ( ) . Name } .") ;
92- }
86+ RunNonCyclicMessageHandler ( handler , message , eventArgs . RoutingKey , queueService ) ;
9387 }
94- _logger . LogInformation ( $ "Success message with deliveryTag { eventArgs . DeliveryTag } .") ;
95- queueService . Channel . BasicAck ( eventArgs . DeliveryTag , false ) ;
9688 }
97- catch ( Exception exception )
98- {
99- _logger . LogError ( new EventId ( ) , exception , $ "An error occurred while processing received message with delivery tag { eventArgs . DeliveryTag } .") ;
89+ queueService . Channel . BasicAck ( eventArgs . DeliveryTag , false ) ;
90+ _logger . LogInformation ( $ "Message processing finished successfully. Acknowledge has been sent with deliveryTag { eventArgs . DeliveryTag } .") ;
91+ }
92+ catch ( Exception exception )
93+ {
94+ _logger . LogError ( new EventId ( ) , exception , $ "An error occurred while processing received message with the delivery tag { eventArgs . DeliveryTag } .") ;
10095
101- queueService . Channel . BasicAck ( eventArgs . DeliveryTag , false ) ;
96+ queueService . Channel . BasicAck ( eventArgs . DeliveryTag , false ) ;
10297
103- if ( eventArgs . BasicProperties . Headers is null )
104- {
105- eventArgs . BasicProperties . Headers = new Dictionary < string , object > ( ) ;
106- }
98+ if ( eventArgs . BasicProperties . Headers is null )
99+ {
100+ eventArgs . BasicProperties . Headers = new Dictionary < string , object > ( ) ;
101+ }
107102
108- var exchange = _exchanges . FirstOrDefault ( x => x . Name == eventArgs . Exchange ) ;
109- if ( exchange is null )
110- {
111- _logger . LogError ( $ "Could not detect exchange { eventArgs . Exchange } to detect the necessity of resending the failed message.") ;
112- return ;
113- }
103+ var exchange = _exchanges . FirstOrDefault ( x => x . Name == eventArgs . Exchange ) ;
104+ if ( exchange is null )
105+ {
106+ _logger . LogError ( $ "Could not detect an exchange \" { eventArgs . Exchange } \" to determine the necessity of resending the failed message.") ;
107+ return ;
108+ }
114109
115- if ( exchange . Options . RequeueFailedMessages
116- && ! string . IsNullOrEmpty ( exchange . Options . DeadLetterExchange )
117- && ! eventArgs . BasicProperties . Headers . ContainsKey ( "requeued" ) )
118- {
119- eventArgs . BasicProperties . Headers . Add ( "requeued" , true ) ;
120- queueService . Send ( eventArgs . Body , eventArgs . BasicProperties , eventArgs . Exchange , eventArgs . RoutingKey , ResendTimeout ) ;
121- _logger . LogInformation ( "The failed message has been requeued." ) ;
122- }
123- else
124- {
125- _logger . LogInformation ( "The failed message would not be requeued." ) ;
126- }
110+ if ( exchange . Options . RequeueFailedMessages
111+ && ! string . IsNullOrEmpty ( exchange . Options . DeadLetterExchange )
112+ && ! eventArgs . BasicProperties . Headers . ContainsKey ( "requeued" ) )
113+ {
114+ eventArgs . BasicProperties . Headers . Add ( "requeued" , true ) ;
115+ queueService . Send ( eventArgs . Body , eventArgs . BasicProperties , eventArgs . Exchange , eventArgs . RoutingKey , ResendTimeout ) ;
116+ _logger . LogInformation ( "The failed message has been requeued." ) ;
117+ }
118+ else
119+ {
120+ _logger . LogInformation ( "The failed message would not be requeued." ) ;
127121 }
122+ }
123+ }
124+
125+ void RunMessageHandler ( IMessageHandler handler , string message , string routingKey )
126+ {
127+ ValidateHandler ( handler ) ;
128+ _logger . LogDebug ( $ "Starting processing the message by message handler { handler . GetType ( ) . Name } .") ;
129+ handler . Handle ( message , routingKey ) ;
130+ _logger . LogDebug ( $ "The message has been processed by message handler { handler . GetType ( ) . Name } .") ;
131+ }
132+
133+ void RunNonCyclicMessageHandler ( INonCyclicMessageHandler handler , string message , string routingKey , IQueueService queueService )
134+ {
135+ ValidateHandler ( handler ) ;
136+ _logger . LogDebug ( $ "Starting processing the message by non-cyclic message handler { handler . GetType ( ) . Name } .") ;
137+ handler ? . Handle ( message , routingKey , queueService ) ;
138+ _logger . LogDebug ( $ "The message has been processed by non-cyclic message handler { handler . GetType ( ) . Name } .") ;
128139 }
129140
130- async Task RunAsyncHandler ( IAsyncMessageHandler handler , string message , string routingKey )
141+ async Task RunAsyncMessageHandler ( IAsyncMessageHandler handler , string message , string routingKey )
131142 {
132- _logger . LogDebug ( $ "Starting processing the message by async message handler { handler ? . GetType ( ) . Name } .") ;
143+ ValidateHandler ( handler ) ;
144+ _logger . LogDebug ( $ "Starting processing the message by async message handler { handler . GetType ( ) . Name } .") ;
133145 await handler . Handle ( message , routingKey ) ;
134- _logger . LogDebug ( $ "The message has been processed by async message handler { handler ? . GetType ( ) . Name } .") ;
146+ _logger . LogDebug ( $ "The message has been processed by async message handler { handler . GetType ( ) . Name } .") ;
135147 }
136148
137- async Task RunAsyncNonCyclicHandler ( IAsyncNonCyclicMessageHandler handler , string message , string routingKey , IQueueService queueService )
149+ async Task RunAsyncNonCyclicMessageHandler ( IAsyncNonCyclicMessageHandler handler , string message , string routingKey , IQueueService queueService )
138150 {
139- _logger . LogDebug ( $ "Starting processing the message by async non-cyclic message handler { handler ? . GetType ( ) . Name } .") ;
151+ ValidateHandler ( handler ) ;
152+ _logger . LogDebug ( $ "Starting processing the message by async non-cyclic message handler { handler . GetType ( ) . Name } .") ;
140153 await handler . Handle ( message , routingKey , queueService ) ;
141- _logger . LogDebug ( $ "The message has been processed by async non-cyclic message handler { handler ? . GetType ( ) . Name } .") ;
154+ _logger . LogDebug ( $ "The message has been processed by async non-cyclic message handler { handler . GetType ( ) . Name } .") ;
155+ }
156+
157+ void ValidateHandler < T > ( T messageHandler )
158+ {
159+ if ( messageHandler is null )
160+ {
161+ throw new ArgumentNullException ( nameof ( messageHandler ) , "Message handler is null." ) ;
162+ }
142163 }
143164
144- IDictionary < Type , List < string > > TransformMessageHandlerRouters ( IEnumerable < MessageHandlerRouter > routers )
165+ static IDictionary < Type , List < string > > TransformMessageHandlerRouters ( IEnumerable < MessageHandlerRouter > routers )
145166 {
146167 var dictionary = new Dictionary < Type , List < string > > ( ) ;
147168 foreach ( var router in routers )
@@ -157,14 +178,14 @@ IDictionary<Type, List<string>> TransformMessageHandlerRouters(IEnumerable<Messa
157178 }
158179 return dictionary ;
159180 }
160-
161- IDictionary < string , IList < T > > TransformMessageHandlersCollection < T > ( IEnumerable < T > messageHandlers )
181+
182+ static IDictionary < string , IList < T > > TransformMessageHandlersCollection < T > ( IEnumerable < T > messageHandlers , IDictionary < Type , List < string > > messageHandlerRouters )
162183 {
163184 var dictionary = new Dictionary < string , IList < T > > ( ) ;
164185 foreach ( var handler in messageHandlers )
165186 {
166187 var type = handler . GetType ( ) ;
167- foreach ( var routingKey in _messageHandlerRouters [ type ] )
188+ foreach ( var routingKey in messageHandlerRouters [ type ] )
168189 {
169190 if ( dictionary . ContainsKey ( routingKey ) )
170191 {
0 commit comments