Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 380eaf0

Browse files
author
Anton Vorontsov
committed
Moved message consumption logic to the MessageHandlingService from QueueService. Added documentation for wildcard extension classes.
1 parent 9e3e0f4 commit 380eaf0

File tree

9 files changed

+285
-196
lines changed

9 files changed

+285
-196
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client.Core.DependencyInjection.Extensions
4+
{
5+
/// <summary>
6+
/// A model that represents nodes supposed for building a tree (trie) structure for a pattern (wildcard) matching.
7+
/// </summary>
8+
public class TreeNode
9+
{
10+
/// <summary>
11+
/// Part of the routing key.
12+
/// </summary>
13+
/// <remarks>
14+
/// Routing key split into parts by the dot.
15+
/// </remarks>
16+
public string KeyPartition { get; set; }
17+
18+
/// <summary>
19+
/// Parent node.
20+
/// </summary>
21+
public TreeNode Parent { get; set; }
22+
23+
/// <summary>
24+
/// Child nodes.
25+
/// </summary>
26+
public List<TreeNode> Nodes { get; } = new List<TreeNode>();
27+
28+
/// <summary>
29+
/// Flag is the node "last" - a single word that comes without any other parts.
30+
/// </summary>
31+
public bool IsLastNode { get; set; }
32+
}
33+
}

src/RabbitMQ.Client.Core.DependencyInjection/WildcardExtensions.cs renamed to src/RabbitMQ.Client.Core.DependencyInjection/Extensions/WildcardExtensions.cs

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,35 @@
11
using System.Collections.Generic;
22
using System.Linq;
33

