Skip to content

Commit c87bf39

Browse files
committed
[Host.RabbitMq] RbbitMq server disconnection on unhandled routing keys messages
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
1 parent 5748f5b commit c87bf39

File tree

5 files changed

+507
-8
lines changed

5 files changed

+507
-8
lines changed

docs/provider_rabbitmq.md

Lines changed: 115 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
1010
- [Routing Keys and Wildcard Support](#routing-keys-and-wildcard-support)
1111
- [Basic Routing Keys](#basic-routing-keys)
1212
- [Wildcard Routing Keys](#wildcard-routing-keys)
13+
- [Unrecognized Routing Key Handler](#unrecognized-routing-key-handler)
1314
- [Acknowledgment Mode](#acknowledgment-mode)
1415
- [Consumer Error Handling](#consumer-error-handling)
1516
- [Dead Letter Exchange](#dead-letter-exchange)
@@ -20,6 +21,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
2021
- [Default Exchange](#default-exchange)
2122
- [Why it exists](#why-it-exists)
2223
- [Connection Resiliency](#connection-resiliency)
24+
- [Consumer Recovery](#consumer-recovery)
2325
- [Recipes](#recipes)
2426
- [01 Multiple consumers on the same queue with different concurrency](#01-multiple-consumers-on-the-same-queue-with-different-concurrency)
2527
- [Feedback](#feedback)
@@ -212,18 +214,124 @@ services.AddSlimMessageBus(mbb =>
212214

213215
**Routing Key Pattern Examples:**
214216

215-
| Pattern | Matches | Doesn't Match |
216-
|---------|---------|---------------|
217-
| `regions.na.cities.*` | `regions.na.cities.toronto`<br/>`regions.na.cities.newyork` | `regions.na.cities` (missing segment)<br/>`regions.na.cities.toronto.downtown` (extra segment) |
218-
| `audit.events.#` | `audit.events.users.signup`<br/>`audit.events.orders.placed`<br/>`audit.events` | `audit.users` (wrong prefix) |
219-
| `orders.#.region.*` | `orders.processed.region.na`<br/>`orders.created.cancelled.region.eu`<br/>`orders.region.na` | `orders.processed.state.california` (wrong pattern)<br/>`orders.processed.region` (missing final segment) |
220-
| `#` | Any routing key | None (matches everything) |
217+
| Pattern | Matches | Doesn't Match |
218+
| --------------------- | -------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------- |
219+
| `regions.na.cities.*` | `regions.na.cities.toronto`<br/>`regions.na.cities.newyork` | `regions.na.cities` (missing segment)<br/>`regions.na.cities.toronto.downtown` (extra segment) |
220+
| `audit.events.#` | `audit.events.users.signup`<br/>`audit.events.orders.placed`<br/>`audit.events` | `audit.users` (wrong prefix) |
221+
| `orders.#.region.*` | `orders.processed.region.na`<br/>`orders.created.cancelled.region.eu`<br/>`orders.region.na` | `orders.processed.state.california` (wrong pattern)<br/>`orders.processed.region` (missing final segment) |
222+
| `#` | Any routing key | None (matches everything) |
221223

222224
**Performance Note:** SlimMessageBus optimizes routing key matching by:
225+
223226
- Using exact matches first for better performance
224227
- Only applying wildcard pattern matching when no exact match is found
225228
- Caching routing key patterns for efficient lookup
226229

230+
##### Unrecognized Routing Key Handler
231+
232+
When a message arrives on a queue with a routing key that doesn't match any of the configured consumer routing key patterns, the `MessageUnrecognizedRoutingKeyHandler` is invoked to determine how the message should be handled.
233+
234+
**Default Behavior:**
235+
236+
By default, unrecognized messages are **acknowledged (Ack)** and removed from the queue:
237+
238+
```cs
239+
// Default behavior (built-in)
240+
settings.MessageUnrecognizedRoutingKeyHandler = (transportMessage) => RabbitMqMessageConfirmOptions.Ack;
241+
```
242+
243+
This default is appropriate when:
244+
245+
- Routing key mismatches are expected and acceptable
246+
- You want to silently discard messages that don't match any consumer
247+
- Your application follows a "fail-open" approach for unknown messages
248+
249+
**Customizing the Handler:**
250+
251+
You can customize this behavior at the bus level to handle unrecognized messages differently:
252+
253+
```cs
254+
services.AddSlimMessageBus(mbb =>
255+
{
256+
mbb.WithProviderRabbitMQ(cfg =>
257+
{
258+
// Option 1: Reject unrecognized messages without requeue (send to DLX if configured)
259+
cfg.MessageUnrecognizedRoutingKeyHandler = (transportMessage) =>
260+
RabbitMqMessageConfirmOptions.Nack;
261+
262+
// Option 2: Reject and requeue for later processing
263+
cfg.MessageUnrecognizedRoutingKeyHandler = (transportMessage) =>
264+
RabbitMqMessageConfirmOptions.Nack | RabbitMqMessageConfirmOptions.Requeue;
265+
266+
// Option 3: Custom logic based on routing key or message content
267+
cfg.MessageUnrecognizedRoutingKeyHandler = (transportMessage) =>
268+
{
269+
var routingKey = transportMessage.RoutingKey;
270+
271+
// Log for monitoring and alerting
272+
logger.LogWarning("Unrecognized routing key: {RoutingKey} on exchange: {Exchange}",
273+
routingKey, transportMessage.Exchange);
274+
275+
// Route to DLX if it looks like a potential typo or misconfiguration
276+
if (routingKey.StartsWith("orders."))
277+
{
278+
return RabbitMqMessageConfirmOptions.Nack; // Send to DLX for investigation
279+
}
280+
281+
// Ack and discard messages from other exchanges
282+
return RabbitMqMessageConfirmOptions.Ack;
283+
};
284+
});
285+
});
286+
```
287+
288+
**Available Options:**
289+
290+
- **`Ack` (default)**: Acknowledge and remove the message from the queue - use when unrecognized messages should be silently discarded
291+
- **`Nack`**: Reject the message and route to Dead Letter Exchange (if configured) - use for debugging routing issues or when messages shouldn't be lost
292+
- **`Nack | Requeue`**: Reject and requeue the message for retry - use when routing keys might be registered dynamically or during rolling deployments
293+
294+
**Example with Dead Letter Exchange:**
295+
296+
```cs
297+
services.AddSlimMessageBus(mbb =>
298+
{
299+
mbb.WithProviderRabbitMQ(cfg =>
300+
{
301+
// Send unrecognized messages to DLX for investigation
302+
cfg.MessageUnrecognizedRoutingKeyHandler = (transportMessage) =>
303+
RabbitMqMessageConfirmOptions.Nack;
304+
});
305+
306+
mbb.Consume<OrderEvent>(x => x
307+
.Queue("orders-queue")
308+
.ExchangeBinding("orders", routingKey: "orders.created")
309+
// Unrecognized messages will be routed to this DLX
310+
.DeadLetterExchange("orders-dlq", exchangeType: ExchangeType.Direct)
311+
.WithConsumer<OrderCreatedConsumer>());
312+
});
313+
```
314+
315+
**Handler Parameters:**
316+
317+
The handler receives `BasicDeliverEventArgs` which provides access to:
318+
319+
- `RoutingKey`: The routing key of the unrecognized message
320+
- `Exchange`: The exchange the message was published to
321+
- `Body`: The message payload (ReadOnlyMemory<byte>)
322+
- `BasicProperties`: Message properties including headers, MessageId, ContentType, etc.
323+
324+
This allows for sophisticated routing decisions based on message metadata.
325+
326+
**Common Use Cases:**
327+
328+
1. **Development/Debugging**: Use `Nack` to route unrecognized messages to a DLX where they can be inspected
329+
2. **Production Monitoring**: Log unrecognized routing keys and send metrics to your monitoring system
330+
3. **Graceful Degradation**: Use `Ack` in production to prevent queue buildup from deprecated message types
331+
4. **Rolling Deployments**: Use `Nack | Requeue` to temporarily requeue messages during deployments when consumers might not yet be ready
332+
333+
**Note:** This handler only applies to messages where the routing key doesn't match any configured consumer patterns. Messages that match a routing key pattern but fail during processing are handled by the [Consumer Error Handling](#consumer-error-handling) mechanism instead.
334+
227335
#### Acknowledgment Mode
228336

229337
When a consumer processes a message from a RabbitMQ queue, it needs to acknowledge that the message was processed. RabbitMQ supports three types of acknowledgments out which two are available in SMB:
@@ -487,6 +595,7 @@ When a RabbitMQ server restarts or the connection is lost, SlimMessageBus automa
487595
5. Resumes message processing automatically
488596

489597
This ensures that:
598+
490599
- Messages don't pile up in queues during temporary outages
491600
- Consumers are visible in the RabbitMQ management UI after recovery
492601
- No manual intervention is required to restore message processing

src/SlimMessageBus.Host.RabbitMQ/Config/Delegates.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,13 @@
3030
/// </summary>
3131
/// <param name="option"></param>
3232
public delegate void RabbitMqMessageConfirmAction(RabbitMqMessageConfirmOptions option);
33+
34+
/// <summary>
35+
/// Represents the method that handles a RabbitMQ message that includes an routing key that is obsolete or non relevent from the applicatin perspective
36+
/// Provides access to the message, its properties, and a confirmation action.
37+
/// </summary>
38+
/// <remarks>Use this delegate to process messages received from RabbitMQ queues where the routing key is not
39+
/// relevant or not provided. The handler is responsible for invoking the confirmation action to ensure proper message
40+
/// acknowledgment.</remarks>
41+
/// <param name="transportMessage">The event arguments containing the delivered RabbitMQ message and related metadata.</param>
42+
public delegate RabbitMqMessageConfirmOptions RabbitMqMessageUnrecognizedRoutingKeyHandler(BasicDeliverEventArgs transportMessage);

src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@ public class RabbitMqConsumer : AbstractRabbitMqConsumer, IRabbitMqConsumer
1616
private readonly IMessageProcessor<BasicDeliverEventArgs> _messageProcessor;
1717
private readonly RoutingKeyMatcherService<IMessageProcessor<BasicDeliverEventArgs>> _routingKeyMatcher;
1818

19+
private readonly RabbitMqMessageUnrecognizedRoutingKeyHandler _messageUnrecognizedRoutingKeyHandler;
20+
1921
protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => _acknowledgementMode;
2022

2123
public RabbitMqConsumer(
2224
ILoggerFactory loggerFactory,
2325
IRabbitMqChannel channel,
2426
string queueName,
2527
IList<ConsumerSettings> consumers,
26-
MessageBusBase messageBus,
28+
MessageBusBase<RabbitMqMessageBusSettings> messageBus,
2729
MessageProvider<BasicDeliverEventArgs> messageProvider,
2830
IHeaderValueConverter headerValueConverter)
2931
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(),
@@ -33,6 +35,7 @@ public RabbitMqConsumer(
3335
queueName,
3436
headerValueConverter)
3537
{
38+
_messageUnrecognizedRoutingKeyHandler = messageBus.ProviderSettings.MessageUnrecognizedRoutingKeyHandler;
3639
_acknowledgementMode = consumers.Select(x => x.GetOrDefault(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null)
3740
?? RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; // be default choose the safer acknowledgement mode
3841

@@ -102,7 +105,7 @@ protected override async Task OnStop()
102105
await base.OnStop();
103106
}
104107

105-
private void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, ConsumerContext consumerContext)
108+
internal void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, ConsumerContext consumerContext)
106109
{
107110
if (_acknowledgementMode == RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit)
108111
{
@@ -164,6 +167,8 @@ protected override async Task<Exception> OnMessageReceived(Dictionary<string, ob
164167
else
165168
{
166169
Logger.LogDebug("Exchange {Exchange} - Queue {Queue}: No message processor found for routing key {RoutingKey}", transportMessage.Exchange, Path, transportMessage.RoutingKey);
170+
var confirmAction = _messageUnrecognizedRoutingKeyHandler(transportMessage);
171+
ConfirmMessage(transportMessage, confirmAction, messageHeaders);
167172
}
168173

169174
// error handling happens in the message processor

src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBusSettings.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,11 @@ public string ConnectionString
2525
/// See the <see cref="DefaultHeaderValueConverter"/>.
2626
/// </summary>
2727
public IHeaderValueConverter HeaderValueConverter { get; set; } = new DefaultHeaderValueConverter();
28+
29+
/// <summary>
30+
/// Allows to handle messages that arrive with an unrecognized routing key and decide what to do with them.
31+
/// By default the message is Acknowledged.
32+
/// </summary>
33+
public RabbitMqMessageUnrecognizedRoutingKeyHandler MessageUnrecognizedRoutingKeyHandler { get; set; } = (_) => RabbitMqMessageConfirmOptions.Ack;
2834
}
2935

0 commit comments

Comments
 (0)