Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 87c1c39

Browse files
author
Anton Vorontsov
committed
Added wildcards to the MessageHandlingService. Set up couple tests for those cases.
1 parent 9500877 commit 87c1c39

File tree

7 files changed

+183
-53
lines changed

7 files changed

+183
-53
lines changed

src/RabbitMQ.Client.Core.DependencyInjection/Extensions/WildcardExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public static class WildcardExtensions
2525
/// Collection of tree nodes <see cref="TreeNode"/>.
2626
/// Depending on routing key bindings that collection can be flat or treelike.
2727
/// </returns>
28-
public static IEnumerable<TreeNode> ConstructTree(IEnumerable<string> routePatterns)
28+
public static IEnumerable<TreeNode> ConstructRoutesTree(IEnumerable<string> routePatterns)
2929
{
3030
var tree = new List<TreeNode>();
3131

src/RabbitMQ.Client.Core.DependencyInjection/MessageHandlerDependencyInjectionExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ static IServiceCollection AddInstanceTransient<U, T>(this IServiceCollection ser
174174
where T : class, U
175175
{
176176
services.AddTransient<U, T>();
177-
var router = new MessageHandlerRouter { Type = typeof(T), RoutingKeys = routingKeys.ToList() };
177+
var router = new MessageHandlerRouter { Type = typeof(T), RoutePatterns = routingKeys.ToList() };
178178
services.Add(new ServiceDescriptor(typeof(MessageHandlerRouter), router));
179179
return services;
180180
}
@@ -184,7 +184,7 @@ static IServiceCollection AddInstanceSingleton<U, T>(this IServiceCollection ser
184184
where T : class, U
185185
{
186186
services.AddSingleton<U, T>();
187-
var router = new MessageHandlerRouter { Type = typeof(T), RoutingKeys = routingKeys.ToList() };
187+
var router = new MessageHandlerRouter { Type = typeof(T), RoutePatterns = routingKeys.ToList() };
188188
services.Add(new ServiceDescriptor(typeof(MessageHandlerRouter), router));
189189
return services;
190190
}

src/RabbitMQ.Client.Core.DependencyInjection/MessageHandlerRouter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ public class MessageHandlerRouter
1414
public Type Type { get; set; }
1515

1616
/// <summary>
17-
/// Collection of routing keys that handler will be "listening".
17+
/// Collection of route patterns (routing keys) that handler will be "listening".
1818
/// </summary>
19-
public List<string> RoutingKeys { get; set; } = new List<string>();
19+
public List<string> RoutePatterns { get; set; } = new List<string>();
2020
}
2121
}

src/RabbitMQ.Client.Core.DependencyInjection/MessageHandlingService.cs

Lines changed: 97 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Text;
55
using System.Threading.Tasks;
66
using Microsoft.Extensions.Logging;
7+
using RabbitMQ.Client.Core.DependencyInjection.Extensions;
78
using RabbitMQ.Client.Events;
89

910
namespace RabbitMQ.Client.Core.DependencyInjection
@@ -20,6 +21,7 @@ public class MessageHandlingService : IMessageHandlingService
2021
readonly IDictionary<string, IList<IAsyncMessageHandler>> _asyncMessageHandlers;
2122
readonly IDictionary<string, IList<INonCyclicMessageHandler>> _nonCyclicHandlers;
2223
readonly IDictionary<string, IList<IAsyncNonCyclicMessageHandler>> _asyncNonCyclicHandlers;
24+
readonly IEnumerable<TreeNode> _routeTree;
2325
readonly ILogger<MessageHandlingService> _logger;
2426

2527
public MessageHandlingService(
@@ -32,11 +34,12 @@ public MessageHandlingService(
3234
ILogger<MessageHandlingService> logger)
3335
{
3436
_exchanges = exchanges;
35-
var messageHandlerRouters = TransformMessageHandlerRouters(routers);
36-
_messageHandlers = TransformMessageHandlersCollection(messageHandlers, messageHandlerRouters);
37-
_asyncMessageHandlers = TransformMessageHandlersCollection(asyncMessageHandlers, messageHandlerRouters);
38-
_nonCyclicHandlers = TransformMessageHandlersCollection(nonCyclicHandlers, messageHandlerRouters);
39-
_asyncNonCyclicHandlers = TransformMessageHandlersCollection(asyncNonCyclicHandlers, messageHandlerRouters);
37+
var routersDictionary = TransformMessageHandlerRoutersToDictionary(routers);
38+
_messageHandlers = TransformMessageHandlersCollectionToDictionary(messageHandlers, routersDictionary);
39+
_asyncMessageHandlers = TransformMessageHandlersCollectionToDictionary(asyncMessageHandlers, routersDictionary);
40+
_nonCyclicHandlers = TransformMessageHandlersCollectionToDictionary(nonCyclicHandlers, routersDictionary);
41+
_asyncNonCyclicHandlers = TransformMessageHandlersCollectionToDictionary(asyncNonCyclicHandlers, routersDictionary);
42+
_routeTree = ConstructRoutesTree(routers);
4043
_logger = logger;
4144
}
4245

@@ -54,38 +57,8 @@ public void HandleMessageReceivingEvent(BasicDeliverEventArgs eventArgs, IQueueS
5457

5558
try
5659
{
57-
if (_asyncMessageHandlers.ContainsKey(eventArgs.RoutingKey))
58-
{
59-
var tasks = new List<Task>();
60-
foreach (var handler in _asyncMessageHandlers[eventArgs.RoutingKey])
61-
{
62-
tasks.Add(RunAsyncMessageHandler(handler, message, eventArgs.RoutingKey));
63-
}
64-
Task.WaitAll(tasks.ToArray());
65-
}
66-
if (_messageHandlers.ContainsKey(eventArgs.RoutingKey))
67-
{
68-
foreach (var handler in _messageHandlers[eventArgs.RoutingKey])
69-
{
70-
RunMessageHandler(handler, message, eventArgs.RoutingKey);
71-
}
72-
}
73-
if (_asyncNonCyclicHandlers.ContainsKey(eventArgs.RoutingKey))
74-
{
75-
var tasks = new List<Task>();
76-
foreach (var handler in _asyncNonCyclicHandlers[eventArgs.RoutingKey])
77-
{
78-
tasks.Add(RunAsyncNonCyclicMessageHandler(handler, message, eventArgs.RoutingKey, queueService));
79-
}
80-
Task.WaitAll(tasks.ToArray());
81-
}
82-
if (_nonCyclicHandlers.ContainsKey(eventArgs.RoutingKey))
83-
{
84-
foreach (var handler in _nonCyclicHandlers[eventArgs.RoutingKey])
85-
{
86-
RunNonCyclicMessageHandler(handler, message, eventArgs.RoutingKey, queueService);
87-
}
88-
}
60+
var matchingRoutes = GetMatchingRoutePatterns(eventArgs.RoutingKey);
61+
ProcessMessage(message, queueService, matchingRoutes);
8962
queueService.Channel.BasicAck(eventArgs.DeliveryTag, false);
9063
_logger.LogInformation($"Message processing finished successfully. Acknowledge has been sent with deliveryTag {eventArgs.DeliveryTag}.");
9164
}
@@ -122,6 +95,80 @@ public void HandleMessageReceivingEvent(BasicDeliverEventArgs eventArgs, IQueueS
12295
}
12396
}
12497

98+
IEnumerable<string> GetMatchingRoutePatterns(string routingKey)
99+
{
100+
var routingKeyParts = routingKey.Split(".");
101+
return WildcardExtensions.GetMatchingRoutePatterns(_routeTree, routingKeyParts).ToList();
102+
}
103+
104+
void ProcessMessage(string message, IQueueService queueService, IEnumerable<string> matchingRoutes)
105+
{
106+
var executedHandlers = new List<Type>();
107+
foreach (var matchingRoute in matchingRoutes)
108+
{
109+
if (_asyncMessageHandlers.ContainsKey(matchingRoute))
110+
{
111+
var tasks = new List<Task>();
112+
foreach (var handler in _asyncMessageHandlers[matchingRoute])
113+
{
114+
var handlerType = handler.GetType();
115+
if (executedHandlers.Contains(handlerType))
116+
{
117+
continue;
118+
}
119+
120+
executedHandlers.Add(handlerType);
121+
tasks.Add(RunAsyncMessageHandler(handler, message, matchingRoute));
122+
}
123+
Task.WaitAll(tasks.ToArray());
124+
}
125+
if (_messageHandlers.ContainsKey(matchingRoute))
126+
{
127+
foreach (var handler in _messageHandlers[matchingRoute])
128+
{
129+
var handlerType = handler.GetType();
130+
if (executedHandlers.Contains(handlerType))
131+
{
132+
continue;
133+
}
134+
135+
executedHandlers.Add(handlerType);
136+
RunMessageHandler(handler, message, matchingRoute);
137+
}
138+
}
139+
if (_asyncNonCyclicHandlers.ContainsKey(matchingRoute))
140+
{
141+
var tasks = new List<Task>();
142+
foreach (var handler in _asyncNonCyclicHandlers[matchingRoute])
143+
{
144+
var handlerType = handler.GetType();
145+
if (executedHandlers.Contains(handlerType))
146+
{
147+
continue;
148+
}
149+
150+
executedHandlers.Add(handlerType);
151+
tasks.Add(RunAsyncNonCyclicMessageHandler(handler, message, matchingRoute, queueService));
152+
}
153+
Task.WaitAll(tasks.ToArray());
154+
}
155+
if (_nonCyclicHandlers.ContainsKey(matchingRoute))
156+
{
157+
foreach (var handler in _nonCyclicHandlers[matchingRoute])
158+
{
159+
var handlerType = handler.GetType();
160+
if (executedHandlers.Contains(handlerType))
161+
{
162+
continue;
163+
}
164+
165+
executedHandlers.Add(handlerType);
166+
RunNonCyclicMessageHandler(handler, message, matchingRoute, queueService);
167+
}
168+
}
169+
}
170+
}
171+
125172
void RunMessageHandler(IMessageHandler handler, string message, string routingKey)
126173
{
127174
ValidateHandler(handler);
@@ -162,30 +209,38 @@ void ValidateHandler<T>(T messageHandler)
162209
}
163210
}
164211

165-
static IDictionary<Type, List<string>> TransformMessageHandlerRouters(IEnumerable<MessageHandlerRouter> routers)
212+
static IEnumerable<TreeNode> ConstructRoutesTree(IEnumerable<MessageHandlerRouter> routers)
213+
{
214+
var routePatterns = routers.SelectMany(x => x.RoutePatterns).Distinct();
215+
return WildcardExtensions.ConstructRoutesTree(routePatterns);
216+
}
217+
218+
static IDictionary<Type, List<string>> TransformMessageHandlerRoutersToDictionary(IEnumerable<MessageHandlerRouter> routers)
166219
{
167220
var dictionary = new Dictionary<Type, List<string>>();
168221
foreach (var router in routers)
169222
{
170223
if (dictionary.ContainsKey(router.Type))
171224
{
172-
dictionary[router.Type] = dictionary[router.Type].Union(router.RoutingKeys).ToList();
225+
dictionary[router.Type] = dictionary[router.Type].Union(router.RoutePatterns).ToList();
173226
}
174227
else
175228
{
176-
dictionary.Add(router.Type, router.RoutingKeys);
229+
dictionary.Add(router.Type, router.RoutePatterns);
177230
}
178231
}
179232
return dictionary;
180233
}
181234

182-
static IDictionary<string, IList<T>> TransformMessageHandlersCollection<T>(IEnumerable<T> messageHandlers, IDictionary<Type, List<string>> messageHandlerRouters)
235+
static IDictionary<string, IList<T>> TransformMessageHandlersCollectionToDictionary<T>(
236+
IEnumerable<T> messageHandlers,
237+
IDictionary<Type, List<string>> routersDictionary)
183238
{
184239
var dictionary = new Dictionary<string, IList<T>>();
185240
foreach (var handler in messageHandlers)
186241
{
187242
var type = handler.GetType();
188-
foreach (var routingKey in messageHandlerRouters[type])
243+
foreach (var routingKey in routersDictionary[type])
189244
{
190245
if (dictionary.ContainsKey(routingKey))
191246
{

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/UnitTests/HandleMessageReceivingEventTestData.cs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,81 @@ public IEnumerator<object[]> GetEnumerator()
5454
AsyncNonCyclicMessageHandlerShouldTrigger = false
5555
}
5656
};
57+
yield return new object[]
58+
{
59+
new HandleMessageReceivingEventTestDataModel
60+
{
61+
MessageRoutingKey = "file.update",
62+
MessageHandlerPatterns = new List<string> { "#" },
63+
MessageHandlerShouldTrigger = true,
64+
AsyncMessageHandlerPatterns = new List<string> { "*.*.*", "file.delete" },
65+
AsyncMessageHandlerShouldTrigger = false,
66+
NonCyclicMessageHandlerPatterns = new List<string> { "*.*", "file.update" },
67+
NonCyclicMessageHandlerShouldTrigger = true,
68+
AsyncNonCyclicMessageHandlerPatterns = new List<string> { "file.#" },
69+
AsyncNonCyclicMessageHandlerShouldTrigger = true
70+
}
71+
};
72+
yield return new object[]
73+
{
74+
new HandleMessageReceivingEventTestDataModel
75+
{
76+
MessageRoutingKey = "connection.create.stable",
77+
MessageHandlerPatterns = new List<string> { "*.*.*" },
78+
MessageHandlerShouldTrigger = true,
79+
AsyncMessageHandlerPatterns = new List<string> { "#.create.#" },
80+
AsyncMessageHandlerShouldTrigger = true,
81+
NonCyclicMessageHandlerPatterns = new List<string> { "*.*", "*.*.*.*", "create.#", "#.create" },
82+
NonCyclicMessageHandlerShouldTrigger = false,
83+
AsyncNonCyclicMessageHandlerPatterns = new List<string> { "*.create.*" },
84+
AsyncNonCyclicMessageHandlerShouldTrigger = true
85+
}
86+
};
87+
yield return new object[]
88+
{
89+
new HandleMessageReceivingEventTestDataModel
90+
{
91+
MessageRoutingKey = "file.update",
92+
MessageHandlerPatterns = new List<string> { "*.*.*" },
93+
MessageHandlerShouldTrigger = false,
94+
AsyncMessageHandlerPatterns = new List<string> { "#.update" },
95+
AsyncMessageHandlerShouldTrigger = true,
96+
NonCyclicMessageHandlerPatterns = new List<string> { "*.update" },
97+
NonCyclicMessageHandlerShouldTrigger = true,
98+
AsyncNonCyclicMessageHandlerPatterns = new List<string> { "file.*.*" },
99+
AsyncNonCyclicMessageHandlerShouldTrigger = false
100+
}
101+
};
102+
yield return new object[]
103+
{
104+
new HandleMessageReceivingEventTestDataModel
105+
{
106+
MessageRoutingKey = "final.report.create",
107+
MessageHandlerPatterns = new List<string> { "#", "*.*.*" },
108+
MessageHandlerShouldTrigger = true,
109+
AsyncMessageHandlerPatterns = new List<string> { "*.*" },
110+
AsyncMessageHandlerShouldTrigger = false,
111+
NonCyclicMessageHandlerPatterns = new List<string> { "*.update" },
112+
NonCyclicMessageHandlerShouldTrigger = false,
113+
AsyncNonCyclicMessageHandlerPatterns = new List<string> { "*.*.create" },
114+
AsyncNonCyclicMessageHandlerShouldTrigger = true
115+
}
116+
};
117+
yield return new object[]
118+
{
119+
new HandleMessageReceivingEventTestDataModel
120+
{
121+
MessageRoutingKey = "file.update.author.credentials",
122+
MessageHandlerPatterns = new List<string> { "*.*", "*.*.*" },
123+
MessageHandlerShouldTrigger = false,
124+
AsyncMessageHandlerPatterns = new List<string> { "#" },
125+
AsyncMessageHandlerShouldTrigger = true,
126+
NonCyclicMessageHandlerPatterns = new List<string> { "file.update.author.credentials" },
127+
NonCyclicMessageHandlerShouldTrigger = true,
128+
AsyncNonCyclicMessageHandlerPatterns = new List<string> { "file.update.credentials" },
129+
AsyncNonCyclicMessageHandlerShouldTrigger = false
130+
}
131+
};
57132
}
58133
}
59134
}

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/UnitTests/MessageHandlingServiceTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ public void ShouldProperlyHandleMessageReceivingEvent(HandleMessageReceivingEven
3333

3434
var routers = new List<MessageHandlerRouter>
3535
{
36-
new MessageHandlerRouter { Type = messageHandlerMock.Object.GetType(), RoutingKeys = testDataModel.MessageHandlerPatterns },
37-
new MessageHandlerRouter { Type = asyncMessageHandlerMock.Object.GetType(), RoutingKeys = testDataModel.AsyncMessageHandlerPatterns },
38-
new MessageHandlerRouter { Type = nonCyclicMessageHandlerMock.Object.GetType(), RoutingKeys = testDataModel.NonCyclicMessageHandlerPatterns },
39-
new MessageHandlerRouter { Type = asyncNonCyclicMessageHandlerMock.Object.GetType(), RoutingKeys = testDataModel.AsyncNonCyclicMessageHandlerPatterns }
36+
new MessageHandlerRouter { Type = messageHandlerMock.Object.GetType(), RoutePatterns = testDataModel.MessageHandlerPatterns },
37+
new MessageHandlerRouter { Type = asyncMessageHandlerMock.Object.GetType(), RoutePatterns = testDataModel.AsyncMessageHandlerPatterns },
38+
new MessageHandlerRouter { Type = nonCyclicMessageHandlerMock.Object.GetType(), RoutePatterns = testDataModel.NonCyclicMessageHandlerPatterns },
39+
new MessageHandlerRouter { Type = asyncNonCyclicMessageHandlerMock.Object.GetType(), RoutePatterns = testDataModel.AsyncNonCyclicMessageHandlerPatterns }
4040
};
4141

4242
var service = CreateService(

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/UnitTests/WildcardExtensionsTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public WildcardExtensionsTests()
2828
[Fact]
2929
public void ShouldProperlyConstructTree()
3030
{
31-
var tree = WildcardExtensions.ConstructTree(_routes);
31+
var tree = WildcardExtensions.ConstructRoutesTree(_routes);
3232

3333
var countNodes = CountNodes(tree);
3434
Assert.Equal(15, countNodes);
@@ -79,7 +79,7 @@ public void ShouldProperlyConstructTree()
7979
[InlineData("final.report.create", new[] { "#", "#.create", "*.*.*", "*.*.create" })]
8080
public void ShouldProperlyGetMatchingRoutes(string routingKey, IEnumerable<string> routes)
8181
{
82-
var tree = WildcardExtensions.ConstructTree(_routes);
82+
var tree = WildcardExtensions.ConstructRoutesTree(_routes);
8383

8484
var routingKeyParts = routingKey.Split(".");
8585
var matchingRoutes = WildcardExtensions.GetMatchingRoutePatterns(tree, routingKeyParts).ToList();

0 commit comments

Comments
 (0)