4-
namespace RabbitMQ.Client.Core.DependencyInjection
4+
namespace RabbitMQ.Client.Core.DependencyInjection.Extensions
55
{
6+
/// <summary>
7+
/// An extension class that contains functionality of pattern (wildcard) matching.
8+
/// </summary>
9+
/// <remarks>
10+
/// Methods of that class allows finding route patterns by which message handlers are "listening" for messages.
11+
/// </remarks>
612
public static class WildcardExtensions
713
{
8-
private const string Separator = ".";
9-
private const string SingleWordPattern = "*";
10-
private const string MultipleWordsPattern = "#";
14+
const string Separator = ".";
15+
const string SingleWordPattern = "*";
16+
const string MultipleWordsPattern = "#";
1117

12-
public static IEnumerable<TreeNode> ConstructTree(IEnumerable<string> routingKeyBindings)
18+
/// <summary>
19+
/// Construct tree (trie) structure of message handler route patterns.
20+
/// </summary>
21+
/// <param name="routePatterns">
22+
/// Collection of message handler route patterns, which are used by them for a message "listening".
23+
/// </param>
24+
/// <returns>
25+
/// Collection of tree nodes <see cref="TreeNode"/>.
26+
/// Depending on routing key bindings that collection can be flat or treelike.
27+
/// </returns>
28+
public static IEnumerable<TreeNode> ConstructTree(IEnumerable<string> routePatterns)
1329
{
1430
var tree = new List<TreeNode>();
1531

16-
foreach (var binding in routingKeyBindings)
32+
foreach (var binding in routePatterns)
1733
{
1834
var keyParts = binding.Split(Separator);
1935
TreeNode parentTreeNode = null;
@@ -49,9 +65,16 @@ public static IEnumerable<TreeNode> ConstructTree(IEnumerable<string> routingKey
4965
return tree;
5066
}
5167

52-
public static IEnumerable<string> GetMatchingRoutePatterns(IEnumerable<TreeNode> bindingsTree, string[] routingKeyParts, int depth = 0)
68+
/// <summary>
69+
/// Get route patterns that match the given routing key.
70+
/// </summary>
71+
/// <param name="tree">Collection (tree, trie) of nodes.</param>
72+
/// <param name="routingKeyParts">Array of routing key parts split by dots.</param>
73+
/// <param name="depth">Nesting level of the tree.</param>
74+
/// <returns>Collection of route patterns that correspond to the given routing key.</returns>
75+
public static IEnumerable<string> GetMatchingRoutePatterns(IEnumerable<TreeNode> tree, string[] routingKeyParts, int depth = 0)
5376
{
54-
foreach (var node in bindingsTree)
77+
foreach (var node in tree)
5578
{
5679
var matchingPart = routingKeyParts[depth];
5780
if (node.KeyPartition == MultipleWordsPattern)
@@ -91,15 +114,15 @@ public static IEnumerable<string> GetMatchingRoutePatterns(IEnumerable<TreeNode>
91114
}
92115
}
93116

94-
private static IEnumerable<string[]> CollectRoutingKeyTails(IReadOnlyCollection<string> routingKeyParts, int depthStart)
117+
static IEnumerable<string[]> CollectRoutingKeyTails(IReadOnlyCollection<string> routingKeyParts, int depthStart)
95118
{
96119
for (var index = depthStart; index < routingKeyParts.Count; index++)
97120
{
98121
yield return routingKeyParts.Skip(index).ToArray();
99122
}
100123
}
101124

102-
private static string CollectRoutingKeyInReverseOrder(TreeNode node, string routingKey = "")
125+
static string CollectRoutingKeyInReverseOrder(TreeNode node, string routingKey = "")
103126
{
104127
routingKey = string.IsNullOrEmpty(routingKey) ? node.KeyPartition : $"{node.KeyPartition}.{routingKey}";
105128
return node.Parent != null ? CollectRoutingKeyInReverseOrder(node.Parent, routingKey) : routingKey;
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using RabbitMQ.Client.Events;
2+
3+
namespace RabbitMQ.Client.Core.DependencyInjection
4+
{
5+
/// <summary>
6+
/// An interface of the service that contains logic of handling message consumption events and passing them to the message handlers.
7+
/// </summary>
8+
public interface IMessageHandlingService
9+
{
10+
/// <summary>
11+
/// Handle message consumption event.
12+
/// </summary>
13+
/// <param name="eventArgs">Arguments of message consumption event.</param>
14+
/// <param name="queueService">An instance of the queue service <see cref="IQueueService"/>.</param>
15+
void HandleMessage(BasicDeliverEventArgs @eventArgs, IQueueService queueService);
16+
}
17+
}

src/RabbitMQ.Client.Core.DependencyInjection/MessageHandlerRouter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace RabbitMQ.Client.Core.DependencyInjection
66
/// <summary>
77
/// Message handler router model.
88
/// </summary>
9-
internal class MessageHandlerRouter
9+
public class MessageHandlerRouter
1010
{
1111
/// <summary>
1212
/// Message Handler Type
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
using Microsoft.Extensions.Logging;
7+
using RabbitMQ.Client.Events;
8+
9+
namespace RabbitMQ.Client.Core.DependencyInjection
10+
{
11+
/// <summary>
12+
/// An implementation of the service that handles message consumption events.
13+
/// </summary>
14+
public class MessageHandlingService : IMessageHandlingService
15+
{
16+
const int ResendTimeout = 60;
17+
18+
readonly IEnumerable<RabbitMqExchange> _exchanges;
19+
readonly IDictionary<Type, List<string>> _messageHandlerRouters;
20+
readonly IDictionary<string, IList<IMessageHandler>> _messageHandlers;
21+
readonly IDictionary<string, IList<IAsyncMessageHandler>> _asyncMessageHandlers;
22+
readonly IDictionary<string, IList<INonCyclicMessageHandler>> _nonCyclicHandlers;
23+
readonly IDictionary<string, IList<IAsyncNonCyclicMessageHandler>> _asyncNonCyclicHandlers;
24+
readonly ILogger<MessageHandlingService> _logger;
25+
26+
public MessageHandlingService(
27+
IEnumerable<RabbitMqExchange> exchanges,
28+
IEnumerable<MessageHandlerRouter> routers,
29+
IEnumerable<IMessageHandler> messageHandlers,
30+
IEnumerable<IAsyncMessageHandler> asyncMessageHandlers,
31+
IEnumerable<INonCyclicMessageHandler> nonCyclicHandlers,
32+
IEnumerable<IAsyncNonCyclicMessageHandler> asyncNonCyclicHandlers,
33+
ILogger<MessageHandlingService> logger)
34+
{
35+
_exchanges = exchanges;
36+
_messageHandlerRouters = TransformMessageHandlerRouters(routers);
37+
_messageHandlers = TransformMessageHandlersCollection(messageHandlers);
38+
_asyncMessageHandlers = TransformMessageHandlersCollection(asyncMessageHandlers);
39+
_nonCyclicHandlers = TransformMessageHandlersCollection(nonCyclicHandlers);
40+
_asyncNonCyclicHandlers = TransformMessageHandlersCollection(asyncNonCyclicHandlers);
41+
_logger = logger;
42+
}
43+
44+
/// <summary>
45+
/// Handle message consumption event.
46+
/// </summary>
47+
/// <param name="eventArgs">Arguments of message consumption event.</param>
48+
/// <param name="queueService">An instance of the queue service <see cref="IQueueService"/>.</param>
49+
public void HandleMessage(BasicDeliverEventArgs eventArgs, IQueueService queueService)
50+
{
51+
var message = Encoding.UTF8.GetString(eventArgs.Body);
52+
53+
_logger.LogInformation($"New message was received with deliveryTag {eventArgs.DeliveryTag}.");
54+
_logger.LogInformation(message);
55+
56+
try
57+
{
58+
if (_asyncMessageHandlers.ContainsKey(eventArgs.RoutingKey))
59+
{
60+
var tasks = new List<Task>();
61+
foreach (var handler in _asyncMessageHandlers[eventArgs.RoutingKey])
62+
{
63+
tasks.Add(RunAsyncHandler(handler, message, eventArgs.RoutingKey));
64+
}
65+
Task.WaitAll(tasks.ToArray());
66+
}
67+
if (_messageHandlers.ContainsKey(eventArgs.RoutingKey))
68+
{
69+
foreach (var handler in _messageHandlers[eventArgs.RoutingKey])
70+
{
71+
_logger.LogDebug($"Starting processing the message by message handler {handler?.GetType().Name}.");
72+
handler.Handle(message, eventArgs.RoutingKey);
73+
_logger.LogDebug($"The message has been processed by message handler {handler?.GetType().Name}.");
74+
}
75+
}
76+
if (_asyncNonCyclicHandlers.ContainsKey(eventArgs.RoutingKey))
77+
{
78+
var tasks = new List<Task>();
79+
foreach (var handler in _asyncNonCyclicHandlers[eventArgs.RoutingKey])
80+
{
81+
tasks.Add(RunAsyncNonCyclicHandler(handler, message, eventArgs.RoutingKey, queueService));
82+
}
83+
Task.WaitAll(tasks.ToArray());
84+
}
85+
if (_nonCyclicHandlers.ContainsKey(eventArgs.RoutingKey))
86+
{
87+
foreach (var handler in _nonCyclicHandlers[eventArgs.RoutingKey])
88+
{
89+
_logger.LogDebug($"Starting processing the message by non-cyclic message handler {handler?.GetType().Name}.");
90+
handler.Handle(message, eventArgs.RoutingKey, queueService);
91+
_logger.LogDebug($"The message has been processed by non-cyclic message handler {handler?.GetType().Name}.");
92+
}
93+
}
94+
_logger.LogInformation($"Success message with deliveryTag {eventArgs.DeliveryTag}.");
95+
queueService.Channel.BasicAck(eventArgs.DeliveryTag, false);
96+
}
97+
catch (Exception exception)
98+
{
99+
_logger.LogError(new EventId(), exception, $"An error occurred while processing received message with delivery tag {eventArgs.DeliveryTag}.");
100+
101+
queueService.Channel.BasicAck(eventArgs.DeliveryTag, false);
102+
103+
if (eventArgs.BasicProperties.Headers is null)
104+
{
105+
eventArgs.BasicProperties.Headers = new Dictionary<string, object>();
106+
}
107+
108+
var exchange = _exchanges.FirstOrDefault(x => x.Name == eventArgs.Exchange);
109+
if (exchange is null)
110+
{
111+
_logger.LogError($"Could not detect exchange {eventArgs.Exchange} to detect the necessity of resending the failed message.");
112+
return;
113+
}
114+
115+
if (exchange.Options.RequeueFailedMessages
116+
&& !string.IsNullOrEmpty(exchange.Options.DeadLetterExchange)
117+
&& !eventArgs.BasicProperties.Headers.ContainsKey("requeued"))
118+
{
119+
eventArgs.BasicProperties.Headers.Add("requeued", true);
120+
queueService.Send(eventArgs.Body, eventArgs.BasicProperties, eventArgs.Exchange, eventArgs.RoutingKey, ResendTimeout);
121+
_logger.LogInformation("The failed message has been requeued.");
122+
}
123+
else
124+
{
125+
_logger.LogInformation("The failed message would not be requeued.");
126+
}
127+
}
128+
}
129+
130+
async Task RunAsyncHandler(IAsyncMessageHandler handler, string message, string routingKey)
131+
{
132+
_logger.LogDebug($"Starting processing the message by async message handler {handler?.GetType().Name}.");
133+
await handler.Handle(message, routingKey);
134+
_logger.LogDebug($"The message has been processed by async message handler {handler?.GetType().Name}.");
135+
}
136+
137+
async Task RunAsyncNonCyclicHandler(IAsyncNonCyclicMessageHandler handler, string message, string routingKey, IQueueService queueService)
138+
{
139+
_logger.LogDebug($"Starting processing the message by async non-cyclic message handler {handler?.GetType().Name}.");
140+
await handler.Handle(message, routingKey, queueService);
141+
_logger.LogDebug($"The message has been processed by async non-cyclic message handler {handler?.GetType().Name}.");
142+
}
143+
144+
IDictionary<Type, List<string>> TransformMessageHandlerRouters(IEnumerable<MessageHandlerRouter> routers)
145+
{
146+
var dictionary = new Dictionary<Type, List<string>>();
147+
foreach (var router in routers)
148+
{
149+
if (dictionary.ContainsKey(router.Type))
150+
{
151+
dictionary[router.Type] = dictionary[router.Type].Union(router.RoutingKeys).ToList();
152+
}
153+
else
154+
{
155+
dictionary.Add(router.Type, router.RoutingKeys);
156+
}
157+
}
158+
return dictionary;
159+
}
160+
161+
IDictionary<string, IList<T>> TransformMessageHandlersCollection<T>(IEnumerable<T> messageHandlers)
162+
{
163+
var dictionary = new Dictionary<string, IList<T>>();
164+
foreach (var handler in messageHandlers)
165+
{
166+
var type = handler.GetType();
167+
foreach (var routingKey in _messageHandlerRouters[type])
168+
{
169+
if (dictionary.ContainsKey(routingKey))
170+
{
171+
if (!dictionary[routingKey].Any(x => x.GetType() == handler.GetType()))
172+
{
173+
dictionary[routingKey].Add(handler);
174+
}
175+
}
176+
else
177+
{
178+
dictionary.Add(routingKey, new List<T> { handler });
179+
}
180+
}
181+
}
182+
return dictionary;
183+
}
184+
}
185+
}

0 commit comments

Comments
 (0)