Skip to content

Commit 806e9bd

Browse files
t11omaszarusz
authored andcommitted
Adds message header filtering for consumers
Enables advanced routing and dispatching of messages to specific consumers or invokers based on message headers. This allows multiple logical consumers to share the same transport path (topic/queue). - Introduces new `.Filter()` methods on the consumer builder for configuration. - Implements runtime evaluation of filter predicates during invoker selection, supporting both consumer-level and per-invoker filters with clear precedence. - Adds robust error handling for filter predicates, treating exceptions as non-matches and logging warnings. - Updates documentation with a dedicated section, API details, examples, and troubleshooting. - Includes a unit test to verify correct message dispatching with header-based filters. Adds consumer message filtering by headers Introduces the ability to filter incoming messages for specific consumers or invokers based on message headers. This enables advanced routing scenarios where multiple logical consumers share the same transport topic or queue. * Allows configuring filter predicates via the `.Filter()` method on the consumer builder. * Filters are evaluated at runtime during invoker selection, supporting both consumer-level and per-invoker predicates with clear precedence. * Includes robust error handling for filter predicates, treating exceptions as non-matches and logging warnings. * Updates the documentation with a dedicated section, API details, examples, and troubleshooting. * Adds a unit test to ensure correct message dispatching with header-based filters. Signed-off-by: Thomas Anderson <t11omas1983@live.co.uk> Enhances consumer matching with message context The `TryMatchConsumerInvoker` method now includes `messageHeaders` and the raw `transportMessage` as parameters. This allows for more dynamic and context-aware selection of message consumers, enabling routing decisions based on runtime header information or transport-specific details, rather than solely on message type. Signed-off-by: Thomas Anderson <t11omas1983@live.co.uk> Adjusts test filter signature Aligns the message consumer filter delegate in tests with the updated API that now provides access to the message object alongside headers. Ensures compatibility with the new filter signature. Signed-off-by: Thomas Anderson <t11omas1983@live.co.uk> Refactors consumer filter delegate type Introduces a new generic `ConsumerFilter` delegate to provide a more explicit and type-safe contract for filtering messages based on headers and the transport message. Updates `ConsumerSettings` and related interfaces to use this new delegate type for the `Filter` property, improving consistency and clarity across the configuration. Removes a less specific `Filter` builder overload from `AbstractConsumerBuilder` to streamline the API towards the more comprehensive `(headers, message)` filter signature, which now leverages the new delegate. Adjusts filter selection logic in `MessageProcessor`. If a specific invoker has a filter configured, that filter is now bypassed, and no fallback to a parent consumer filter occurs. If the invoker has no specific filter, it defaults to the parent consumer's filter. Signed-off-by: Thomas Andoerson <t11omas1983@live.co.uk> Fixed test refactored message processor as per PR comments to reduce the memory allocation on the single invoker scenario Signed-off-by: Thomas Andoerson <t11omas1983@live.co.uk>
1 parent 1ed24d9 commit 806e9bd

File tree

10 files changed

+361
-8
lines changed

10 files changed

+361
-8
lines changed

docs/intro.md

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
2+
13
# Introduction to SlimMessageBus <!-- omit in toc -->
24

