Skip to content

Commit 19b7a24

Browse files
Avoid cancellation token cross-contamination and thundering herd in subscription cache (#1590)
* Avoid cancellation token cross-contamination and thundering herd in subscription cache (#1587) * Primary ctor * Fix problematic caching in CachedSubscriptionStore * Fix race around semaphore * Clear * Use monotonic stopwatch * Use the lease structure consistently * Get rid of the sugar * Get rid of the sugar * EnsureFresh semaphore acquiring can be cancelled * Adding unit test that reproduced the bug on master --------- Co-authored-by: Daniel Marbach <[email protected]> * Fix Stop --------- Co-authored-by: Daniel Marbach <[email protected]>
1 parent 26d358a commit 19b7a24

File tree

5 files changed

+183
-42
lines changed

5 files changed

+183
-42
lines changed

src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,22 @@ public PostgreSqlTransportInfrastructure(PostgreSqlTransport transport, HostSett
4545
exceptionClassifier = new PostgreSqlExceptionClassifier();
4646
}
4747

48-
public override Task Shutdown(CancellationToken cancellationToken = default)
48+
public override async Task Shutdown(CancellationToken cancellationToken = default)
4949
{
50-
return dueDelayedMessageProcessor?.Stop(cancellationToken) ?? Task.FromResult(0);
50+
try
51+
{
52+
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
53+
.ConfigureAwait(false);
54+
55+
if (dueDelayedMessageProcessor != null)
56+
{
57+
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
58+
}
59+
}
60+
finally
61+
{
62+
(subscriptionStore as IDisposable)?.Dispose();
63+
}
5164
}
5265

5366
public override string ToTransportAddress(Transport.QueueAddress address)

src/NServiceBus.Transport.Sql.Shared/PubSub/CachedSubscriptionStore.cs

Lines changed: 98 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,67 +3,127 @@ namespace NServiceBus.Transport.Sql.Shared
33
using System;
44
using System.Collections.Concurrent;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.Threading;
78
using System.Threading.Tasks;
89

910

10-
class CachedSubscriptionStore : ISubscriptionStore
11+
sealed class CachedSubscriptionStore(ISubscriptionStore inner, TimeSpan cacheFor) : ISubscriptionStore, IDisposable
1112
{
12-
public CachedSubscriptionStore(ISubscriptionStore inner, TimeSpan cacheFor)
13+
public async Task<List<string>> GetSubscribers(Type eventType, CancellationToken cancellationToken = default)
1314
{
14-
this.inner = inner;
15-
this.cacheFor = cacheFor;
16-
}
17-
18-
public Task<List<string>> GetSubscribers(Type eventType, CancellationToken cancellationToken = default)
19-
{
20-
var cacheItem = Cache.GetOrAdd(CacheKey(eventType),
21-
_ => new CacheItem
22-
{
23-
StoredUtc = DateTime.UtcNow,
24-
Subscribers = inner.GetSubscribers(eventType, cancellationToken)
25-
});
15+
var cacheKey = CacheKey(eventType);
16+
var cachedSubscriptions = Cache.GetOrAdd(cacheKey,
17+
static (_, state) => new CachedSubscriptions(state.inner, state.eventType, state.cacheFor),
18+
(inner, eventType, cacheFor));
2619

27-
var age = DateTime.UtcNow - cacheItem.StoredUtc;
28-
if (age >= cacheFor)
29-
{
30-
cacheItem.Subscribers = inner.GetSubscribers(eventType, cancellationToken);
31-
cacheItem.StoredUtc = DateTime.UtcNow;
32-
}
33-
34-
return cacheItem.Subscribers;
20+
return await cachedSubscriptions.EnsureFresh(cancellationToken).ConfigureAwait(false);
3521
}
3622

3723
public async Task Subscribe(string endpointName, string endpointAddress, Type eventType, CancellationToken cancellationToken = default)
3824
{
39-
await inner.Subscribe(endpointName, endpointAddress, eventType, cancellationToken).ConfigureAwait(false);
40-
ClearForMessageType(CacheKey(eventType));
25+
try
26+
{
27+
await inner.Subscribe(endpointName, endpointAddress, eventType, cancellationToken).ConfigureAwait(false);
28+
}
29+
finally
30+
{
31+
await Clear(CacheKey(eventType))
32+
.ConfigureAwait(false);
33+
}
4134
}
4235

4336
public async Task Unsubscribe(string endpointName, Type eventType, CancellationToken cancellationToken = default)
4437
{
45-
await inner.Unsubscribe(endpointName, eventType, cancellationToken).ConfigureAwait(false);
46-
ClearForMessageType(CacheKey(eventType));
38+
try
39+
{
40+
await inner.Unsubscribe(endpointName, eventType, cancellationToken).ConfigureAwait(false);
41+
}
42+
finally
43+
{
44+
await Clear(CacheKey(eventType))
45+
.ConfigureAwait(false);
46+
}
4747
}
4848

49-
void ClearForMessageType(string topic)
49+
public void Dispose()
5050
{
51-
Cache.TryRemove(topic, out _);
52-
}
51+
if (Cache.IsEmpty)
52+
{
53+
return;
54+
}
5355

54-
static string CacheKey(Type eventType)
55-
{
56-
return eventType.FullName;
56+
foreach (var subscription in Cache.Values)
57+
{
58+
subscription.Dispose();
59+
}
60+
61+
Cache.Clear();
5762
}
5863

59-
TimeSpan cacheFor;
60-
ISubscriptionStore inner;
61-
ConcurrentDictionary<string, CacheItem> Cache = new ConcurrentDictionary<string, CacheItem>();
64+
#pragma warning disable PS0018 // Clear should not be cancellable
65+
ValueTask Clear(string cacheKey) => Cache.TryGetValue(cacheKey, out var cachedSubscriptions) ? cachedSubscriptions.Clear() : ValueTask.CompletedTask;
66+
#pragma warning restore PS0018
6267

63-
class CacheItem
68+
static string CacheKey(Type eventType) => eventType.FullName;
69+
70+
readonly ConcurrentDictionary<string, CachedSubscriptions> Cache = new();
71+
72+
sealed class CachedSubscriptions(ISubscriptionStore store, Type eventType, TimeSpan cacheFor) : IDisposable
6473
{
65-
public DateTime StoredUtc { get; set; } // Internal usage, only set/get using private
66-
public Task<List<string>> Subscribers { get; set; }
74+
readonly SemaphoreSlim fetchSemaphore = new(1, 1);
75+
76+
List<string> cachedSubscriptions;
77+
long cachedAtTimestamp;
78+
79+
public async ValueTask<List<string>> EnsureFresh(CancellationToken cancellationToken = default)
80+
{
81+
var cachedSubscriptionsSnapshot = cachedSubscriptions;
82+
var cachedAtTimestampSnapshot = cachedAtTimestamp;
83+
84+
if (cachedSubscriptionsSnapshot != null && Stopwatch.GetElapsedTime(cachedAtTimestampSnapshot) < cacheFor)
85+
{
86+
return cachedSubscriptionsSnapshot;
87+
}
88+
89+
await fetchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
90+
91+
try
92+
{
93+
if (cachedSubscriptions != null && Stopwatch.GetElapsedTime(cachedAtTimestamp) < cacheFor)
94+
{
95+
return cachedSubscriptions;
96+
}
97+
98+
cachedSubscriptions = await store.GetSubscribers(eventType, cancellationToken).ConfigureAwait(false);
99+
cachedAtTimestamp = Stopwatch.GetTimestamp();
100+
101+
return cachedSubscriptions;
102+
}
103+
finally
104+
{
105+
fetchSemaphore.Release();
106+
}
107+
}
108+
109+
#pragma warning disable PS0018 // Clear should not be cancellable
110+
public async ValueTask Clear()
111+
#pragma warning restore PS0018
112+
{
113+
try
114+
{
115+
await fetchSemaphore.WaitAsync(CancellationToken.None).ConfigureAwait(false);
116+
117+
cachedSubscriptions = null;
118+
cachedAtTimestamp = 0;
119+
}
120+
finally
121+
{
122+
fetchSemaphore.Release();
123+
}
124+
}
125+
126+
public void Dispose() => fetchSemaphore.Dispose();
67127
}
68128
}
69129
}

src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ public async Task ChangeConcurrency(PushRuntimeSettings newLimitations,
125125

126126
public async Task StopReceive(CancellationToken cancellationToken = default)
127127
{
128+
if (messageReceivingCancellationTokenSource == null)
129+
{
130+
// already stopped or never started
131+
return;
132+
}
133+
128134
messageReceivingCancellationTokenSource?.Cancel();
129135

130136
using (cancellationToken.Register(() => messageProcessingCancellationTokenSource?.Cancel()))
@@ -146,6 +152,7 @@ public async Task StopReceive(CancellationToken cancellationToken = default)
146152
messageProcessingCircuitBreaker.Dispose();
147153
concurrencyLimiter.Dispose();
148154
messageReceivingCancellationTokenSource?.Dispose();
155+
messageReceivingCancellationTokenSource = null;
149156
messageProcessingCancellationTokenSource?.Dispose();
150157
}
151158

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
namespace NServiceBus.Transport.SqlServer.UnitTests;
2+
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using NUnit.Framework;
8+
using Sql.Shared;
9+
10+
[TestFixture]
11+
public class CachedSubscriptionStoreTests
12+
{
13+
// Reproduces a bug that previously existed in the original implementation that stored tasks and their outcomes in the cache (see #1588)
14+
// leaving this test around to make sure such a problem doesn't occur again.
15+
[Test]
16+
public void Should_not_cache_cancelled_operations()
17+
{
18+
var subscriptionStore = new FakeSubscriptionStore
19+
{
20+
GetSubscribersAction = (_, token) => token.IsCancellationRequested ? Task.FromCanceled<List<string>>(token) : Task.FromResult(new List<string>())
21+
};
22+
23+
var cache = new CachedSubscriptionStore(subscriptionStore, TimeSpan.FromSeconds(60));
24+
25+
Assert.Multiple(() =>
26+
{
27+
_ = Assert.ThrowsAsync<TaskCanceledException>(async () =>
28+
await cache.GetSubscribers(typeof(object), new CancellationToken(true)));
29+
Assert.DoesNotThrowAsync(async () => await cache.GetSubscribers(typeof(object), CancellationToken.None));
30+
});
31+
}
32+
33+
class FakeSubscriptionStore : ISubscriptionStore
34+
{
35+
public Func<Type, CancellationToken, Task<List<string>>> GetSubscribersAction { get; set; } =
36+
(_, _) => Task.FromResult(new List<string>());
37+
38+
public Task<List<string>> GetSubscribers(Type eventType, CancellationToken cancellationToken = default) =>
39+
GetSubscribersAction(eventType, cancellationToken);
40+
41+
public Task Subscribe(string endpointName, string endpointAddress, Type eventType,
42+
CancellationToken cancellationToken = default) =>
43+
throw new NotImplementedException();
44+
45+
public Task Unsubscribe(string endpointName, Type eventType, CancellationToken cancellationToken = default) =>
46+
throw new NotImplementedException();
47+
}
48+
}

src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,9 +335,22 @@ public void ConfigureSendInfrastructure()
335335
connectionFactory);
336336
}
337337

338-
public override Task Shutdown(CancellationToken cancellationToken = default)
338+
public override async Task Shutdown(CancellationToken cancellationToken = default)
339339
{
340-
return dueDelayedMessageProcessor?.Stop(cancellationToken) ?? Task.FromResult(0);
340+
try
341+
{
342+
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
343+
.ConfigureAwait(false);
344+
345+
if (dueDelayedMessageProcessor != null)
346+
{
347+
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
348+
}
349+
}
350+
finally
351+
{
352+
(subscriptionStore as IDisposable)?.Dispose();
353+
}
341354
}
342355

343356

0 commit comments

Comments
 (0)