Skip to content

Commit b3daa98

Browse files
authored
Fix RequestCollapserEngine caching lazy with exceptions as value indefinitely (#6)
* Lazy objects caches exception that are created by the factory. This will result in the RequestCollapser's Engine to cache the exception indefinitely. * Address PR
1 parent 8e23bf5 commit b3daa98

File tree

6 files changed

+89
-41
lines changed

6 files changed

+89
-41
lines changed

src/Polly.Contrib.DuplicateRequestCollapser.Specs/RequestCollapserAsyncTResultSpecs.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Threading.Tasks;
1+
using System.Collections.Concurrent;
2+
using System.Threading.Tasks;
23
using Xunit.Abstractions;
34

45
namespace Polly.Contrib.DuplicateRequestCollapser.Specs
@@ -7,11 +8,16 @@ public class RequestCollapserAsyncTResultSpecs : RequestCollapserTResultSpecsBas
78
{
89
public RequestCollapserAsyncTResultSpecs(ITestOutputHelper testOutputHelper) : base(testOutputHelper) { }
910

11+
private ConcurrentDictionary<(bool, IKeyStrategy, ISyncLockProvider), IsPolicy> PolicyCache = new ConcurrentDictionary<(bool, IKeyStrategy, ISyncLockProvider), IsPolicy>();
12+
1013
protected override IsPolicy GetPolicy(bool useCollapser, IKeyStrategy overrideKeyStrategy = null, ISyncLockProvider lockProvider = null)
1114
{
12-
return useCollapser ?
15+
return PolicyCache.GetOrAdd((useCollapser, overrideKeyStrategy, lockProvider), _ =>
16+
{
17+
return useCollapser ?
1318
AsyncRequestCollapserPolicy<ResultClass>.Create(overrideKeyStrategy ?? RequestCollapserPolicy.DefaultKeyStrategy, new AsyncWrapperLockProvider(lockProvider ?? RequestCollapserPolicy.GetDefaultLockProvider()))
1419
: (IAsyncPolicy<ResultClass>)Policy.NoOpAsync<ResultClass>();
20+
});
1521
}
1622

1723
protected override Task ExecuteThroughPolicy(IsPolicy policy, Context context, int j, bool gated)

src/Polly.Contrib.DuplicateRequestCollapser.Specs/RequestCollapserSpecsBase.TestOrchestration.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,13 @@ protected void UnderlyingExpensiveWork(int i)
5050
ReleaseHoldingGate();
5151

5252
// Wait for task completion.
53-
Task.WaitAll(ConcurrentTasks);
53+
try
54+
{
55+
Task.WaitAll(ConcurrentTasks);
56+
}
57+
catch
58+
{
59+
}
5460
testOutputHelper.WriteLine("All tasks completed.");
5561

5662
// Return results to caller; the caller is responsible for asserting.

src/Polly.Contrib.DuplicateRequestCollapser.Specs/RequestCollapserTResultSpecs.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Threading.Tasks;
1+
using System.Collections.Concurrent;
2+
using System.Threading.Tasks;
23
using Xunit.Abstractions;
34

45
namespace Polly.Contrib.DuplicateRequestCollapser.Specs
@@ -7,11 +8,16 @@ public class RequestCollapserTResultSpecs : RequestCollapserTResultSpecsBase
78
{
89
public RequestCollapserTResultSpecs(ITestOutputHelper testOutputHelper) : base(testOutputHelper) { }
910

11+
private ConcurrentDictionary<(bool, IKeyStrategy, ISyncLockProvider), IsPolicy> PolicyCache = new ConcurrentDictionary<(bool, IKeyStrategy, ISyncLockProvider), IsPolicy>();
12+
1013
protected override IsPolicy GetPolicy(bool useCollapser, IKeyStrategy overrideKeyStrategy = null, ISyncLockProvider lockProvider = null)
1114
{
12-
return useCollapser ?
13-
RequestCollapserPolicy<ResultClass>.Create(overrideKeyStrategy ?? RequestCollapserPolicy.DefaultKeyStrategy, lockProvider ?? RequestCollapserPolicy.GetDefaultLockProvider())
14-
: (ISyncPolicy<ResultClass>)Policy.NoOp<ResultClass>();
15+
return PolicyCache.GetOrAdd((useCollapser, overrideKeyStrategy, lockProvider), _ =>
16+
{
17+
return useCollapser ?
18+
RequestCollapserPolicy<ResultClass>.Create(overrideKeyStrategy ?? RequestCollapserPolicy.DefaultKeyStrategy, lockProvider ?? RequestCollapserPolicy.GetDefaultLockProvider())
19+
: (ISyncPolicy<ResultClass>)Policy.NoOp<ResultClass>();
20+
});
1521
}
1622

1723
protected override Task ExecuteThroughPolicy(IsPolicy policy, Context context, int j, bool gated)

src/Polly.Contrib.DuplicateRequestCollapser.Specs/RequestCollapserTResultSpecsBase.cs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,24 @@ namespace Polly.Contrib.DuplicateRequestCollapser.Specs
99
{
1010
public abstract class RequestCollapserTResultSpecsBase : RequestCollapserSpecsBase
1111
{
12+
private Random _rng = new Random();
13+
1214
protected RequestCollapserTResultSpecsBase(ITestOutputHelper testOutputHelper) : base(testOutputHelper) { }
1315

14-
private protected Func<ResultClass> ResultFactory => () => new ResultClass(ResultPrimitive.Good);
16+
private protected Func<ResultClass> ResultFactory = () => new ResultClass(ResultPrimitive.Good);
1517

1618
[Theory]
1719
[ClassData(typeof(RequestCollapserTestParallelisms))]
1820
public void Executing_concurrent_duplicate_task_through_CollapserPolicy_should_execute_only_once_and_return_same_single_result_instance(int parallelism)
1921
{
2022
(int actualInvocations, Task[] tasks) = Execute_parallel_delegates_through_policy_with_key_strategy(parallelism, useCollapser: true, sameKey: true);
21-
23+
2224
actualInvocations.Should().Be(1);
2325

2426
// All executions should have been handled the same single result instance.
25-
ResultClass first = ((Task<ResultClass>) tasks[0]).Result;
27+
ResultClass first = ((Task<ResultClass>)tasks[0]).Result;
2628

27-
tasks.All(t => Object.ReferenceEquals(((Task<ResultClass>) t).Result, first)).Should().BeTrue();
29+
tasks.All(t => Object.ReferenceEquals(((Task<ResultClass>)t).Result, first)).Should().BeTrue();
2830
}
2931

3032
[Theory]
@@ -39,8 +41,30 @@ public void Executing_concurrent_duplicate_task_not_through_CollapserPolicy_shou
3941
actualInvocations.Should().Be(parallelism);
4042

4143
// All executions should have been handled the same single result instance.
42-
tasks.Select(t => ((Task<ResultClass>) t).Result).Distinct().Count().Should().Be(parallelism);
44+
tasks.Select(t => ((Task<ResultClass>)t).Result).Distinct().Count().Should().Be(parallelism);
4345
}
4446

47+
[Theory]
48+
[ClassData(typeof(RequestCollapserTestParallelisms))]
49+
public void Executing_concurrent_duplicate_faulted_task_through_CollapserPolicy_should_execute_only_once_and_return_same_single_result_instance(int parallelism)
50+
{
51+
ResultFactory = () => throw new Exception(_rng.Next().ToString());
52+
(int actualInvocations, Task[] tasks) = Execute_parallel_delegates_through_policy_with_key_strategy(parallelism, useCollapser: true, sameKey: true);
53+
actualInvocations.Should().Be(1);
54+
tasks.First().IsFaulted.Should().BeTrue();
55+
56+
Exception first = tasks.First().Exception.InnerException;
57+
// All executions should have been handed the same single result instance.
58+
tasks.Select(x => x.Exception.InnerException).ShouldAllBeEquivalentTo(first);
59+
60+
(actualInvocations, tasks) = Execute_parallel_delegates_through_policy_with_key_strategy(parallelism, useCollapser: true, sameKey: true);
61+
actualInvocations.Should().Be(2);
62+
tasks.First().IsFaulted.Should().BeTrue();
63+
64+
Exception second = tasks.First().Exception.InnerException;
65+
// The result of the second batch should not be the same as the first batch.
66+
second.Message.Should().NotBe(first.Message);
67+
tasks.Select(x => x.Exception.InnerException).ShouldAllBeEquivalentTo(second);
68+
}
4569
}
4670
}

src/Polly.Contrib.DuplicateRequestCollapser/RequestCollapserEngine.cs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,30 +31,33 @@ internal static TResult Implementation<TResult>(
3131
lazy = collapser.GetOrAdd(key, new Lazy<object>(() => action(context, cancellationToken), LazyThreadSafetyMode.ExecutionAndPublication)); // Note: per documentation, LazyThreadSafetyMode.ExecutionAndPublication guarantees single execution, but means the executed code must not lock, as this risks deadlocks. We should document.
3232
}
3333

34-
TResult result = (TResult)lazy.Value;
35-
36-
// As soon as the lazy has returned a result to one thread, the concurrent request set is over, so we evict the lazy from the ConcurrentDictionary.
37-
// We need to evict within a lock, to be sure we are not, due to potential race with new threads populating, evicting a different lazy created by a different thread.
38-
// To reduce lock contention, first check outside the lock whether we still need to remove it (we will double-check inside the lock).
39-
if (collapser.TryGetValue(key, out Lazy<object> currentValue))
34+
try
35+
{
36+
return (TResult)lazy.Value;
37+
}
38+
finally
4039
{
41-
if (currentValue == lazy)
40+
// As soon as the lazy has returned a result to one thread, the concurrent request set is over, so we evict the lazy from the ConcurrentDictionary.
41+
// We need to evict within a lock, to be sure we are not, due to potential race with new threads populating, evicting a different lazy created by a different thread.
42+
// To reduce lock contention, first check outside the lock whether we still need to remove it (we will double-check inside the lock).
43+
if (collapser.TryGetValue(key, out Lazy<object> currentValue))
4244
{
43-
using (lockProvider.AcquireLock(key, context, cancellationToken))
45+
if (currentValue == lazy)
4446
{
45-
// Double-check that there has not been a race which updated the dictionary with a new value.
46-
if (collapser.TryGetValue(key, out Lazy<object> valueWithinLock))
47+
using (lockProvider.AcquireLock(key, context, cancellationToken))
4748
{
48-
if (valueWithinLock == lazy)
49+
// Double-check that there has not been a race which updated the dictionary with a new value.
50+
if (collapser.TryGetValue(key, out Lazy<object> valueWithinLock))
4951
{
50-
collapser.TryRemove(key, out _);
52+
if (valueWithinLock == lazy)
53+
{
54+
collapser.TryRemove(key, out _);
55+
}
5156
}
5257
}
5358
}
5459
}
5560
}
56-
57-
return result;
5861
}
5962
}
6063
}

