Skip to content

Commit 15d9a16

Browse files
committed
Enhanced default resilient producer options
1 parent a1fd2fb commit 15d9a16

File tree

3 files changed

+22
-25
lines changed

3 files changed

+22
-25
lines changed

src/DotPulsar.Extensions.Resiliency/Extensions/PulsarResilientExtensions.cs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,44 +38,40 @@ public static ValueTask Process<TMessage>(
3838
CancellationToken cancellationToken = default)
3939
=> Process(consumer, processor, resiliencePipeline, new ProcessingOptions(), failureHandler, cancellationToken);
4040

41-
public static ResiliencePipelineBuilder AddResilientProducer(this ResiliencePipelineBuilder pipelineBuilder, Action<RetryStrategyOptions>? configureRetry = null, Action<TimeoutStrategyOptions>? configureTimeout = null) {
42-
return pipelineBuilder.AddProducerRetry(configureRetry).AddProducerTimeout(configureTimeout);
43-
}
44-
45-
public static ResiliencePipelineBuilder AddProducerTimeout(this ResiliencePipelineBuilder pipelineBuilder, Action<TimeoutStrategyOptions>? configure = null) {
46-
var options = new TimeoutStrategyOptions {
41+
public static ResiliencePipelineBuilder AddResilientProducerDefaults(this ResiliencePipelineBuilder pipelineBuilder, Action<RetryStrategyOptions>? configureRetry = null, Action<TimeoutStrategyOptions>? configureTimeout = null) {
42+
var timeoutOptions = new TimeoutStrategyOptions {
4743
Timeout = TimeSpan.FromSeconds(30)
4844
};
49-
configure?.Invoke(options);
50-
return pipelineBuilder.AddTimeout(options);
51-
}
45+
configureTimeout?.Invoke(timeoutOptions);
46+
pipelineBuilder.AddTimeout(timeoutOptions);
5247

53-
public static ResiliencePipelineBuilder AddProducerRetry(this ResiliencePipelineBuilder pipelineBuilder, Action<RetryStrategyOptions>? configure = null) {
54-
var options = new RetryStrategyOptions {
48+
var retryOptions = new RetryStrategyOptions {
5549
MaxRetryAttempts = 10,
5650
Delay = TimeSpan.FromMilliseconds(100),
5751
MaxDelay = TimeSpan.FromMilliseconds(5000),
5852
BackoffType = DelayBackoffType.Exponential,
5953
ShouldHandle = static args => {
6054
var ex = args.Outcome.Exception;
61-
if (ShouldHandleProducerException(ex)) {
55+
if (ShouldRetryProducerException(ex)) {
6256
return new ValueTask<bool>(true);
6357
}
6458

6559
return new ValueTask<bool>(false);
6660
}
6761
};
68-
configure?.Invoke(options);
69-
return pipelineBuilder.AddRetry(options);
62+
configureRetry?.Invoke(retryOptions);
63+
pipelineBuilder.AddRetry(retryOptions);
64+
65+
return pipelineBuilder;
7066
}
7167

72-
public static bool ShouldHandleProducerException(Exception? exception) {
68+
public static bool ShouldRetryProducerException(Exception? exception) {
7369
return exception is not ResilientProducerDisposedException
7470
&& exception is not PulsarClientDisposedException
7571
&& exception is ProducerFaultedException or ProducerClosedException or ObjectDisposedException;
7672
}
7773

78-
public static IProducer<TMessage> CreateResilient<TMessage>(this IProducerBuilder<TMessage> producerBuilder, ResiliencePipeline? resiliencePipeline = null) {
74+
public static IProducer<TMessage> CreateResilient<TMessage>(this IProducerBuilder<TMessage> producerBuilder, ResiliencePipeline? resiliencePipeline) {
7975
ArgumentNullException.ThrowIfNull(producerBuilder);
8076

8177
if (resiliencePipeline == null || Equals(resiliencePipeline, ResiliencePipeline.Empty)) {
@@ -89,7 +85,7 @@ public static IProducer<TMessage> CreateResilient<TMessage>(this IProducerBuilde
8985
if (configurePipelineBuilder != null) {
9086
configurePipelineBuilder(pipelineBuilder);
9187
} else {
92-
pipelineBuilder.AddResilientProducer();
88+
pipelineBuilder.AddResilientProducerDefaults();
9389
}
9490

9591
return CreateResilient(producerBuilder, pipelineBuilder.Build());

src/DotPulsar.Extensions.Resiliency/Internal/ResilientProducer.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,12 @@ public ResilienceSendChannel(ResilientProducer<TMessage> producer) {
140140
this.producer = producer;
141141
}
142142

143-
public async ValueTask Send(MessageMetadata metadata, TMessage message, Func<MessageId, ValueTask>? onMessageSent, CancellationToken cancellationToken) {
144-
var messageId = await producer.Send(metadata, message, cancellationToken).ConfigureAwait(false);
145-
if (onMessageSent != null) {
146-
await onMessageSent(messageId).ConfigureAwait(false);
147-
}
143+
public ValueTask Send(MessageMetadata metadata, TMessage message, Func<MessageId, ValueTask>? onMessageSent, CancellationToken cancellationToken) {
144+
return producer.resiliencePipeline.ExecuteAsync(static (state, ct) => {
145+
var (sendChannel, message, metadata, onMessageSent) = state;
146+
var producer = sendChannel.producer.GetOrCreateProducer();
147+
return producer.SendChannel.Send(metadata, message, onMessageSent, ct);
148+
}, (this, message, metadata, onMessageSent), cancellationToken);
148149
}
149150

150151
public void Complete() {

src/DotPulsar.Extensions.Resiliency/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ await using var dlq = new DeadLetterPolicy(
2121
...
2222
await dlq.ReconsumeLater(message);
2323
```
24-
You will need to have a separate consumer set up to listen to the retry topic in order to process the retry messages. If those messages fail, then you will need to re-submit them to the DLQ to mark them as retry or dead.
24+
You will need to have a separate consumer set up to listen to the retry topic in order to process the retry messages. If those messages fail, then you will need to re-submit them to the `DeadLetterPolicy` to mark them as retry or dead.
2525

2626
**Using [Polly](https://www.pollydocs.org/) to create resilient producers:**
2727
```c#
2828
await using var producer = client.NewProducer(Schema.String)
2929
.Topic("...")
30-
.CreateResilient(pipeline => {
31-
pipeline.AddRetryProducer(options => {
30+
.CreateResilient(static pipeline => {
31+
pipeline.AddResilientProducerDefaults(configureRetry: static options => {
3232
options.MaxRetryAttempts = 3;
3333
});
3434
});

0 commit comments

Comments
 (0)