Skip to content

Commit 0dfab84

Browse files
Avoid cancellation token cross-contamination and thundering herd in subscription cache (#1587)
* Primary ctor
* Fix problematic caching in CachedSubscriptionStore
* Fix race around semaphore
* Clear
* Use monotonic stopwatch
* Use the lease structure consistently
* Get rid of the sugar
* Get rid of the sugar
* EnsureFresh semaphore acquiring can be cancelled
* Adding unit test that reproduced the bug on master

---------

Co-authored-by: Daniel Marbach <[email protected]>
1 parent 1de1c6f commit 0dfab84

File tree

4 files changed

+168
-46
lines changed

4 files changed

+168
-46
lines changed

src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,19 @@ public PostgreSqlTransportInfrastructure(PostgreSqlTransport transport, HostSett
4747

4848
public override async Task Shutdown(CancellationToken cancellationToken = default)
4949
{
50-
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
51-
.ConfigureAwait(false);
50+
try
51+
{
52+
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
53+
.ConfigureAwait(false);
5254

53-
if (dueDelayedMessageProcessor != null)
55+
if (dueDelayedMessageProcessor != null)
56+
{
57+
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
58+
}
59+
}
60+
finally
5461
{
55-
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
62+
(subscriptionStore as IDisposable)?.Dispose();
5663
}
5764
}
5865

src/NServiceBus.Transport.Sql.Shared/PubSub/CachedSubscriptionStore.cs

Lines changed: 98 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,67 +3,127 @@ namespace NServiceBus.Transport.Sql.Shared
33
using System;
44
using System.Collections.Concurrent;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.Threading;
78
using System.Threading.Tasks;
89

910

10-
class CachedSubscriptionStore : ISubscriptionStore
11+
sealed class CachedSubscriptionStore(ISubscriptionStore inner, TimeSpan cacheFor) : ISubscriptionStore, IDisposable
1112
{
12-
public CachedSubscriptionStore(ISubscriptionStore inner, TimeSpan cacheFor)
13+
public async Task<List<string>> GetSubscribers(Type eventType, CancellationToken cancellationToken = default)
1314
{
14-
this.inner = inner;
15-
this.cacheFor = cacheFor;
16-
}
17-
18-
public Task<List<string>> GetSubscribers(Type eventType, CancellationToken cancellationToken = default)
19-
{
20-
var cacheItem = Cache.GetOrAdd(CacheKey(eventType),
21-
_ => new CacheItem
22-
{
23-
StoredUtc = DateTime.UtcNow,
24-
Subscribers = inner.GetSubscribers(eventType, cancellationToken)
25-
});
15+
var cacheKey = CacheKey(eventType);
16+
var cachedSubscriptions = Cache.GetOrAdd(cacheKey,
17+
static (_, state) => new CachedSubscriptions(state.inner, state.eventType, state.cacheFor),
18+
(inner, eventType, cacheFor));
2619

27-
var age = DateTime.UtcNow - cacheItem.StoredUtc;
28-
if (age >= cacheFor)
29-
{
30-
cacheItem.Subscribers = inner.GetSubscribers(eventType, cancellationToken);
31-
cacheItem.StoredUtc = DateTime.UtcNow;
32-
}
33-
34-
return cacheItem.Subscribers;
20+
return await cachedSubscriptions.EnsureFresh(cancellationToken).ConfigureAwait(false);
3521
}
3622

3723
public async Task Subscribe(string endpointName, string endpointAddress, Type eventType, CancellationToken cancellationToken = default)
3824
{
39-
await inner.Subscribe(endpointName, endpointAddress, eventType, cancellationToken).ConfigureAwait(false);
40-
ClearForMessageType(CacheKey(eventType));
25+
try
26+
{
27+
await inner.Subscribe(endpointName, endpointAddress, eventType, cancellationToken).ConfigureAwait(false);
28+
}
29+
finally
30+
{
31+
await Clear(CacheKey(eventType))
32+
.ConfigureAwait(false);
33+
}
4134
}
4235

4336
public async Task Unsubscribe(string endpointName, Type eventType, CancellationToken cancellationToken = default)
4437
{
45-
await inner.Unsubscribe(endpointName, eventType, cancellationToken).ConfigureAwait(false);
46-
ClearForMessageType(CacheKey(eventType));
38+
try
39+
{
40+
await inner.Unsubscribe(endpointName, eventType, cancellationToken).ConfigureAwait(false);
41+
}
42+
finally
43+
{
44+
await Clear(CacheKey(eventType))
45+
.ConfigureAwait(false);
46+
}
4747
}
4848

49-
void ClearForMessageType(string topic)
49+
public void Dispose()
5050
{
51-
Cache.TryRemove(topic, out _);
52-
}
51+
if (Cache.IsEmpty)
52+
{
53+
return;
54+
}
5355

54-
static string CacheKey(Type eventType)
55-
{
56-
return eventType.FullName;
56+
foreach (var subscription in Cache.Values)
57+
{
58+
subscription.Dispose();
59+
}
60+
61+
Cache.Clear();
5762
}
5863

59-
TimeSpan cacheFor;
60-
ISubscriptionStore inner;
61-
ConcurrentDictionary<string, CacheItem> Cache = new ConcurrentDictionary<string, CacheItem>();
64+
#pragma warning disable PS0018 // Clear should not be cancellable
65+
ValueTask Clear(string cacheKey) => Cache.TryGetValue(cacheKey, out var cachedSubscriptions) ? cachedSubscriptions.Clear() : ValueTask.CompletedTask;
66+
#pragma warning restore PS0018
6267

63-
class CacheItem
68+
static string CacheKey(Type eventType) => eventType.FullName;
69+
70+
readonly ConcurrentDictionary<string, CachedSubscriptions> Cache = new();
71+
72+
sealed class CachedSubscriptions(ISubscriptionStore store, Type eventType, TimeSpan cacheFor) : IDisposable
6473
{
65-
public DateTime StoredUtc { get; set; } // Internal usage, only set/get using private
66-
public Task<List<string>> Subscribers { get; set; }
74+
readonly SemaphoreSlim fetchSemaphore = new(1, 1);
75+
76+
List<string> cachedSubscriptions;
77+
long cachedAtTimestamp;
78+
79+
// Returns the subscriber list for this event type, reloading it from the store
// when nothing is cached yet or the cached entry is at least cacheFor old.
// Cancellation applies both to waiting for the semaphore and to the store query.
public async ValueTask<List<string>> EnsureFresh(CancellationToken cancellationToken = default)
{
    // Fast path: read both fields once, without taking the semaphore.
    var snapshot = cachedSubscriptions;
    var snapshotTakenAt = cachedAtTimestamp;

    var snapshotIsFresh = snapshot != null && Stopwatch.GetElapsedTime(snapshotTakenAt) < cacheFor;
    if (snapshotIsFresh)
    {
        return snapshot;
    }

    // Slow path: the semaphore admits one holder at a time, so only one caller queries the store.
    await fetchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

    try
    {
        // Re-check under the semaphore: another caller may have refreshed the entry while we waited.
        if (cachedSubscriptions == null || Stopwatch.GetElapsedTime(cachedAtTimestamp) >= cacheFor)
        {
            cachedSubscriptions = await store.GetSubscribers(eventType, cancellationToken).ConfigureAwait(false);
            cachedAtTimestamp = Stopwatch.GetTimestamp();
        }

        return cachedSubscriptions;
    }
    finally
    {
        fetchSemaphore.Release();
    }
}
108+
109+
#pragma warning disable PS0018 // Clear should not be cancellable
// Resets the cached subscriber list and its timestamp so that the next
// EnsureFresh call has to query the store again. Deliberately not cancellable.
public async ValueTask Clear()
#pragma warning restore PS0018
{
    // Acquire before entering the try block, as EnsureFresh does. If the wait itself
    // throws (for example because the semaphore has been disposed), the finally block
    // must not release a semaphore that was never acquired.
    await fetchSemaphore.WaitAsync(CancellationToken.None).ConfigureAwait(false);

    try
    {
        cachedSubscriptions = null;
        cachedAtTimestamp = 0;
    }
    finally
    {
        fetchSemaphore.Release();
    }
}
125+
126+
public void Dispose() => fetchSemaphore.Dispose();
67127
}
68128
}
69129
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
namespace NServiceBus.Transport.SqlServer.UnitTests;
2+
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using NUnit.Framework;
8+
using Sql.Shared;
9+
10+
[TestFixture]
11+
public class CachedSubscriptionStoreTests
12+
{
13+
// Reproduces a bug that previously existed in the original implementation that stored tasks and their outcomes in the cache (see #1588)
14+
// leaving this test around to make sure such a problem doesn't occur again.
15+
[Test]
public void Should_not_cache_cancelled_operations()
{
    // The fake store cancels the lookup when handed an already-cancelled token
    // and otherwise returns an empty subscriber list.
    var subscriptionStore = new FakeSubscriptionStore
    {
        GetSubscribersAction = (_, token) => token.IsCancellationRequested ? Task.FromCanceled<List<string>>(token) : Task.FromResult(new List<string>())
    };

    // CachedSubscriptionStore is IDisposable, so release it when the test ends.
    using var cache = new CachedSubscriptionStore(subscriptionStore, TimeSpan.FromSeconds(60));

    Assert.Multiple(() =>
    {
        // First call: cancelled token, must surface the cancellation.
        _ = Assert.ThrowsAsync<TaskCanceledException>(async () =>
            await cache.GetSubscribers(typeof(object), new CancellationToken(true)));

        // Second call for the same event type: the earlier cancellation must not have been cached.
        Assert.DoesNotThrowAsync(async () => await cache.GetSubscribers(typeof(object), CancellationToken.None));
    });
}
32+
33+
class FakeSubscriptionStore : ISubscriptionStore
34+
{
35+
public Func<Type, CancellationToken, Task<List<string>>> GetSubscribersAction { get; set; } =
36+
(_, _) => Task.FromResult(new List<string>());
37+
38+
public Task<List<string>> GetSubscribers(Type eventType, CancellationToken cancellationToken = default) =>
39+
GetSubscribersAction(eventType, cancellationToken);
40+
41+
public Task Subscribe(string endpointName, string endpointAddress, Type eventType,
42+
CancellationToken cancellationToken = default) =>
43+
throw new NotImplementedException();
44+
45+
public Task Unsubscribe(string endpointName, Type eventType, CancellationToken cancellationToken = default) =>
46+
throw new NotImplementedException();
47+
}
48+
}

src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -337,12 +337,19 @@ public void ConfigureSendInfrastructure()
337337

338338
public override async Task Shutdown(CancellationToken cancellationToken = default)
339339
{
340-
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
341-
.ConfigureAwait(false);
340+
try
341+
{
342+
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
343+
.ConfigureAwait(false);
342344

343-
if (dueDelayedMessageProcessor != null)
345+
if (dueDelayedMessageProcessor != null)
346+
{
347+
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
348+
}
349+
}
350+
finally
344351
{
345-
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
352+
(subscriptionStore as IDisposable)?.Dispose();
346353
}
347354
}
348355

0 commit comments

Comments
 (0)