Skip to content

Commit 7154414

Browse files
committed
[Host.RabbitMq] RbbitMq server disconnection on unhandled routing keys messages
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
1 parent 5748f5b commit 7154414

File tree

3 files changed

+22
-1
lines changed

3 files changed

+22
-1
lines changed

src/SlimMessageBus.Host.RabbitMQ/Config/Delegates.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,13 @@
3030
/// </summary>
3131
/// <param name="option"></param>
3232
public delegate void RabbitMqMessageConfirmAction(RabbitMqMessageConfirmOptions option);
33+
34+
/// <summary>
35+
/// Represents the method that handles a RabbitMQ message that includes an routing key that is obsolete or non relevent from the applicatin perspective
36+
/// Provides access to the message, its properties, and a confirmation action.
37+
/// </summary>
38+
/// <remarks>Use this delegate to process messages received from RabbitMQ queues where the routing key is not
39+
/// relevant or not provided. The handler is responsible for invoking the confirmation action to ensure proper message
40+
/// acknowledgment.</remarks>
41+
/// <param name="transportMessage">The event arguments containing the delivered RabbitMQ message and related metadata.</param>
42+
public delegate RabbitMqMessageConfirmOptions RabbitMqMessageUnrecognizedRoutingKeyHandler(BasicDeliverEventArgs transportMessage);

src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@ public class RabbitMqConsumer : AbstractRabbitMqConsumer, IRabbitMqConsumer
1616
private readonly IMessageProcessor<BasicDeliverEventArgs> _messageProcessor;
1717
private readonly RoutingKeyMatcherService<IMessageProcessor<BasicDeliverEventArgs>> _routingKeyMatcher;
1818

19+
private readonly RabbitMqMessageUnrecognizedRoutingKeyHandler _messageUnrecognizedRoutingKeyHandler;
20+
1921
protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => _acknowledgementMode;
2022

2123
public RabbitMqConsumer(
2224
ILoggerFactory loggerFactory,
2325
IRabbitMqChannel channel,
2426
string queueName,
2527
IList<ConsumerSettings> consumers,
26-
MessageBusBase messageBus,
28+
MessageBusBase<RabbitMqMessageBusSettings> messageBus,
2729
MessageProvider<BasicDeliverEventArgs> messageProvider,
2830
IHeaderValueConverter headerValueConverter)
2931
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(),
@@ -33,6 +35,7 @@ public RabbitMqConsumer(
3335
queueName,
3436
headerValueConverter)
3537
{
38+
_messageUnrecognizedRoutingKeyHandler = messageBus.ProviderSettings.MessageUnrecognizedRoutingKeyHandler;
3639
_acknowledgementMode = consumers.Select(x => x.GetOrDefault(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null)
3740
?? RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; // be default choose the safer acknowledgement mode
3841

@@ -164,6 +167,8 @@ protected override async Task<Exception> OnMessageReceived(Dictionary<string, ob
164167
else
165168
{
166169
Logger.LogDebug("Exchange {Exchange} - Queue {Queue}: No message processor found for routing key {RoutingKey}", transportMessage.Exchange, Path, transportMessage.RoutingKey);
170+
var confirmAction = _messageUnrecognizedRoutingKeyHandler(transportMessage);
171+
ConfirmMessage(transportMessage, confirmAction, messageHeaders);
167172
}
168173

169174
// error handling happens in the message processor

src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBusSettings.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,11 @@ public string ConnectionString
2525
/// See the <see cref="DefaultHeaderValueConverter"/>.
2626
/// </summary>
2727
public IHeaderValueConverter HeaderValueConverter { get; set; } = new DefaultHeaderValueConverter();
28+
29+
/// <summary>
30+
/// Allows to handle messages that arrive with an unrecognized routing key and decide what to do with them.
31+
/// By default the message is Acknowledged.
32+
/// </summary>
33+
public RabbitMqMessageUnrecognizedRoutingKeyHandler MessageUnrecognizedRoutingKeyHandler { get; set; } = (_) => RabbitMqMessageConfirmOptions.Ack;
2834
}
2935

0 commit comments

Comments
 (0)