Skip to content

Commit dec907c

Browse files
committed
Add support for exception filtering to DLQ handling
1 parent c272207 commit dec907c

File tree

3 files changed

+22
-16
lines changed

3 files changed

+22
-16
lines changed

src/DotPulsar.Extensions.Resiliency/Abstractions/IDeadLetterPolicy.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ namespace DotPulsar.Abstractions;
1616

1717
public interface IDeadLetterPolicy
1818
{
19-
ValueTask ReconsumeLater(IMessage message, TimeSpan? delayTime = null, IEnumerable<KeyValuePair<string, string?>>? customProperties = null, CancellationToken cancellationToken = default);
19+
ValueTask ReconsumeLater(IMessage message, TimeSpan? delayTime = null, IEnumerable<KeyValuePair<string, string?>>? customProperties = null, bool preventRetry = false, CancellationToken cancellationToken = default);
2020
}

src/DotPulsar.Extensions.Resiliency/Extensions/DeadLetterFailureHandler.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@ namespace DotPulsar.Extensions;
1919
public class DeadLetterFailureHandler : IConsumerFailureHandler
2020
{
2121
private readonly IDeadLetterPolicy deadLetterPolicy;
22+
private readonly Func<IMessage, Exception, bool>? retryExceptionHandler;
2223
private readonly Func<IMessage, Exception, TimeSpan?>? delayTimeSelector;
2324
private readonly Func<Exception, IEnumerable<KeyValuePair<string, string?>>> exceptionSerializer;
2425

2526
public DeadLetterFailureHandler(
2627
IDeadLetterPolicy deadLetterPolicy,
28+
Func<IMessage, Exception, bool>? retryExceptionHandler = null,
2729
Func<Exception, IEnumerable<KeyValuePair<string, string?>>>? exceptionSerializer = null,
2830
Func<IMessage, Exception, TimeSpan?>? delayTimeSelector = null) {
2931
this.deadLetterPolicy = deadLetterPolicy ?? throw new ArgumentNullException(nameof(deadLetterPolicy));
32+
this.retryExceptionHandler = retryExceptionHandler;
3033
this.delayTimeSelector = delayTimeSelector;
3134
this.exceptionSerializer = exceptionSerializer ?? SerializeException;
3235

@@ -38,7 +41,8 @@ public DeadLetterFailureHandler(
3841
}
3942

4043
public ValueTask HandleAsync(IMessage message, Exception exception, CancellationToken cancellationToken) {
44+
var preventRetry = retryExceptionHandler == null || retryExceptionHandler(message, exception);
4145
var properties = exceptionSerializer(exception);
42-
return deadLetterPolicy.ReconsumeLater(message, delayTime: delayTimeSelector?.Invoke(message, exception), customProperties: properties, cancellationToken: cancellationToken);
46+
return deadLetterPolicy.ReconsumeLater(message, delayTime: delayTimeSelector?.Invoke(message, exception), customProperties: properties, preventRetry: preventRetry, cancellationToken: cancellationToken);
4347
}
4448
}

src/DotPulsar.Extensions.Resiliency/Extensions/DeadLetterPolicy.cs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,35 +31,35 @@ public class DeadLetterPolicy : IDeadLetterPolicy, IAsyncDisposable
3131
public const string RetryTopicSuffix = "-RETRY";
3232
public const string DeadLetterTopicSuffix = "-DLQ";
3333

34-
private readonly Func<MessageMetadata, ReadOnlySequence<byte>, CancellationToken, ValueTask> deadLetterProducer;
34+
private readonly Func<MessageMetadata, ReadOnlySequence<byte>, CancellationToken, ValueTask>? deadLetterProducer;
3535
private readonly Func<ValueTask>? disposeDeadLetterProducer;
3636
private readonly Func<MessageMetadata, ReadOnlySequence<byte>, CancellationToken, ValueTask>? retryProducer;
3737
private readonly Func<ValueTask>? disposeRetryProducer;
3838

3939
public DeadLetterPolicy(
40-
Func<MessageMetadata, ReadOnlySequence<byte>, CancellationToken, ValueTask> deadLetterProducer,
40+
Func<MessageMetadata, ReadOnlySequence<byte>, CancellationToken, ValueTask>? deadLetterProducer,
4141
Func<MessageMetadata, ReadOnlySequence<byte>, CancellationToken, ValueTask>? retryProducer = null,
4242
int maxRedeliveryCount = DefaultMaxReconsumeTimes,
4343
TimeSpan? retryDelay = null) {
44-
this.deadLetterProducer = deadLetterProducer ?? throw new ArgumentNullException(nameof(deadLetterProducer));
44+
this.deadLetterProducer = deadLetterProducer;
4545
this.retryProducer = retryProducer;
4646
MaxRedeliveryCount = maxRedeliveryCount;
4747
RetryDelay = retryDelay;
4848
}
4949

5050
public DeadLetterPolicy(
51-
IProducerBuilder<ReadOnlySequence<byte>> deadLetterProducerBuilder,
51+
IProducerBuilder<ReadOnlySequence<byte>>? deadLetterProducerBuilder = null,
5252
IProducerBuilder<ReadOnlySequence<byte>>? retryProducerBuilder = null,
5353
int maxRedeliveryCount = DefaultMaxReconsumeTimes,
5454
TimeSpan? retryDelay = null,
5555
ResiliencePipeline? resiliencePipeline = null) {
56-
ArgumentNullException.ThrowIfNull(deadLetterProducerBuilder);
57-
58-
var lazyDeadLetterProducer = new Lazy<IProducer<ReadOnlySequence<byte>>>(() => deadLetterProducerBuilder.CreateResilient(resiliencePipeline));
59-
disposeDeadLetterProducer = () => lazyDeadLetterProducer.IsValueCreated
60-
? lazyDeadLetterProducer.Value.DisposeAsync()
61-
: ValueTask.CompletedTask;
62-
deadLetterProducer = async (metadata, message, ct) => await lazyDeadLetterProducer.Value.Send(metadata, message, ct).ConfigureAwait(false);
56+
if (deadLetterProducerBuilder != null) {
57+
var lazyDeadLetterProducer = new Lazy<IProducer<ReadOnlySequence<byte>>>(() => deadLetterProducerBuilder.CreateResilient(resiliencePipeline));
58+
disposeDeadLetterProducer = () => lazyDeadLetterProducer.IsValueCreated
59+
? lazyDeadLetterProducer.Value.DisposeAsync()
60+
: ValueTask.CompletedTask;
61+
deadLetterProducer = async (metadata, message, ct) => await lazyDeadLetterProducer.Value.Send(metadata, message, ct).ConfigureAwait(false);
62+
}
6363

6464
if (retryProducerBuilder != null) {
6565
var lazyRetryProducer = new Lazy<IProducer<ReadOnlySequence<byte>>>(() => retryProducerBuilder.CreateResilient(resiliencePipeline));
@@ -94,11 +94,11 @@ public async ValueTask DisposeAsync() {
9494
GC.SuppressFinalize(this);
9595
}
9696

97-
public async ValueTask ReconsumeLater(IMessage message, TimeSpan? delayTime = null, IEnumerable<KeyValuePair<string, string?>>? customProperties = null, CancellationToken cancellationToken = default) {
97+
public async ValueTask ReconsumeLater(IMessage message, TimeSpan? delayTime = null, IEnumerable<KeyValuePair<string, string?>>? customProperties = null, bool preventRetry = false, CancellationToken cancellationToken = default) {
9898
ArgumentNullException.ThrowIfNull(message);
9999

100100
var metadata = PrepareMetadata(message, delayTime ?? RetryDelay, customProperties);
101-
if (retryProducer != null) {
101+
if (retryProducer != null && !preventRetry) {
102102
var reconsumeTimes = GetReconsumeAndUpdate(metadata);
103103
if (reconsumeTimes <= MaxRedeliveryCount) {
104104
try {
@@ -113,7 +113,9 @@ public async ValueTask ReconsumeLater(IMessage message, TimeSpan? delayTime = nu
113113
}
114114
}
115115

116-
await deadLetterProducer(metadata, message.Data, cancellationToken).ConfigureAwait(false);
116+
if (deadLetterProducer != null) {
117+
await deadLetterProducer(metadata, message.Data, cancellationToken).ConfigureAwait(false);
118+
}
117119

118120
static MessageMetadata PrepareMetadata(IMessage message, TimeSpan? delayTime, IEnumerable<KeyValuePair<string, string?>>? customProperties) {
119121
var metadata = new MessageMetadata {

0 commit comments

Comments
 (0)