Skip to content

Commit d04154f

Browse files
EtherZazarusz
authored andcommitted
#356 Consumer error response per transport
Signed-off-by: Richard Pringle <richardpringle@gmail.com>
1 parent 15834b6 commit d04154f

File tree

32 files changed

+188
-247
lines changed

32 files changed

+188
-247
lines changed

docs/intro.md

Lines changed: 30 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
- [Order of Execution](#order-of-execution)
3939
- [Generic interceptors](#generic-interceptors)
4040
- [Error Handling](#error-handling)
41+
- [Azure Service Bus](#azure-service-bus)
42+
- [RabbitMQ](#rabbitmq)
4143
- [Logging](#logging)
4244
- [Debugging](#debugging)
4345
- [Provider specific functionality](#provider-specific-functionality)
@@ -1081,15 +1083,14 @@ public interface IConsumerErrorHandler<in T>
10811083
/// <param name="exception">Exception that occurred during message processing.</param>
10821084
/// <param name="attempts">The number of times the message has been attempted to be processed.</param>
10831085
/// <returns>The error handling result.</returns>
1084-
Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
1086+
Task<ProcessResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
10851087
}
10861088
```
10871089

1088-
The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline.
1090+
The returned `ProcessResult` object is used to override the execution for the remainder of the execution pipeline. Some transports provide additional options.
10891091

1090-
| Result | Description |
1092+
| ProcessResult | Description |
10911093
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
1092-
| Abandon | The message should be sent to the dead letter queue/exchange. **Not supported by all transports.** |
10931094
| Failure | The message failed to be processed and should be returned to the queue |
10941095
| Success | The pipeline must treat the message as having been processed successfully |
10951096
| SuccessWithResponse | The pipeline to treat the message as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus<T>](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) |
@@ -1105,70 +1106,59 @@ services.AddTransient(typeof(IRabbitMqConsumerErrorHandler<>), typeof(CustomRabb
11051106
services.AddTransient(typeof(IConsumerErrorHandler<>), typeof(CustomConsumerErrorHandler<>));
11061107
```
11071108

1108-
Transport plugins provide specialized error handling interfaces. Examples include:
1109+
Transport plugins provide specialized error handling interfaces with a default implementation that includes any additional `ProcessResult` options. Examples include:
11091110

1110-
- [IMemoryConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs)
1111-
- [IRabbitMqConsumerErrorHandler<T>](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs)
1112-
- [IKafkaConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs)
1113-
- [IRedisConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs)
1114-
- [INatsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs)
1115-
- [IServiceBusConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs)
1116-
- [IEventHubConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs)
1117-
- [ISqsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs)
1111+
| Interface | Implementation including reference to additional options (if any) |
1112+
| ---------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- |
1113+
| [IMemoryConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) | [MemoryConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) |
1114+
| [IRabbitMqConsumerErrorHandler<T>](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) | [RabbitMqConsumerErrorHandler<T>](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) |
1115+
| [IKafkaConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs) | [KafkaConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs) |
1116+
| [IRedisConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs) | [RedisConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs) |
1117+
| [INatsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs) | [NatsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs) |
1118+
| [IServiceBusConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs) | [ServiceBusConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs) |
1119+
| [IEventHubConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs) | [EventHubConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs) |
1120+
| [ISqsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs) | [SqsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs) |
11181121

11191122
> The message processing pipeline will always attempt to use the transport-specific error handler (e.g., `IMemoryConsumerErrorHandler<T>`) first. If unavailable, it will then look for the generic error handler (`IConsumerErrorHandler<T>`).
11201123

11211124
This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized.
11221125

1123-
1124-
### Abandon
11251126
#### Azure Service Bus
1126-
The Azure Service Bus transport has full support for abandoning messages to the dead letter queue.
1127-
1128-
#### RabbitMQ
1129-
Abandon will issue a `Nack` with `requeue: false`.
1130-
1131-
#### Other transports
1132-
No other transports currently support `Abandon` and calling `Abandon` will result in `NotSupportedException` being thrown.
1127+
| ProcessResult | Description |
1128+
| ------------- | ---------------------------------------------------------------------------------- |
1129+
| DeadLetter | Abandons further processing of the message by sending it to the dead letter queue. |
11331130

1134-
### Failure
11351131
#### RabbitMQ
1136-
While RabbitMQ supports dead letter exchanges, SMB's default implementation is not to requeue messages on `Failure`. If requeuing is required, it can be enabled by setting `RequeueOnFailure()` when configuring a consumer/handler.
1132+
| ProcessResult | Description |
1133+
| ------------- | --------------------------------------------------------------- |
1134+
| Requeue | Return the message to the queue for re-processing <sup>1</sup>. |
11371135

1138-
Please be aware that as RabbitMQ does not have a maximum delivery count and enabling requeue may result in an infinite message loop. When `RequeueOnFailure()` has been set, it is the developer's responsibility to configure an appropriate `IConsumerErrorHandler` that will `Abandon` all non-transient exceptions.
1139-
1140-
```cs
1141-
.Handle<EchoRequest, EchoResponse>(x => x
1142-
.Queue("echo-request-handler")
1143-
.ExchangeBinding("test-echo")
1144-
.DeadLetterExchange("echo-request-handler-dlq")
1145-
// requeue a message on failure
1146-
.RequeueOnFailure()
1147-
.WithHandler<EchoRequestHandler>())
1148-
```
1136+
<sup>1</sup> RabbitMQ does not have a maximum delivery count. Please use `Requeue` with caution as, if no other conditions are applied, it may result in an infinite message loop.
11491137

1150-
### Example usage
1151-
Retry with exponential back-off and short-curcuit dead letter on non-transient exceptions (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation):
1138+
Example retry with exponential back-off and short-curcuit to dead letter exchange on non-transient exceptions (using the [RabbitMqConsumerErrorHandler](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) abstract implementation):
11521139
```cs
1153-
public class RetryHandler<T> : ConsumerErrorHandler<T>
1140+
public class RetryHandler<T> : RabbitMqConsumerErrorHandler<T>
11541141
{
11551142
private static readonly Random _random = new();
11561143

11571144
public override async Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
11581145
{
11591146
if (!IsTranientException(exception))
11601147
{
1161-
return Abandon();
1148+
return Failure();
11621149
}
11631150

11641151
if (attempts < 3)
11651152
{
11661153
var delay = (attempts * 1000) + (_random.Next(1000) - 500);
11671154
await Task.Delay(delay, consumerContext.CancellationToken);
1155+
1156+
// in process retry
11681157
return Retry();
11691158
}
11701159

1171-
return Failure();
1160+
// re-qeuue for out of process execution
1161+
return Requeue();
11721162
}
11731163

11741164
private static bool IsTransientException(Exception exception)

0 commit comments

Comments
 (0)