Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit b25c727

Browse files
Added non-cycling message handlers.
1 parent b106498 commit b25c727

File tree

6 files changed

+245
-27
lines changed

6 files changed

+245
-27
lines changed

README.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,55 @@ public class CustomAsyncMessageHandler : IAsyncMessageHandler
152152
}
153153
```
154154

155+
But you can not use `IQueueService` inside those message handlers otherwise you will be faced with cycling dependency problem. But sometimes you may need to send something in other queue (e.g. queue with responses) from one message handler or another. For that purposes use non-cyclinc handlers.
156+
157+
```csharp
158+
public class CustomMessageHandler : INonCyclicMessageHandler
159+
{
160+
readonly ILogger<CustomMessageHandler> _logger;
161+
public CustomMessageHandler(ILogger<CustomMessageHandler> logger)
162+
{
163+
_logger = logger;
164+
}
165+
166+
public void Handle(string message, string routingKey, IQueueService queueService)
167+
{
168+
_logger.LogInformation("Got a message.");
169+
var response = new { Message = message };
170+
queueService.Send(response, "exchange.name", "routing.key");
171+
}
172+
}
173+
```
174+
175+
Or the same but async.
176+
177+
```csharp
178+
public class CustomAsyncMessageHandler : IAsyncNonCyclicMessageHandler
179+
{
180+
readonly ILogger<CustomAsyncMessageHandler> _logger;
181+
182+
public CustomAsyncMessageHandler(ILogger<CustomAsyncMessageHandler> logger)
183+
{
184+
_logger = logger;
185+
}
186+
187+
public async Task Handle(string message, string routingKey, IQueueService queueService)
188+
{
189+
_logger.LogInformation("Doing something async.");
190+
var response = new { Message = message };
191+
await queueService.SendAsync(response, "exchange.name", "routing.key");
192+
}
193+
}
194+
```
195+
196+
And you have to register those classes the same way you did with simple handlers.
197+
```csharp
198+
services.AddRabbitMqClient(rabbitMqSection)
199+
.AddExchange("exchange.name", exchangeSection)
200+
.AddNonCyclicMessageHandlerSingleton<CustomMessageHandler>("routing.key")
201+
.AddAsyncNonCyclicMessageHandlerSingleton<CustomAsyncMessageHandler>("other.routing.key");
202+
```
203+
155204
You can find example projects in the repository too.
156205

157206
### appsettings.json configuration
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System.Threading.Tasks;
2+
3+
namespace RabbitMQ.Client.Core.DependencyInjection
4+
{
5+
/// <summary>
6+
/// Interface of a non-cycling async message handler.
7+
/// </summary>
8+
public interface IAsyncNonCyclicMessageHandler
9+
{
10+
/// <summary>
11+
/// Handle message from a queue.
12+
/// </summary>
13+
/// <param name="message">Json message.</param>
14+
/// <param name="routingKey">Routing key.</param>
15+
Task Handle(string message, string routingKey, IQueueService queueService);
16+
}
17+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
namespace RabbitMQ.Client.Core.DependencyInjection
2+
{
3+
/// <summary>
4+
/// Interface of a non-cycling message handler.
5+
/// </summary>
6+
public interface INonCyclicMessageHandler
7+
{
8+
/// <summary>
9+
/// Handle message from a queue.
10+
/// </summary>
11+
/// <param name="message">Json message.</param>
12+
/// <param name="routingKey">Routing key.</param>
13+
/// <param name="queueService">Queue service.</param>
14+
void Handle(string message, string routingKey, IQueueService queueService);
15+
}
16+
}

src/RabbitMQ.Client.Core.DependencyInjection/QueueService.cs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ internal class QueueService : IQueueService
3232

3333
readonly IDictionary<string, IList<IMessageHandler>> _messageHandlers;
3434
readonly IDictionary<string, IList<IAsyncMessageHandler>> _asyncMessageHandlers;
35+
readonly IDictionary<string, IList<INonCyclicMessageHandler>> _nonCyclicHandlers;
36+
readonly IDictionary<string, IList<IAsyncNonCyclicMessageHandler>> _asyncNonCyclicHandlers;
3537
readonly IDictionary<Type, List<string>> _routingKeys;
3638
readonly IEnumerable<RabbitMqExchange> _exchanges;
3739
readonly ILogger<QueueService> _logger;
@@ -46,6 +48,8 @@ internal class QueueService : IQueueService
4648
public QueueService(
4749
IEnumerable<IMessageHandler> messageHandlers,
4850
IEnumerable<IAsyncMessageHandler> asyncMessageHandlers,
51+
IEnumerable<INonCyclicMessageHandler> nonCyclicHandlers,
52+
IEnumerable<IAsyncNonCyclicMessageHandler> asyncnonCyclicHandlers,
4953
IEnumerable<RabbitMqExchange> exchanges,
5054
IEnumerable<MessageHandlerRouter> routers,
5155
ILoggerFactory loggerFactory,
@@ -58,7 +62,9 @@ public QueueService(
5862

5963
_routingKeys = TransformMessageHandlerRouters(routers);
6064
_messageHandlers = TransformMessageHandlersCollection(messageHandlers);
61-
_asyncMessageHandlers = TransformAsyncMessageHandlersCollection(asyncMessageHandlers);
65+
_asyncMessageHandlers = TransformMessageHandlersCollection(asyncMessageHandlers);
66+
_nonCyclicHandlers = TransformMessageHandlersCollection(nonCyclicHandlers);
67+
_asyncNonCyclicHandlers = TransformMessageHandlersCollection(asyncnonCyclicHandlers);
6268

6369
_logger = loggerFactory.CreateLogger<QueueService>();
6470

@@ -278,9 +284,9 @@ IDictionary<Type, List<string>> TransformMessageHandlerRouters(IEnumerable<Messa
278284
return dictionary;
279285
}
280286

281-
IDictionary<string, IList<IMessageHandler>> TransformMessageHandlersCollection(IEnumerable<IMessageHandler> messageHandlers)
287+
IDictionary<string, IList<T>> TransformMessageHandlersCollection<T>(IEnumerable<T> messageHandlers)
282288
{
283-
var dictionary = new Dictionary<string, IList<IMessageHandler>>();
289+
var dictionary = new Dictionary<string, IList<T>>();
284290
foreach (var handler in messageHandlers)
285291
{
286292
var type = handler.GetType();
@@ -292,27 +298,7 @@ IDictionary<string, IList<IMessageHandler>> TransformMessageHandlersCollection(I
292298
dictionary[routingKey].Add(handler);
293299
}
294300
else
295-
dictionary.Add(routingKey, new List<IMessageHandler>() { handler });
296-
}
297-
}
298-
return dictionary;
299-
}
300-
301-
IDictionary<string, IList<IAsyncMessageHandler>> TransformAsyncMessageHandlersCollection(IEnumerable<IAsyncMessageHandler> messageHandlers)
302-
{
303-
var dictionary = new Dictionary<string, IList<IAsyncMessageHandler>>();
304-
foreach (var handler in messageHandlers)
305-
{
306-
var type = handler.GetType();
307-
foreach (var routingKey in _routingKeys[type])
308-
{
309-
if (dictionary.ContainsKey(routingKey))
310-
{
311-
if (!dictionary[routingKey].Any(x => x.GetType() == handler.GetType()))
312-
dictionary[routingKey].Add(handler);
313-
}
314-
else
315-
dictionary.Add(routingKey, new List<IAsyncMessageHandler>() { handler });
301+
dictionary.Add(routingKey, new List<T>() { handler });
316302
}
317303
}
318304
return dictionary;
@@ -345,6 +331,22 @@ void StartClient()
345331
_logger.LogDebug($"The message has been processed by message handler {handler?.GetType().Name}.");
346332
}
347333
}
334+
if (_asyncNonCyclicHandlers.ContainsKey(@event.RoutingKey))
335+
{
336+
var tasks = new List<Task>();
337+
foreach (var handler in _asyncNonCyclicHandlers[@event.RoutingKey])
338+
tasks.Add(RunAsyncNonCyclicHandler(handler, message, @event.RoutingKey));
339+
Task.WaitAll(tasks.ToArray());
340+
}
341+
if (_nonCyclicHandlers.ContainsKey(@event.RoutingKey))
342+
{
343+
foreach (var handler in _nonCyclicHandlers[@event.RoutingKey])
344+
{
345+
_logger.LogDebug($"Starting processing the message by non-cyclic message handler {handler?.GetType().Name}.");
346+
handler.Handle(message, @event.RoutingKey, this);
347+
_logger.LogDebug($"The message has been processed by non-cyclic message handler {handler?.GetType().Name}.");
348+
}
349+
}
348350
_logger.LogInformation($"Success message with deliveryTag {@event.DeliveryTag}.");
349351
Channel.BasicAck(@event.DeliveryTag, false);
350352
}
@@ -396,6 +398,13 @@ async Task RunAsyncHandler(IAsyncMessageHandler handler, string message, string
396398
_logger.LogDebug($"The message has been processed by async message handler {handler?.GetType().Name}.");
397399
}
398400

401+
async Task RunAsyncNonCyclicHandler(IAsyncNonCyclicMessageHandler handler, string message, string routingKey)
402+
{
403+
_logger.LogDebug($"Starting processing the message by async non-cyclic message handler {handler?.GetType().Name}.");
404+
await handler.Handle(message, routingKey, this);
405+
_logger.LogDebug($"The message has been processed by async non-cyclic message handler {handler?.GetType().Name}.");
406+
}
407+
399408
void StartDeadLetterExchange(string exchangeName) =>
400409
_channel.ExchangeDeclare(
401410
exchange: exchangeName,

src/RabbitMQ.Client.Core.DependencyInjection/RabbitMQ.Client.Core.DependencyInjection.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<PropertyGroup>
44
<TargetFramework>netcoreapp2.2</TargetFramework>
55
<LangVersion>latest</LangVersion>
6-
<Version>1.2.3</Version>
6+
<Version>1.3.0</Version>
77
<PackageTags>RabbitMQ</PackageTags>
88
<PackageLicenseFile>LICENSE.txt</PackageLicenseFile>
99
<RepositoryUrl>https://github.com/AntonyVorontsov/RabbitMQ.Client.Core.DependencyInjection</RepositoryUrl>

src/RabbitMQ.Client.Core.DependencyInjection/RabbitMqClientDependencyInjectionExtensions.cs

Lines changed: 129 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,7 @@ public static IServiceCollection AddMessageHandlerSingleton<T>(this IServiceColl
142142
services.Add(new ServiceDescriptor(typeof(MessageHandlerRouter), router));
143143
return services;
144144
}
145-
146-
145+
147146
/// <summary>
148147
/// Add transient async message handler.
149148
/// </summary>
@@ -207,5 +206,133 @@ public static IServiceCollection AddAsyncMessageHandlerSingleton<T>(this IServic
207206
services.Add(new ServiceDescriptor(typeof(MessageHandlerRouter), router));
208207
return services;
209208
}
209+
210+
/// <summary>
211+
/// Add transient non-cyclic message handler.
212+
/// </summary>
213+
/// <typeparam name="T">Message handler type.</typeparam>
214+
/// <param name="services">Service collection.</param>
215+
/// <param name="routingKey">Routing key.</param>
216+
/// <returns>Service collection.</returns>
217+
public static IServiceCollection AddNonCyclicMessageHandlerTransient<T>(this IServiceCollection services, string routingKey)
218+
where T : class, INonCyclicMessageHandler
219+
{
220+
services.AddTransient<INonCyclicMessageHandler, T>();
221+
var router = new MessageHandlerRouter { Type = typeof(T), RoutingKeys = new[] { routingKey }.ToList() };
222+
services.Add(new ServiceDescriptor(typeof(MessageHandlerRouter), router));
223+
return services;
224+
}
225+
226+
/// <summary>
227+
/// Add transient non-cyclic message handler.
228+
/// </summary>
229+
/// <typeparam name="T">Message handler type.</typeparam>
230+
/// <param name="services">Service collection.</param>
231+
/// <param name="routingKeys">Routing keys.</param>
232+
/// <returns>Service collection.</returns>
233+
public static IServiceCollection AddNonCyclicMessageHandlerTransient<T>(this IServiceCollection services, IEnumerable<string> routingKeys)
234+
where T : class, INonCyclicMessageHandler
235+
{
236+
services.AddTransient<INonCyclicMessageHandler, T>();
237+
var router = new MessageHandlerRouter { Type = typeof(T), RoutingKeys = routingKeys.ToList() };
238+
services.Add(new ServiceDescriptor(typeof(MessageHandlerRouter), router));
239+
return services;
240+
}
241+
242+
/// <summary>
243+
/// Add singleton non-cyclic message handler.
244+
/// </summary>
245+
/// <typeparam name="T">Message handler type.</typeparam>
246+
/// <param name="services">Service collection.</param>
247+
/// <param name="routingKey">Routing key.</param>
248+
/// <returns>Service collection.</returns>
249+
public static IServiceCollection AddNonCyclicMessageHandlerSingleton<T>(this IServiceCollection services, string routingKey)
250+
where T : class, INonCyclicMessageHandler
251+
{
252+
services.AddSingleton<INonCyclicMessageHandler, T>();
253+
var router = new MessageHandlerRouter { Type = typeof(T), RoutingKeys = new[] { routingKey }.ToList() };
254+
services.Add(new ServiceDescriptor(typeof(MessageHandlerRouter), router));
255+
return services;
256+
}
257+
258+
/// <summary>
259+
/// Add singleton non-cyclic message handler.
260+
/// </summary>
261+
/// <typeparam name="T">Message handler type.</typeparam>
262+
/// <param name="services">Service collection.</param>
263+
/// <param name="routingKeys">Routing keys.</param>
264+
/// <returns>Service collection.</returns>
265+
public static IServiceCollection AddNonCyclicMessageHandlerSingleton<T>(this IServiceCollection services, IEnumerable<string> routingKeys)
266+
where T : class, INonCyclicMessageHandler
267+
{
268+
services.AddSingleton<INonCyclicMessageHandler, T>();
269+
var router = new MessageHandlerRouter { Type = typeof(T), RoutingKeys = routingKeys.ToList() };
270+
services.Add(new ServiceDescriptor(typeof(MessageHandlerRouter), router));
271+
return services;
272+
}
273+
274+
/// <summary>
275+
/// Add transient async non-cyclic message handler.
276+
/// </summary>
277+
/// <typeparam name="T">Message handler type.</typeparam>
278+
/// <param name="services">Service collection.</param>
279+
/// <param name="routingKey">Routing key.</param>
280+
/// <returns>Service collection.</returns>
281+
public static IServiceCollection AddAsyncNonCyclicMessageHandlerTransient<T>(this IServiceCollection services, string routingKey)
282+
where T : class, IAsyncNonCyclicMessageHandler
283+
{
284+
services.AddTransient<IAsyncNonCyclicMessageHandler, T>();
285+
var router = new MessageHandlerRouter { Type = typeof(T), RoutingKeys = new[] { routingKey }.ToList() };
286+
services.Add(new ServiceDescriptor(typeof(MessageHandlerRouter), router));
287+
return services;
288+
}
289+
290+
/// <summary>
291+
/// Add transient async non-cyclic message handler.
292+
/// </summary>
293+
/// <typeparam name="T">Message handler type.</typeparam>
294+
/// <param name="services">Service collection.</param>
295+
/// <param name="routingKeys">Routing keys.</param>
296+
/// <returns>Service collection.</returns>
297+
public static IServiceCollection AddAsyncNonCyclicMessageHandlerTransient<T>(this IServiceCollection services, IEnumerable<string> routingKeys)
298+
where T : class, IAsyncNonCyclicMessageHandler
299+
{
300+
services.AddTransient<IAsyncNonCyclicMessageHandler, T>();
301+
var router = new MessageHandlerRouter { Type = typeof(T), RoutingKeys = routingKeys.ToList() };
302+
services.Add(new ServiceDescriptor(typeof(MessageHandlerRouter), router));
303+
return services;
304+
}
305+
306+
/// <summary>
307+
/// Add singleton async non-cyclic message handler.
308+
/// </summary>
309+
/// <typeparam name="T">Message handler type.</typeparam>
310+
/// <param name="services">Service collection.</param>
311+
/// <param name="routingKey">Routing key.</param>
312+
/// <returns>Service collection.</returns>
313+
public static IServiceCollection AddAsyncNonCyclicMessageHandlerSingleton<T>(this IServiceCollection services, string routingKey)
314+
where T : class, IAsyncNonCyclicMessageHandler
315+
{
316+
services.AddSingleton<IAsyncNonCyclicMessageHandler, T>();
317+
var router = new MessageHandlerRouter { Type = typeof(T), RoutingKeys = new[] { routingKey }.ToList() };
318+
services.Add(new ServiceDescriptor(typeof(MessageHandlerRouter), router));
319+
return services;
320+
}
321+
322+
/// <summary>
323+
/// Add singleton async non-cyclic message handler.
324+
/// </summary>
325+
/// <typeparam name="T">Message handler type.</typeparam>
326+
/// <param name="services">Service collection.</param>
327+
/// <param name="routingKeys">Routing keys.</param>
328+
/// <returns>Service collection.</returns>
329+
public static IServiceCollection AddAsyncNonCyclicMessageHandlerSingleton<T>(this IServiceCollection services, IEnumerable<string> routingKeys)
330+
where T : class, IAsyncNonCyclicMessageHandler
331+
{
332+
services.AddSingleton<IAsyncNonCyclicMessageHandler, T>();
333+
var router = new MessageHandlerRouter { Type = typeof(T), RoutingKeys = routingKeys.ToList() };
334+
services.Add(new ServiceDescriptor(typeof(MessageHandlerRouter), router));
335+
return services;
336+
}
210337
}
211338
}

0 commit comments

Comments
 (0)