35
- [Key elements of SlimMessageBus](#key-elements-of-slimmessagebus)
@@ -9,6 +11,7 @@
911
- [Consumer](#consumer)
1012
- [Start or Stop message consumption](#start-or-stop-message-consumption)
1113
- [Health check circuit breaker](#health-check-circuit-breaker)
14+
- [Consumer filtering](#consumer-filtering)
1215
- [Consumer Context (Accessing Additional Message Information)](#consumer-context-accessing-additional-message-information)
1316
- [Recommended Approach: Constructor Injection](#recommended-approach-constructor-injection)
1417
- [Obsolete Approaches](#obsolete-approaches)
@@ -350,6 +353,110 @@ Here’s a clearer, more structured rewrite of that section with improved readab
350353

351354
---
352355

356+
#### Consumer filtering
357+
358+
359+
##### Consumer filtering by message headers
360+
361+
SlimMessageBus supports registering predicates that filter which consumer invokers should handle a given incoming message based on message headers (and optionally the transport message). This allows multiple consumers to subscribe to the same topic/queue and be selected at runtime using header values (for example `ResourceType`, `TenantId`, `MessageVersion`, etc).
362+
363+
This document describes:
364+
- Where to configure filters (consumer-level and per-invoker).
365+
- How filters are evaluated at runtime.
366+
- Examples.
367+
368+
##### Motivation
369+
370+
When multiple logical message handlers share the same transport path (same queue or topic) it is useful to dispatch to a specific handler based on message metadata carried in headers. Filters are lightweight predicates executed during invoker selection and allow routing decisions without touching the transport topology.
371+
372+
##### API
373+
374+
Per-consumer filter (applies only to that derived-consumer). Use the `Filter` overload that accepts a predicate:
375+
376+
```csharp
377+
bus.Consume<MyMessage>(...
378+
.Filter(headers => headers != null && headers.TryGetValue("ResourceType", out var v) && (string)v == nameof(SomeMessageB))
379+
.WithConsumer<MyMessageAConsumer2>());
380+
```
381+
382+
383+
384+
##### Runtime behaviour and precedence
385+
386+
- When a message arrives, consumers are matched by message CLR type (as before).
387+
- For each candidate consumer the runtime evaluates the predicate in this order:
388+
1. `consumer.Filter` (per-consumer filter), if present.
389+
2. Otherwise `consumer.ParentSettings.Filter` (consumer-level filter), if present.
390+
- If no filter is present the invoker is considered a match as before.
391+
- If a filter throws an exception it is treated as a non-match and a warning log entry is emitted.
392+
- If none of the invokers match and the consumer configuration says to fail on unrecognized type, the message is handled according to existing undeclared-message rules (log / throw / ack etc).
393+
394+
This approach allows a default consumer-level filter and selective overrides per-invoker.
395+
396+
##### Example: two consumers on same queue filtered by header
397+
398+
Producer (publishes with header):
399+
```csharp
400+
public class SomeMessage
401+
{
402+
public string ResourceType { get; set; }
403+
// other properties
404+
}
405+
406+
await bus.Publish(new SomeMessage { ResourceType = "SomeMessageA" },
407+
ctx => ctx.SetHeader("ResourceType", "SomeMessageA"));
408+
```
409+
410+
Consumer registration:
411+
```csharp
412+
public class SomeMessageAConsumer : IConsumer<SomeMessage>
413+
{
414+
public Task Consume(SomeMessage message)
415+
{
416+
// handle SomeMessageA
417+
}
418+
}
419+
420+
public class SomeMessageBConsumer : IConsumer<SomeMessage>
421+
{
422+
public Task Consume(SomeMessage message)
423+
{
424+
// handle SomeMessageB
425+
}
426+
}
427+
428+
bus.Consume<SomeMessageA>(x => x.Topic(topic)
429+
.Filter(headers => headers != null && headers.TryGetValue("ResourceType", out var v) && (string)v == nameof(SomeMessageA))
430+
.WithConsumer<SomeMessageAConsumer>());
431+
432+
bus.Consume<SomeMessageA>(x => x.Topic(topic)
433+
.Filter(headers => headers != null && headers.TryGetValue("ResourceType", out var v) && (string)v == nameof(SomeMessageB))
434+
.WithConsumer<SomeMessageAConsumer2>());
435+
```
436+
437+
438+
If the incoming message has `ResourceType = "SomeMessageA"` only the first consumer will be executed.
439+
440+
##### Notes and recommendations
441+
442+
- Keep filter predicates cheap and deterministicthey are executed during invoker selection.
443+
- Prefer header-only predicates for portability across providers. Use the transport message parameter only when you need provider-specific metadata.
444+
- Consider logging or metrics when an arrival does not match any consumer so routing configuration problems are visible.
445+
- Unit tests: there is (or should be) a unit test demonstrating two consumers on the same topic filtered by header under `Tests\SlimMessageBus.Host.Memory.Test\MemoryMessageBusTests.cs` (see `When_Publish_Given_TwoConsumersOnSameTopic_WithDifferentFilters_Then_OnlyMatchingConsumerInvoked`).
446+
447+
##### Troubleshooting
448+
449+
- If you expect a consumer to be invoked but it is not:
450+
- Verify the header key and value types (strings vs other types).
451+
- Verify whether the filter throws an exception (exceptions are treated as non-matches and logged).
452+
- Confirm whether consumer-level filter is presentit will be used only if per-invoker filter is absent.
453+
454+
##### Summary
455+
456+
Filters let you route messages to specific invokers based on headers or transport-level metadata while keeping a single transport path for related messages. They are configured in the fluent builder and evaluated at runtime during invoker selection.
457+
458+
---
459+
353460
### Consumer Context (Accessing Additional Message Information)
354461

355462
Within a message consumer, you can access the [`IConsumerContext`](/src/SlimMessageBus/IConsumerContext.cs) to retrieve detailed metadata about the message being processed. This includes:
@@ -496,6 +603,8 @@ Each processing of a message resolves the `TConsumer` instance from the DI.
496603

497604
> Please note that anything higher than 1 will cause multiple messages to be consumed concurrently in one service instance. This will typically impact message processing order (ie 2nd message might get processed sooner than the 1st message).
498605
606+
#### Filtering
607+
499608
## Request-response communication
500609

501610
SMB provides an implementation of request-response over topics or queues - depending on what the underlying provider supports.

docs/intro.t.md

Lines changed: 133 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
2+
13
# Introduction to SlimMessageBus <!-- omit in toc -->
24

35
- [Key elements of SlimMessageBus](#key-elements-of-slimmessagebus)
@@ -9,6 +11,7 @@
911
- [Consumer](#consumer)
1012
- [Start or Stop message consumption](#start-or-stop-message-consumption)
1113
- [Health check circuit breaker](#health-check-circuit-breaker)
14+
- [Consumer filtering](#consumer-filtering)
1215
- [Consumer Context (Accessing Additional Message Information)](#consumer-context-accessing-additional-message-information)
1316
- [Recommended Approach: Constructor Injection](#recommended-approach-constructor-injection)
1417
- [Obsolete Approaches](#obsolete-approaches)
@@ -350,6 +353,110 @@ Here’s a clearer, more structured rewrite of that section with improved readab
350353

351354
---
352355

356+
#### Consumer filtering
357+
358+
359+
##### Consumer filtering by message headers
360+
361+
SlimMessageBus supports registering predicates that filter which consumer invokers should handle a given incoming message based on message headers (and optionally the transport message). This allows multiple consumers to subscribe to the same topic/queue and be selected at runtime using header values (for example `ResourceType`, `TenantId`, `MessageVersion`, etc).
362+
363+
This document describes:
364+
- Where to configure filters (consumer-level and per-invoker).
365+
- How filters are evaluated at runtime.
366+
- Examples.
367+
368+
##### Motivation
369+
370+
When multiple logical message handlers share the same transport path (same queue or topic) it is useful to dispatch to a specific handler based on message metadata carried in headers. Filters are lightweight predicates executed during invoker selection and allow routing decisions without touching the transport topology.
371+
372+
##### API
373+
374+
Per-consumer filter (applies only to that derived-consumer). Use the `Filter` overload that accepts a predicate:
375+
376+
```csharp
377+
bus.Consume<MyMessage>(...
378+
.Filter(headers => headers != null && headers.TryGetValue("ResourceType", out var v) && (string)v == nameof(SomeMessageB))
379+
.WithConsumer<MyMessageAConsumer2>());
380+
```
381+
382+
383+
384+
##### Runtime behaviour and precedence
385+
386+
- When a message arrives, consumers are matched by message CLR type (as before).
387+
- For each candidate consumer the runtime evaluates the predicate in this order:
388+
1. `consumer.Filter` (per-consumer filter), if present.
389+
2. Otherwise `consumer.ParentSettings.Filter` (consumer-level filter), if present.
390+
- If no filter is present the invoker is considered a match as before.
391+
- If a filter throws an exception it is treated as a non-match and a warning log entry is emitted.
392+
- If none of the invokers match and the consumer configuration says to fail on unrecognized type, the message is handled according to existing undeclared-message rules (log / throw / ack etc).
393+
394+
This approach allows a default consumer-level filter and selective overrides per-invoker.
395+
396+
##### Example: two consumers on same queue filtered by header
397+
398+
Producer (publishes with header):
399+
```csharp
400+
public class SomeMessage
401+
{
402+
public string ResourceType { get; set; }
403+
// other properties
404+
}
405+
406+
await bus.Publish(new SomeMessage { ResourceType = "SomeMessageA" },
407+
ctx => ctx.SetHeader("ResourceType", "SomeMessageA"));
408+
```
409+
410+
Consumer registration:
411+
```csharp
412+
public class SomeMessageAConsumer : IConsumer<SomeMessage>
413+
{
414+
public Task Consume(SomeMessage message)
415+
{
416+
// handle SomeMessageA
417+
}
418+
}
419+
420+
public class SomeMessageBConsumer : IConsumer<SomeMessage>
421+
{
422+
public Task Consume(SomeMessage message)
423+
{
424+
// handle SomeMessageB
425+
}
426+
}
427+
428+
bus.Consume<SomeMessageA>(x => x.Topic(topic)
429+
.Filter(headers => headers != null && headers.TryGetValue("ResourceType", out var v) && (string)v == nameof(SomeMessageA))
430+
.WithConsumer<SomeMessageAConsumer>());
431+
432+
bus.Consume<SomeMessageA>(x => x.Topic(topic)
433+
.Filter(headers => headers != null && headers.TryGetValue("ResourceType", out var v) && (string)v == nameof(SomeMessageB))
434+
.WithConsumer<SomeMessageAConsumer2>());
435+
```
436+
437+
438+
If the incoming message has `ResourceType = "SomeMessageA"` only the first consumer will be executed.
439+
440+
##### Notes and recommendations
441+
442+
- Keep filter predicates cheap and deterministicthey are executed during invoker selection.
443+
- Prefer header-only predicates for portability across providers. Use the transport message parameter only when you need provider-specific metadata.
444+
- Consider logging or metrics when an arrival does not match any consumer so routing configuration problems are visible.
445+
- Unit tests: there is (or should be) a unit test demonstrating two consumers on the same topic filtered by header under `Tests\SlimMessageBus.Host.Memory.Test\MemoryMessageBusTests.cs` (see `When_Publish_Given_TwoConsumersOnSameTopic_WithDifferentFilters_Then_OnlyMatchingConsumerInvoked`).
446+
447+
##### Troubleshooting
448+
449+
- If you expect a consumer to be invoked but it is not:
450+
- Verify the header key and value types (strings vs other types).
451+
- Verify whether the filter throws an exception (exceptions are treated as non-matches and logged).
452+
- Confirm whether consumer-level filter is presentit will be used only if per-invoker filter is absent.
453+
454+
##### Summary
455+
456+
Filters let you route messages to specific invokers based on headers or transport-level metadata while keeping a single transport path for related messages. They are configured in the fluent builder and evaluated at runtime during invoker selection.
457+
458+
---
459+
353460
### Consumer Context (Accessing Additional Message Information)
354461

355462
Within a message consumer, you can access the [`IConsumerContext`](/src/SlimMessageBus/IConsumerContext.cs) to retrieve detailed metadata about the message being processed. This includes:
@@ -496,6 +603,8 @@ Each processing of a message resolves the `TConsumer` instance from the DI.
496603

497604
> Please note that anything higher than 1 will cause multiple messages to be consumed concurrently in one service instance. This will typically impact message processing order (ie 2nd message might get processed sooner than the 1st message).
498605
606+
#### Filtering
607+
499608
## Request-response communication
500609

501610
SMB provides an implementation of request-response over topics or queues - depending on what the underlying provider supports.
@@ -1243,7 +1352,29 @@ public class LoggingConsumerInterceptor<TMessage> : IConsumerInterceptor<TMessag
12431352

12441353
Message processing by consumers or handlers may result in exceptions. The [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) provides a standard way to integrate custom error handling logic across different transports.
12451354

1246-
@[:cs](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs,Interface)
1355+
```cs
1356+
public interface IConsumerErrorHandler<in T>
1357+
{
1358+
/// <summary>
1359+
/// <para>
1360+
/// Executed when the message consumer (or handler) errors out. The interface allows for interception of
1361+
/// exceptions to manipulate the processing pipeline (success/fail/retry).
1362+
/// </para>
1363+
/// <para>
1364+
/// The consumer context is available to apply transport specific operations (acknowledge/reject/dead letter/etc).
1365+
/// </para>
1366+
/// <para>
1367+
/// If message execution is to be re-attempted, any delays/jitter should be applied before the method returns.
1368+
/// </para>
1369+
/// </summary>
1370+
/// <param name="message">The message that failed to process.</param>
1371+
/// <param name="consumerContext">The consumer context for the message processing pipeline.</param>
1372+
/// <param name="exception">Exception that occurred during message processing.</param>
1373+
/// <param name="attempts">The number of times the message has been attempted to be processed.</param>
1374+
/// <returns>The error handling result.</returns>
1375+
Task<ProcessResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
1376+
}
1377+
```
12471378

12481379
The returned `ProcessResult` object is used to override the execution for the remainder of the execution pipeline. Some transports provide additional options.
12491380

@@ -1420,4 +1551,4 @@ This allows to recreate missing elements in the infrastructure without restartin
14201551
## Versions
14211552

14221553
- The v3 release [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/3.0.0).
1423-
- The v2 release [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0).
1554+
- The v2 release [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0).

src/SlimMessageBus.Host.Configuration/Builders/AbstractConsumerBuilder.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,14 @@ public TConsumerBuilder WhenUndeclaredMessageTypeArrives(Action<UndeclaredMessag
114114
action(ConsumerSettings.UndeclaredMessageType);
115115
return (TConsumerBuilder)this;
116116
}
117+
118+
/// <summary>
119+
/// More advanced overload where transport message is passed as well.
120+
/// </summary>
121+
public TConsumerBuilder Filter(ConsumerFilter<object> headerPredicateWithTransport)
122+
{
123+
if (headerPredicateWithTransport == null) throw new ArgumentNullException(nameof(headerPredicateWithTransport));
124+
ConsumerSettings.Filter = headerPredicateWithTransport;
125+
return (TConsumerBuilder)this;
126+
}
117127
}

src/SlimMessageBus.Host.Configuration/Settings/ConsumerSettings.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,9 @@ private void CalculateResponseType()
5050
/// Enables the disposal of consumer instance after the message has been consumed.
5151
/// </summary>
5252
public bool IsDisposeConsumerEnabled { get; set; }
53+
54+
/// <summary>
55+
/// Optional predicate evaluated on arrival headers and transport message to decide if this invoker should be used.
56+
/// </summary>
57+
public ConsumerFilter<object> Filter { get; set; }
5358
}

src/SlimMessageBus.Host.Configuration/Settings/Delegates.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,6 @@
22

33
public delegate void MessageHeaderModifier<in T>(IDictionary<string, object> headers, T message);
44

5+
public delegate bool ConsumerFilter<in T>(IReadOnlyDictionary<string, object> headers, T message);
6+
57
public delegate Task ConsumerMethod(object consumer, object message, IConsumerContext consumerContext, CancellationToken cancellationToken);

src/SlimMessageBus.Host.Configuration/Settings/IMessageTypeConsumerInvokerSettings.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,10 @@ public interface IMessageTypeConsumerInvokerSettings
1919
/// The consumer method.
2020
/// </summary>
2121
MethodInfo ConsumerMethodInfo { get; set; }
22+
23+
/// <summary>
24+
/// Optional predicate to filter arriving messages by headers/transport message.
25+
/// When set, the invoker is only considered if the predicate returns true.
26+
/// </summary>
27+
ConsumerFilter<object> Filter { get; set; }
2228
}

src/SlimMessageBus.Host.Configuration/Settings/MessageTypeConsumerInvokerSettings.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ public class MessageTypeConsumerInvokerSettings : IMessageTypeConsumerInvokerSet
1414
public ConsumerMethod ConsumerMethod { get; set; }
1515
/// <inheritdoc/>
1616
public MethodInfo ConsumerMethodInfo { get; set; }
17+
/// <inheritdoc/>
18+
public ConsumerFilter<object> Filter { get; set; }
1719

1820
public MessageTypeConsumerInvokerSettings(ConsumerSettings parentSettings, Type messageType, Type consumerType)
1921
{

0 commit comments

Comments
 (0)