src/Polly.Contrib.DuplicateRequestCollapser/RequestCollapserEngineAsync.cs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,31 +33,34 @@ internal static async Task<TResult> ImplementationAsync<TResult>(
3333
lazy = collapser.GetOrAdd(key, new Lazy<Task<object>>(async () => await action(context, cancellationToken).ConfigureAwait(continueOnCapturedContext), LazyThreadSafetyMode.ExecutionAndPublication)); // Note: per documentation, LazyThreadSafetyMode.ExecutionAndPublication guarantees single execution, but means the executed code must not lock, as this risks deadlocks. We should document.
3434
}
3535

36-
TResult result = (TResult)await lazy.Value.ConfigureAwait(continueOnCapturedContext);
37-
38-
// As soon as the lazy has returned a result to one thread, the concurrent request set is over, so we evict the lazy from the ConcurrentDictionary.
39-
// We need to evict within a lock, to be sure we are not, due to potential race with new threads populating, evicting a different lazy created by a different thread.
40-
// To reduce lock contention, first check outside the lock whether we still need to remove it (we will double-check inside the lock).
41-
if (collapser.TryGetValue(key, out Lazy<Task<object>> currentValue))
36+
try
37+
{
38+
return (TResult)await lazy.Value.ConfigureAwait(continueOnCapturedContext);
39+
}
40+
finally
4241
{
43-
if (currentValue == lazy)
42+
// As soon as the lazy has returned a result to one thread, the concurrent request set is over, so we evict the lazy from the ConcurrentDictionary.
43+
// We need to evict within a lock, to be sure we are not, due to potential race with new threads populating, evicting a different lazy created by a different thread.
44+
// To reduce lock contention, first check outside the lock whether we still need to remove it (we will double-check inside the lock).
45+
if (collapser.TryGetValue(key, out Lazy<Task<object>> currentValue))
4446
{
45-
await using (lockProvider.AcquireLockAsync(key, context, cancellationToken, continueOnCapturedContext)
46-
.ConfigureAwait(continueOnCapturedContext))
47+
if (currentValue == lazy)
4748
{
48-
// Double-check that there has not been a race which updated the dictionary with a new value.
49-
if (collapser.TryGetValue(key, out Lazy<Task<object>> valueWithinLock))
49+
await using (lockProvider.AcquireLockAsync(key, context, cancellationToken, continueOnCapturedContext)
50+
.ConfigureAwait(continueOnCapturedContext))
5051
{
51-
if (valueWithinLock == lazy)
52+
// Double-check that there has not been a race which updated the dictionary with a new value.
53+
if (collapser.TryGetValue(key, out Lazy<Task<object>> valueWithinLock))
5254
{
53-
collapser.TryRemove(key, out _);
55+
if (valueWithinLock == lazy)
56+
{
57+
collapser.TryRemove(key, out _);
58+
}
5459
}
5560
}
5661
}
5762
}
5863
}
59-
60-
return result;
6164
}
6265
}
6366
}

0 commit comments

Comments
 (0)