Skip to content

Commit dcafa9d

Browse files
committed
Added timeout and some tests
1 parent 7c0c3a9 commit dcafa9d

File tree

2 files changed

+168
-14
lines changed

2 files changed

+168
-14
lines changed

src/Foundatio/Utility/ResiliencePipeline.cs

Lines changed: 100 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ public interface IHaveResiliencePipelineProvider
2525

2626
public class FoundatioResiliencePipelineProvider : IResiliencePipelineProvider
2727
{
28-
public static string DefaultPipelineName => "_default_";
29-
3028
private readonly ConcurrentDictionary<string, IResiliencePipeline> _pipelines = new(StringComparer.OrdinalIgnoreCase);
3129
private IResiliencePipeline _defaultPipeline;
3230
private readonly TimeProvider _timeProvider;
@@ -42,13 +40,26 @@ public FoundatioResiliencePipelineProvider(TimeProvider timeProvider = null, ILo
4240
};
4341
}
4442

45-
public IResiliencePipelineProvider WithDefaultPipeline(IResiliencePipeline pipeline)
43+
public FoundatioResiliencePipelineProvider WithDefaultPipeline(IResiliencePipeline pipeline)
4644
{
4745
_defaultPipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline));
4846
return this;
4947
}
5048

51-
public IResiliencePipelineProvider WithPipeline(string name, IResiliencePipeline pipeline)
49+
public FoundatioResiliencePipelineProvider WithDefaultPipeline(Action<FoundatioResiliencePipelineBuilder> builder)
50+
{
51+
if (builder == null)
52+
throw new ArgumentNullException(nameof(builder));
53+
54+
var pipeline = new FoundatioResiliencePipeline(_timeProvider, _loggerFactory.CreateLogger<FoundatioResiliencePipeline>());
55+
var pipelineBuilder = new FoundatioResiliencePipelineBuilder(pipeline);
56+
builder(pipelineBuilder);
57+
58+
_defaultPipeline = pipeline;
59+
return this;
60+
}
61+
62+
public FoundatioResiliencePipelineProvider WithPipeline(string name, IResiliencePipeline pipeline)
5263
{
5364
if (name == null)
5465
throw new ArgumentNullException(nameof(name));
@@ -57,7 +68,7 @@ public IResiliencePipelineProvider WithPipeline(string name, IResiliencePipeline
5768
return this;
5869
}
5970

60-
public IResiliencePipelineProvider WithPipeline(string name, Action<FoundatioResiliencePipelineBuilder> builder)
71+
public FoundatioResiliencePipelineProvider WithPipeline(string name, Action<FoundatioResiliencePipelineBuilder> builder)
6172
{
6273
if (name == null)
6374
throw new ArgumentNullException(nameof(name));
@@ -75,7 +86,10 @@ public IResiliencePipelineProvider WithPipeline(string name, Action<FoundatioRes
7586

7687
public IResiliencePipeline GetPipeline(string name = null)
7788
{
78-
return name == null ? _defaultPipeline : _pipelines.GetOrAdd(name, _ => _defaultPipeline);
89+
if (name == null)
90+
return _defaultPipeline;
91+
92+
return _pipelines.TryGetValue(name, out var pipeline) ? pipeline : _defaultPipeline;
7993
}
8094
}
8195

@@ -97,6 +111,7 @@ public FoundatioResiliencePipeline(TimeProvider timeProvider = null, ILogger log
97111
/// Gets or sets the logger for this pipeline.
98112
/// </summary>
99113
public ILogger Logger {
114+
get => _logger;
100115
set => _logger = value ?? NullLogger.Instance;
101116
}
102117

@@ -125,13 +140,23 @@ public ILogger Logger {
125140
/// </summary>
126141
public bool UseJitter { get; set; } = true;
127142

143+
/// <summary>
144+
/// Gets or sets the timeout for the entire operation.
145+
/// </summary>
146+
public TimeSpan Timeout { get; set; }
147+
128148
public async ValueTask ExecuteAsync(Func<CancellationToken, ValueTask> action, CancellationToken cancellationToken = default)
129149
{
130150
if (action == null)
131151
throw new ArgumentNullException(nameof(action));
132152

153+
cancellationToken.ThrowIfCancellationRequested();
154+
133155
int attempts = 1;
134156
var startTime = _timeProvider.GetUtcNow();
157+
var linkedCancellationToken = cancellationToken;
158+
if (Timeout > TimeSpan.Zero)
159+
linkedCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, new CancellationTokenSource(Timeout).Token).Token;
135160

136161
do
137162
{
@@ -140,7 +165,7 @@ public async ValueTask ExecuteAsync(Func<CancellationToken, ValueTask> action, C
140165

141166
try
142167
{
143-
await action(cancellationToken).AnyContext();
168+
await action(linkedCancellationToken).AnyContext();
144169
return;
145170
}
146171
catch (Exception ex)
@@ -150,11 +175,13 @@ public async ValueTask ExecuteAsync(Func<CancellationToken, ValueTask> action, C
150175

151176
_logger?.LogError(ex, "Retry error: {Message}", ex.Message);
152177

153-
await _timeProvider.SafeDelay(GetInterval(attempts), cancellationToken).AnyContext();
178+
await _timeProvider.SafeDelay(GetInterval(attempts), linkedCancellationToken).AnyContext();
179+
180+
ThrowIfTimedOut(startTime);
154181
}
155182

156183
attempts++;
157-
} while (attempts <= MaxAttempts && !cancellationToken.IsCancellationRequested);
184+
} while (attempts <= MaxAttempts && !linkedCancellationToken.IsCancellationRequested);
158185

159186
throw new TaskCanceledException("Should not get here");
160187
}
@@ -164,8 +191,13 @@ public async ValueTask<T> ExecuteAsync<T>(Func<CancellationToken, ValueTask<T>>
164191
if (action == null)
165192
throw new ArgumentNullException(nameof(action));
166193

194+
cancellationToken.ThrowIfCancellationRequested();
195+
167196
int attempts = 1;
168197
var startTime = _timeProvider.GetUtcNow();
198+
var linkedCancellationToken = cancellationToken;
199+
if (Timeout > TimeSpan.Zero)
200+
linkedCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, new CancellationTokenSource(Timeout).Token).Token;
169201

170202
do
171203
{
@@ -174,7 +206,7 @@ public async ValueTask<T> ExecuteAsync<T>(Func<CancellationToken, ValueTask<T>>
174206

175207
try
176208
{
177-
return await action(cancellationToken).AnyContext();
209+
return await action(linkedCancellationToken).AnyContext();
178210
}
179211
catch (Exception ex)
180212
{
@@ -183,15 +215,27 @@ public async ValueTask<T> ExecuteAsync<T>(Func<CancellationToken, ValueTask<T>>
183215

184216
_logger?.LogError(ex, "Retry error: {Message}", ex.Message);
185217

186-
await _timeProvider.SafeDelay(GetInterval(attempts), cancellationToken).AnyContext();
218+
await _timeProvider.SafeDelay(GetInterval(attempts), linkedCancellationToken).AnyContext();
219+
220+
ThrowIfTimedOut(startTime);
187221
}
188222

189223
attempts++;
190-
} while (attempts <= MaxAttempts && !cancellationToken.IsCancellationRequested);
224+
} while (attempts <= MaxAttempts && !linkedCancellationToken.IsCancellationRequested);
191225

192226
throw new TaskCanceledException("Should not get here");
193227
}
194228

229+
private void ThrowIfTimedOut(DateTimeOffset startTime)
230+
{
231+
if (Timeout <= TimeSpan.Zero)
232+
return;
233+
234+
var elapsed = _timeProvider.GetUtcNow().Subtract(startTime);
235+
if (elapsed >= Timeout)
236+
throw new TimeoutException($"Operation timed out after {Timeout:g}.");
237+
}
238+
195239
private TimeSpan GetInterval(int attempts)
196240
{
197241
var interval = RetryInterval ?? GetBackoffInterval?.Invoke(attempts) ?? TimeSpan.FromMilliseconds(100);
@@ -216,41 +260,83 @@ private TimeSpan GetInterval(int attempts)
216260

217261
public class FoundatioResiliencePipelineBuilder(FoundatioResiliencePipeline pipeline)
218262
{
263+
/// <summary>
264+
/// Sets the logger for the pipeline.
265+
/// </summary>
266+
/// <param name="logger"></param>
267+
/// <returns></returns>
268+
/// <exception cref="ArgumentNullException"></exception>
219269
public FoundatioResiliencePipelineBuilder WithLogger(ILogger logger)
220270
{
221271
pipeline.Logger = logger ?? throw new ArgumentNullException(nameof(logger));
222272
return this;
223273
}
224274

275+
/// <summary>
276+
/// Sets the maximum number of attempts for the pipeline.
277+
/// </summary>
278+
/// <param name="maxAttempts"></param>
279+
/// <returns></returns>
225280
public FoundatioResiliencePipelineBuilder WithMaxAttempts(int maxAttempts)
226281
{
227282
pipeline.MaxAttempts = maxAttempts;
228283
return this;
229284
}
230285

286+
/// <summary>
287+
/// Sets a fixed retry interval for all retries.
288+
/// </summary>
289+
/// <param name="retryInterval"></param>
290+
/// <returns></returns>
231291
public FoundatioResiliencePipelineBuilder WithRetryInterval(TimeSpan? retryInterval)
232292
{
233293
pipeline.RetryInterval = retryInterval;
234294
return this;
235295
}
236296

297+
/// <summary>
298+
/// Sets a function that determines whether to retry based on the attempt number and exception.
299+
/// </summary>
300+
/// <param name="shouldRetry"></param>
301+
/// <returns></returns>
237302
public FoundatioResiliencePipelineBuilder WithShouldRetry(Func<int, Exception, bool> shouldRetry)
238303
{
239304
pipeline.ShouldRetry = shouldRetry;
240305
return this;
241306
}
242307

243-
public FoundatioResiliencePipelineBuilder WithGetBackoffInterval(Func<int, TimeSpan> getBackoffInterval)
308+
/// <summary>
309+
/// Sets a function that returns the backoff interval based on the number of attempts. This overrides the retry interval.
310+
/// </summary>
311+
/// <param name="getBackoffInterval"></param>
312+
/// <returns></returns>
313+
public FoundatioResiliencePipelineBuilder WithBackoffInterval(Func<int, TimeSpan> getBackoffInterval)
244314
{
245315
pipeline.GetBackoffInterval = getBackoffInterval;
246316
return this;
247317
}
248318

249-
public FoundatioResiliencePipelineBuilder WithUseJitter(bool useJitter)
319+
/// <summary>
320+
/// Sets whether to use jitter in the backoff interval.
321+
/// </summary>
322+
/// <param name="useJitter"></param>
323+
/// <returns></returns>
324+
public FoundatioResiliencePipelineBuilder WithJitter(bool useJitter = true)
250325
{
251326
pipeline.UseJitter = useJitter;
252327
return this;
253328
}
329+
330+
/// <summary>
331+
/// Sets the timeout for the entire operation. If set to zero, no timeout is applied.
332+
/// </summary>
333+
/// <param name="timeout"></param>
334+
/// <returns></returns>
335+
public FoundatioResiliencePipelineBuilder WithTimeout(TimeSpan timeout)
336+
{
337+
pipeline.Timeout = timeout;
338+
return this;
339+
}
254340
}
255341

256342
public static class FoundatioResiliencePipelineExtensions

tests/Foundatio.Tests/Utility/ResiliencePipelineTests.cs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,74 @@ public async Task CanBoomWithRetriesAndResult()
129129
Assert.Equal(1, result);
130130
}
131131

132+
[Fact]
133+
public async Task CanRunWithRetriesAndCancellation()
134+
{
135+
var cts = new CancellationTokenSource();
136+
await cts.CancelAsync();
137+
138+
await Assert.ThrowsAsync<OperationCanceledException>(async () =>
139+
{
140+
await _pipeline.ExecuteAsync(DoStuff, cts.Token);
141+
});
142+
143+
await Assert.ThrowsAsync<OperationCanceledException>(async () =>
144+
{
145+
await _pipeline.ExecuteAsync(async () => await DoStuff(), cts.Token);
146+
});
147+
}
148+
149+
[Fact]
150+
public Task CanRunWithTimeout()
151+
{
152+
var pipeline = new FoundatioResiliencePipeline
153+
{
154+
Logger = _logger,
155+
MaxAttempts = 5,
156+
Timeout = TimeSpan.FromMilliseconds(100)
157+
};
158+
159+
return Assert.ThrowsAsync<TimeoutException>(async () =>
160+
{
161+
await pipeline.ExecuteAsync(async ct =>
162+
{
163+
await Task.Delay(500, ct);
164+
});
165+
});
166+
}
167+
168+
[Fact]
169+
public void CanUseProvider()
170+
{
171+
var provider = new FoundatioResiliencePipelineProvider()
172+
.WithPipeline("TestPipeline", b => b.WithLogger(_logger).WithMaxAttempts(10).WithRetryInterval(TimeSpan.FromMilliseconds(20)))
173+
.WithDefaultPipeline(b => b.WithLogger(_logger).WithMaxAttempts(7).WithRetryInterval(TimeSpan.FromMilliseconds(100)).WithJitter());
174+
175+
// named pipeline
176+
var pipeline = provider.GetPipeline("TestPipeline");
177+
Assert.NotNull(pipeline);
178+
var foundationPipeline = Assert.IsType<FoundatioResiliencePipeline>(pipeline);
179+
Assert.Equal(_logger, foundationPipeline.Logger);
180+
Assert.Equal(10, foundationPipeline.MaxAttempts);
181+
Assert.Equal(TimeSpan.FromMilliseconds(20), foundationPipeline.RetryInterval);
182+
183+
// default pipeline
184+
pipeline = provider.GetPipeline();
185+
Assert.NotNull(pipeline);
186+
foundationPipeline = Assert.IsType<FoundatioResiliencePipeline>(pipeline);
187+
Assert.Equal(_logger, foundationPipeline.Logger);
188+
Assert.Equal(7, foundationPipeline.MaxAttempts);
189+
Assert.Equal(TimeSpan.FromMilliseconds(100), foundationPipeline.RetryInterval);
190+
191+
// unknown pipeline uses default
192+
pipeline = provider.GetPipeline("UnknownPipeline");
193+
Assert.NotNull(pipeline);
194+
foundationPipeline = Assert.IsType<FoundatioResiliencePipeline>(pipeline);
195+
Assert.Equal(_logger, foundationPipeline.Logger);
196+
Assert.Equal(7, foundationPipeline.MaxAttempts);
197+
Assert.Equal(TimeSpan.FromMilliseconds(100), foundationPipeline.RetryInterval);
198+
}
199+
132200
[Fact]
133201
public async Task CanUsePolly()
134202
{

0 commit comments

Comments
 (0)