Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 4ebdd35

Browse files
author
Anton Vorontsov
committed
Created models directory and moved models there. Created DI methods for message handlers that listen for routes and exchanges. Set up tests.
1 parent 87c1c39 commit 4ebdd35

15 files changed

+621
-126
lines changed

src/RabbitMQ.Client.Core.DependencyInjection/Extensions/WildcardExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Collections.Generic;
22
using System.Linq;
3+
using RabbitMQ.Client.Core.DependencyInjection.Models;
34

45
namespace RabbitMQ.Client.Core.DependencyInjection.Extensions
56
{

src/RabbitMQ.Client.Core.DependencyInjection/MessageHandlerDependencyInjectionExtensions.cs

Lines changed: 271 additions & 58 deletions
Large diffs are not rendered by default.

src/RabbitMQ.Client.Core.DependencyInjection/MessageHandlerRouter.cs

Lines changed: 0 additions & 21 deletions
This file was deleted.

src/RabbitMQ.Client.Core.DependencyInjection/MessageHandlingService.cs

Lines changed: 94 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Threading.Tasks;
66
using Microsoft.Extensions.Logging;
77
using RabbitMQ.Client.Core.DependencyInjection.Extensions;
8+
using RabbitMQ.Client.Core.DependencyInjection.Models;
89
using RabbitMQ.Client.Events;
910

1011
namespace RabbitMQ.Client.Core.DependencyInjection
@@ -17,11 +18,7 @@ public class MessageHandlingService : IMessageHandlingService
1718
const int ResendTimeout = 60;
1819

1920
readonly IEnumerable<RabbitMqExchange> _exchanges;
20-
readonly IDictionary<string, IList<IMessageHandler>> _messageHandlers;
21-
readonly IDictionary<string, IList<IAsyncMessageHandler>> _asyncMessageHandlers;
22-
readonly IDictionary<string, IList<INonCyclicMessageHandler>> _nonCyclicHandlers;
23-
readonly IDictionary<string, IList<IAsyncNonCyclicMessageHandler>> _asyncNonCyclicHandlers;
24-
readonly IEnumerable<TreeNode> _routeTree;
21+
readonly IEnumerable<MessageHandlerContainer> _messageHandlerContainers;
2522
readonly ILogger<MessageHandlingService> _logger;
2623

2724
public MessageHandlingService(
@@ -34,12 +31,12 @@ public MessageHandlingService(
3431
ILogger<MessageHandlingService> logger)
3532
{
3633
_exchanges = exchanges;
37-
var routersDictionary = TransformMessageHandlerRoutersToDictionary(routers);
38-
_messageHandlers = TransformMessageHandlersCollectionToDictionary(messageHandlers, routersDictionary);
39-
_asyncMessageHandlers = TransformMessageHandlersCollectionToDictionary(asyncMessageHandlers, routersDictionary);
40-
_nonCyclicHandlers = TransformMessageHandlersCollectionToDictionary(nonCyclicHandlers, routersDictionary);
41-
_asyncNonCyclicHandlers = TransformMessageHandlersCollectionToDictionary(asyncNonCyclicHandlers, routersDictionary);
42-
_routeTree = ConstructRoutesTree(routers);
34+
_messageHandlerContainers = ConstructMessageHandlerContainers(
35+
routers,
36+
messageHandlers,
37+
asyncMessageHandlers,
38+
nonCyclicHandlers,
39+
asyncNonCyclicHandlers);
4340
_logger = logger;
4441
}
4542

@@ -57,8 +54,8 @@ public void HandleMessageReceivingEvent(BasicDeliverEventArgs eventArgs, IQueueS
5754

5855
try
5956
{
60-
var matchingRoutes = GetMatchingRoutePatterns(eventArgs.RoutingKey);
61-
ProcessMessage(message, queueService, matchingRoutes);
57+
var matchingRoutes = GetMatchingRoutePatterns(eventArgs.Exchange, eventArgs.RoutingKey);
58+
ProcessMessage(eventArgs.Exchange, message, queueService, matchingRoutes);
6259
queueService.Channel.BasicAck(eventArgs.DeliveryTag, false);
6360
_logger.LogInformation($"Message processing finished successfully. Acknowledge has been sent with deliveryTag {eventArgs.DeliveryTag}.");
6461
}
@@ -95,21 +92,34 @@ public void HandleMessageReceivingEvent(BasicDeliverEventArgs eventArgs, IQueueS
9592
}
9693
}
9794

98-
IEnumerable<string> GetMatchingRoutePatterns(string routingKey)
95+
IEnumerable<string> GetMatchingRoutePatterns(string exchange, string routingKey)
9996
{
97+
var tree = _messageHandlerContainers.FirstOrDefault(x => x.Exchange == exchange)?.Tree
98+
?? _messageHandlerContainers.FirstOrDefault(x => x.IsGeneral)?.Tree;
99+
if (tree is null)
100+
{
101+
return Enumerable.Empty<string>();
102+
}
103+
100104
var routingKeyParts = routingKey.Split(".");
101-
return WildcardExtensions.GetMatchingRoutePatterns(_routeTree, routingKeyParts).ToList();
105+
return WildcardExtensions.GetMatchingRoutePatterns(tree, routingKeyParts).ToList();
102106
}
103107

104-
void ProcessMessage(string message, IQueueService queueService, IEnumerable<string> matchingRoutes)
108+
void ProcessMessage(string exchange, string message, IQueueService queueService, IEnumerable<string> matchingRoutes)
105109
{
110+
var container = _messageHandlerContainers.FirstOrDefault(x => x.Exchange == exchange) ?? _messageHandlerContainers.FirstOrDefault(x => x.IsGeneral);
111+
if (container is null)
112+
{
113+
return;
114+
}
115+
106116
var executedHandlers = new List<Type>();
107117
foreach (var matchingRoute in matchingRoutes)
108118
{
109-
if (_asyncMessageHandlers.ContainsKey(matchingRoute))
119+
if (container.AsyncMessageHandlers.ContainsKey(matchingRoute))
110120
{
111121
var tasks = new List<Task>();
112-
foreach (var handler in _asyncMessageHandlers[matchingRoute])
122+
foreach (var handler in container.AsyncMessageHandlers[matchingRoute])
113123
{
114124
var handlerType = handler.GetType();
115125
if (executedHandlers.Contains(handlerType))
@@ -122,9 +132,9 @@ void ProcessMessage(string message, IQueueService queueService, IEnumerable<stri
122132
}
123133
Task.WaitAll(tasks.ToArray());
124134
}
125-
if (_messageHandlers.ContainsKey(matchingRoute))
135+
if (container.MessageHandlers.ContainsKey(matchingRoute))
126136
{
127-
foreach (var handler in _messageHandlers[matchingRoute])
137+
foreach (var handler in container.MessageHandlers[matchingRoute])
128138
{
129139
var handlerType = handler.GetType();
130140
if (executedHandlers.Contains(handlerType))
@@ -136,10 +146,10 @@ void ProcessMessage(string message, IQueueService queueService, IEnumerable<stri
136146
RunMessageHandler(handler, message, matchingRoute);
137147
}
138148
}
139-
if (_asyncNonCyclicHandlers.ContainsKey(matchingRoute))
149+
if (container.AsyncNonCyclicHandlers.ContainsKey(matchingRoute))
140150
{
141151
var tasks = new List<Task>();
142-
foreach (var handler in _asyncNonCyclicHandlers[matchingRoute])
152+
foreach (var handler in container.AsyncNonCyclicHandlers[matchingRoute])
143153
{
144154
var handlerType = handler.GetType();
145155
if (executedHandlers.Contains(handlerType))
@@ -152,9 +162,9 @@ void ProcessMessage(string message, IQueueService queueService, IEnumerable<stri
152162
}
153163
Task.WaitAll(tasks.ToArray());
154164
}
155-
if (_nonCyclicHandlers.ContainsKey(matchingRoute))
165+
if (container.NonCyclicHandlers.ContainsKey(matchingRoute))
156166
{
157-
foreach (var handler in _nonCyclicHandlers[matchingRoute])
167+
foreach (var handler in container.NonCyclicHandlers[matchingRoute])
158168
{
159169
var handlerType = handler.GetType();
160170
if (executedHandlers.Contains(handlerType))
@@ -201,18 +211,74 @@ async Task RunAsyncNonCyclicMessageHandler(IAsyncNonCyclicMessageHandler handler
201211
_logger.LogDebug($"The message has been processed by async non-cyclic message handler {handler.GetType().Name}.");
202212
}
203213

204-
void ValidateHandler<T>(T messageHandler)
214+
static void ValidateHandler<T>(T messageHandler)
205215
{
206216
if (messageHandler is null)
207217
{
208218
throw new ArgumentNullException(nameof(messageHandler), "Message handler is null.");
209219
}
210220
}
211221

212-
static IEnumerable<TreeNode> ConstructRoutesTree(IEnumerable<MessageHandlerRouter> routers)
222+
static IEnumerable<MessageHandlerContainer> ConstructMessageHandlerContainers(
223+
IEnumerable<MessageHandlerRouter> routers,
224+
IEnumerable<IMessageHandler> messageHandlers,
225+
IEnumerable<IAsyncMessageHandler> asyncMessageHandlers,
226+
IEnumerable<INonCyclicMessageHandler> nonCyclicHandlers,
227+
IEnumerable<IAsyncNonCyclicMessageHandler> asyncNonCyclicHandlers)
213228
{
214-
var routePatterns = routers.SelectMany(x => x.RoutePatterns).Distinct();
215-
return WildcardExtensions.ConstructRoutesTree(routePatterns);
229+
var containers = new List<MessageHandlerContainer>();
230+
var generalRouters = routers.Where(x => x.IsGeneral).ToList();
231+
if (generalRouters.Any())
232+
{
233+
var container = CreateContailer(
234+
null,
235+
generalRouters,
236+
messageHandlers,
237+
asyncMessageHandlers,
238+
nonCyclicHandlers,
239+
asyncNonCyclicHandlers);
240+
containers.Add(container);
241+
}
242+
243+
var exchanges = routers.Where(x => !x.IsGeneral).Select(x => x.Exchange).Distinct().ToList();
244+
foreach (var exchange in exchanges)
245+
{
246+
var exchangeRouters = routers.Where(x => x.Exchange == exchange).ToList();
247+
var container = CreateContailer(
248+
exchange,
249+
exchangeRouters,
250+
messageHandlers,
251+
asyncMessageHandlers,
252+
nonCyclicHandlers,
253+
asyncNonCyclicHandlers);
254+
containers.Add(container);
255+
}
256+
return containers;
257+
}
258+
259+
static MessageHandlerContainer CreateContailer(
260+
string exchange,
261+
IEnumerable<MessageHandlerRouter> selectedRouters,
262+
IEnumerable<IMessageHandler> messageHandlers,
263+
IEnumerable<IAsyncMessageHandler> asyncMessageHandlers,
264+
IEnumerable<INonCyclicMessageHandler> nonCyclicHandlers,
265+
IEnumerable<IAsyncNonCyclicMessageHandler> asyncNonCyclicHandlers)
266+
{
267+
var routersDictionary = TransformMessageHandlerRoutersToDictionary(selectedRouters);
268+
var boundMessageHandlers = messageHandlers.Where(x => routersDictionary.Keys.Contains(x.GetType()));
269+
var boundAsyncMessageHandlers = asyncMessageHandlers.Where(x => routersDictionary.Keys.Contains(x.GetType()));
270+
var boundNonCyclicMessageHandlers = nonCyclicHandlers.Where(x => routersDictionary.Keys.Contains(x.GetType()));
271+
var boundAsyncNonCyclicMessageHandlers = asyncNonCyclicHandlers.Where(x => routersDictionary.Keys.Contains(x.GetType()));
272+
var routePatterns = selectedRouters.SelectMany(x => x.RoutePatterns).Distinct().ToList();
273+
return new MessageHandlerContainer
274+
{
275+
Exchange = exchange,
276+
Tree = WildcardExtensions.ConstructRoutesTree(routePatterns),
277+
MessageHandlers = TransformMessageHandlersCollectionToDictionary(boundMessageHandlers, routersDictionary),
278+
AsyncMessageHandlers = TransformMessageHandlersCollectionToDictionary(boundAsyncMessageHandlers, routersDictionary),
279+
NonCyclicHandlers = TransformMessageHandlersCollectionToDictionary(boundNonCyclicMessageHandlers, routersDictionary),
280+
AsyncNonCyclicHandlers = TransformMessageHandlersCollectionToDictionary(boundAsyncNonCyclicMessageHandlers, routersDictionary)
281+
};
216282
}
217283

218284
static IDictionary<Type, List<string>> TransformMessageHandlerRoutersToDictionary(IEnumerable<MessageHandlerRouter> routers)

src/RabbitMQ.Client.Core.DependencyInjection/ExchangeServiceDescriptor.cs renamed to src/RabbitMQ.Client.Core.DependencyInjection/Models/ExchangeServiceDescriptor.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
using Microsoft.Extensions.DependencyInjection;
2-
using System;
1+
using System;
2+
using Microsoft.Extensions.DependencyInjection;
33

4-
namespace RabbitMQ.Client.Core.DependencyInjection
4+
namespace RabbitMQ.Client.Core.DependencyInjection.Models
55
{
66
/// <summary>
77
/// A service extension for registration exchange singleton "services".
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client.Core.DependencyInjection.Models
4+
{
5+
/// <summary>
6+
/// The service model that represents a container that contains a route patterns tree connected with an exchange and all types of message handlers.
7+
/// If route patterns configured without an exchange they get in the "general" container.
8+
/// </summary>
9+
public class MessageHandlerContainer
10+
{
11+
/// <summary>
12+
/// An exchange.
13+
/// </summary>
14+
/// <remarks>
15+
/// Could be null.
16+
/// </remarks>
17+
public string Exchange { get; set; }
18+
19+
/// <summary>
20+
/// Route patterns tree (trie) structure.
21+
/// </summary>
22+
public IEnumerable<TreeNode> Tree { get; set; }
23+
24+
/// <summary>
25+
/// Flag is the container general.
26+
/// </summary>
27+
public bool IsGeneral => string.IsNullOrEmpty(Exchange);
28+
29+
/// <summary>
30+
/// Dictionary of route patterns and message handlers connected by them.
31+
/// </summary>
32+
public IDictionary<string, IList<IMessageHandler>> MessageHandlers { get; set; }
33+
34+
/// <summary>
35+
/// Dictionary of route patterns and async message handlers connected by them.
36+
/// </summary>
37+
public IDictionary<string, IList<IAsyncMessageHandler>> AsyncMessageHandlers { get; set; }
38+
39+
/// <summary>
40+
/// Dictionary of route patterns and non-cyclic message handlers connected by them.
41+
/// </summary>
42+
public IDictionary<string, IList<INonCyclicMessageHandler>> NonCyclicHandlers { get; set; }
43+
44+
/// <summary>
45+
/// Dictionary of route patterns and async non-cyclic message handlers connected by them.
46+
/// </summary>
47+
public IDictionary<string, IList<IAsyncNonCyclicMessageHandler>> AsyncNonCyclicHandlers { get; set; }
48+
}
49+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System;
2+
using System.Collections.Generic;
3+
4+
namespace RabbitMQ.Client.Core.DependencyInjection.Models
5+
{
6+
/// <summary>
7+
/// Message handler router model.
8+
/// </summary>
9+
public class MessageHandlerRouter
10+
{
11+
/// <summary>
12+
/// Message Handler Type
13+
/// </summary>
14+
public Type Type { get; set; }
15+
/// <summary>
16+
/// Collection of route patterns (routing keys) that handler will be "listening".
17+
/// </summary>
18+
public List<string> RoutePatterns { get; set; } = new List<string>();
19+
20+
/// <summary>
21+
/// An exchange which is being listened by the message handler by specified route patterns.
22+
/// </summary>
23+
/// <remarks>
24+
/// Exchange can be null, and in that scenario message handler will be general
25+
/// (it will listen all messages regardless of an exchange).
26+
/// </remarks>
27+
public string Exchange { get; set; }
28+
29+
/// <summary>
30+
/// Flag is the message handler general.
31+
/// </summary>
32+
public bool IsGeneral => string.IsNullOrEmpty(Exchange);
33+
}
34+
}

src/RabbitMQ.Client.Core.DependencyInjection/RabbitMqExchange.cs renamed to src/RabbitMQ.Client.Core.DependencyInjection/Models/RabbitMqExchange.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
using RabbitMQ.Client.Core.DependencyInjection.Configuration;
22

3-
namespace RabbitMQ.Client.Core.DependencyInjection
3+
namespace RabbitMQ.Client.Core.DependencyInjection.Models
44
{
55
/// <summary>
66
/// Exchange model.

src/RabbitMQ.Client.Core.DependencyInjection/Extensions/TreeNode.cs renamed to src/RabbitMQ.Client.Core.DependencyInjection/Models/TreeNode.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
using System.Collections.Generic;
22

3-
namespace RabbitMQ.Client.Core.DependencyInjection.Extensions
3+
namespace RabbitMQ.Client.Core.DependencyInjection.Models
44
{
55
/// <summary>
66
/// A model that represents nodes supposed for building a tree (trie) structure for a pattern (wildcard) matching.

src/RabbitMQ.Client.Core.DependencyInjection/QueueService.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using System.Net;
1010
using System.Text;
1111
using System.Threading.Tasks;
12+
using RabbitMQ.Client.Core.DependencyInjection.Models;
1213

1314
namespace RabbitMQ.Client.Core.DependencyInjection
1415
{

0 commit comments

Comments
 (0)