Skip to content

Commit 13b2a48

Browse files
Avoid cancellation token cross-contamination and thundering herd in subscription cache (#1591)
* Avoid cancellation token cross-contamination and thundering herd in subscription cache (#1587) * Primary ctor * Fix problematic caching in CachedSubscriptionStore * Fix race around semaphore * Clear * Use monotonic stopwatch * Use the lease structure consistently * Get rid of the sugar * Get rid of the sugar * EnsureFresh semaphore acquiring can be cancelled * Adding unit test that reproduced the bug on master --------- Co-authored-by: Daniel Marbach <[email protected]> * Fix Stop * Fix variable name --------- Co-authored-by: Daniel Marbach <[email protected]>
1 parent f99254e commit 13b2a48

File tree

6 files changed

+194
-42
lines changed

6 files changed

+194
-42
lines changed

src/NServiceBus.Transport.SqlServer.UnitTests/.editorconfig

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
[*.cs]
22

3+
# Justification: Test project
4+
dotnet_diagnostic.CA2007.severity = none
5+
36
# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken
47
dotnet_diagnostic.NSB0002.severity = suggestion
58

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
namespace NServiceBus.Transport.SqlServer.UnitTests
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using NUnit.Framework;
8+
9+
[TestFixture]
public class CachedSubscriptionStoreTests
{
    // Regression test for #1588: the original implementation stored tasks and their outcomes in the cache.
    // Kept to make sure such a problem doesn't occur again.
    [Test]
    public void Should_not_cache_cancelled_operations()
    {
        var subscriptionStore = new FakeSubscriptionStore
        {
            // Report cancellation only when the caller's token is already cancelled.
            GetSubscribersAction = (_, token) => token.IsCancellationRequested
                ? Task.FromCanceled<List<string>>(token)
                : Task.FromResult(new List<string>())
        };

        // CachedSubscriptionStore is IDisposable, so dispose it when the test finishes.
        using var cache = new CachedSubscriptionStore(subscriptionStore, TimeSpan.FromSeconds(60));

        Assert.Multiple(() =>
        {
            // First call is cancelled up front ...
            _ = Assert.ThrowsAsync<TaskCanceledException>(async () =>
                await cache.GetSubscribers(typeof(object), new CancellationToken(true)));
            // ... and a later call with a live token must not observe that cancellation.
            Assert.DoesNotThrowAsync(async () => await cache.GetSubscribers(typeof(object), CancellationToken.None));
        });
    }

    // Minimal ISubscriptionStore stand-in; only GetSubscribers is expected to be called.
    class FakeSubscriptionStore : ISubscriptionStore
    {
        // Replaceable behaviour for GetSubscribers; defaults to returning an empty list.
        public Func<Type, CancellationToken, Task<List<string>>> GetSubscribersAction { get; set; } =
            (type, token) => Task.FromResult(new List<string>());

        public Task<List<string>> GetSubscribers(Type eventType, CancellationToken cancellationToken = default) =>
            GetSubscribersAction(eventType, cancellationToken);

        public Task Subscribe(string endpointName, string endpointAddress, Type eventType,
            CancellationToken cancellationToken = default) =>
            throw new NotImplementedException();

        public Task Unsubscribe(string endpointName, Type eventType, CancellationToken cancellationToken = default) =>
            throw new NotImplementedException();
    }
}
48+
}

src/NServiceBus.Transport.SqlServer.UnitTests/Receiving/RepeatedFailuresOverTimeCircuitBreakerTests.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ namespace NServiceBus.Transport.SqlServer.UnitTests.Receiving
88
using NUnit.Framework;
99
using NServiceBus.Transport.SqlServer;
1010

11-
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
12-
1311
// Ideally the circuit breaker would use a time provider to allow for easier testing but that would require a significant refactor
1412
// and we want keep the changes to a minimum for now to allow backporting to older versions.
1513
[TestFixture]
@@ -222,5 +220,4 @@ public async Task Should_trigger_after_multiple_failures_and_timeout()
222220
Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after repeated failures and timeout.");
223221
}
224222
}
225-
}
226-
#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task
223+
}

src/NServiceBus.Transport.SqlServer/PubSub/CachedSubscriptionStore.cs

Lines changed: 120 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,67 +3,151 @@ namespace NServiceBus.Transport.SqlServer
33
using System;
44
using System.Collections.Concurrent;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.Threading;
78
using System.Threading.Tasks;
89

910

10-
class CachedSubscriptionStore : ISubscriptionStore
11+
sealed class CachedSubscriptionStore : ISubscriptionStore, IDisposable
1112
{
12-
public CachedSubscriptionStore(ISubscriptionStore inner, TimeSpan cacheFor)
13+
public async Task<List<string>> GetSubscribers(Type eventType, CancellationToken cancellationToken = default)
1314
{
14-
this.inner = inner;
15-
this.cacheFor = cacheFor;
15+
var cacheKey = CacheKey(eventType);
16+
var cachedSubscriptions = Cache.GetOrAdd(cacheKey,
17+
static (_, state) => new CachedSubscriptions(state.inner, state.eventType, state.cacheFor),
18+
(inner, eventType, cacheFor));
19+
20+
return await cachedSubscriptions.EnsureFresh(cancellationToken).ConfigureAwait(false);
1621
}
1722

18-
public Task<List<string>> GetSubscribers(Type eventType, CancellationToken cancellationToken = default)
23+
public async Task Subscribe(string endpointName, string endpointAddress, Type eventType, CancellationToken cancellationToken = default)
1924
{
20-
var cacheItem = Cache.GetOrAdd(CacheKey(eventType),
21-
_ => new CacheItem
22-
{
23-
StoredUtc = DateTime.UtcNow,
24-
Subscribers = inner.GetSubscribers(eventType, cancellationToken)
25-
});
26-
27-
var age = DateTime.UtcNow - cacheItem.StoredUtc;
28-
if (age >= cacheFor)
25+
try
2926
{
30-
cacheItem.Subscribers = inner.GetSubscribers(eventType, cancellationToken);
31-
cacheItem.StoredUtc = DateTime.UtcNow;
27+
await inner.Subscribe(endpointName, endpointAddress, eventType, cancellationToken).ConfigureAwait(false);
28+
}
29+
finally
30+
{
31+
await Clear(CacheKey(eventType))
32+
.ConfigureAwait(false);
3233
}
33-
34-
return cacheItem.Subscribers;
3534
}
3635

37-
public async Task Subscribe(string endpointName, string endpointAddress, Type eventType, CancellationToken cancellationToken = default)
36+
public async Task Unsubscribe(string endpointName, Type eventType, CancellationToken cancellationToken = default)
3837
{
39-
await inner.Subscribe(endpointName, endpointAddress, eventType, cancellationToken).ConfigureAwait(false);
40-
ClearForMessageType(CacheKey(eventType));
38+
try
39+
{
40+
await inner.Unsubscribe(endpointName, eventType, cancellationToken).ConfigureAwait(false);
41+
}
42+
finally
43+
{
44+
await Clear(CacheKey(eventType))
45+
.ConfigureAwait(false);
46+
}
4147
}
4248

43-
public async Task Unsubscribe(string endpointName, Type eventType, CancellationToken cancellationToken = default)
49+
public void Dispose()
4450
{
45-
await inner.Unsubscribe(endpointName, eventType, cancellationToken).ConfigureAwait(false);
46-
ClearForMessageType(CacheKey(eventType));
51+
if (Cache.IsEmpty)
52+
{
53+
return;
54+
}
55+
56+
foreach (var subscription in Cache.Values)
57+
{
58+
subscription.Dispose();
59+
}
60+
61+
Cache.Clear();
4762
}
4863

49-
void ClearForMessageType(string topic)
64+
#pragma warning disable PS0018 // Clear should not be cancellable
// Clears the cached entry for the given key when one exists; otherwise completes immediately.
ValueTask Clear(string cacheKey)
{
    if (!Cache.TryGetValue(cacheKey, out var entry))
    {
        return default;
    }

    return entry.Clear();
}
#pragma warning restore PS0018
67+
68+
// Cache entries are keyed by the full name of the event type.
static string CacheKey(Type eventType)
{
    return eventType.FullName;
}
69+
70+
readonly ConcurrentDictionary<string, CachedSubscriptions> Cache = new();
71+
readonly ISubscriptionStore inner;
72+
readonly TimeSpan cacheFor;
73+
74+
public CachedSubscriptionStore(ISubscriptionStore inner, TimeSpan cacheFor)
5075
{
51-
Cache.TryRemove(topic, out _);
76+
this.inner = inner;
77+
this.cacheFor = cacheFor;
5278
}
5379

54-
static string CacheKey(Type eventType)
80+
sealed class CachedSubscriptions : IDisposable
5581
{
56-
return eventType.FullName;
57-
}
82+
readonly SemaphoreSlim fetchSemaphore = new(1, 1);
5883

59-
TimeSpan cacheFor;
60-
ISubscriptionStore inner;
61-
ConcurrentDictionary<string, CacheItem> Cache = new ConcurrentDictionary<string, CacheItem>();
84+
List<string> cachedSubscriptions;
85+
long cachedAtTimestamp;
86+
readonly ISubscriptionStore store;
87+
readonly Type eventType;
88+
readonly TimeSpan cacheFor;
6289

63-
class CacheItem
64-
{
65-
public DateTime StoredUtc { get; set; } // Internal usage, only set/get using private
66-
public Task<List<string>> Subscribers { get; set; }
90+
// Creates an initially empty cache entry for one event type.
// store:     queried by EnsureFresh when the entry needs loading.
// eventType: the event type passed to the store on every fetch.
// cacheFor:  how long a fetched subscriber list is treated as fresh.
public CachedSubscriptions(ISubscriptionStore store, Type eventType, TimeSpan cacheFor)
{
    this.cacheFor = cacheFor;
    this.eventType = eventType;
    this.store = store;
}
96+
97+
// Returns the cached subscriber list while it is younger than cacheFor; otherwise reloads it
// from the store. Reloads are serialized through fetchSemaphore, and waiting for the semaphore
// honours the caller's cancellation token.
public async ValueTask<List<string>> EnsureFresh(CancellationToken cancellationToken = default)
{
    // Fast path without taking the semaphore: decide on local copies of the fields.
    var snapshot = cachedSubscriptions;
    var snapshotTimestamp = cachedAtTimestamp;

    var snapshotIsFresh = snapshot != null && GetElapsedTime(snapshotTimestamp) < cacheFor;
    if (snapshotIsFresh)
    {
        return snapshot;
    }

    await fetchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

    try
    {
        // Re-check under the semaphore: another caller may have refreshed the entry while we waited.
        if (cachedSubscriptions == null || GetElapsedTime(cachedAtTimestamp) >= cacheFor)
        {
            cachedSubscriptions = await store.GetSubscribers(eventType, cancellationToken).ConfigureAwait(false);
            cachedAtTimestamp = Stopwatch.GetTimestamp();
        }

        return cachedSubscriptions;
    }
    finally
    {
        fetchSemaphore.Release();
    }
}
126+
127+
// Number of TimeSpan ticks represented by one Stopwatch tick.
static readonly double tickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;

// Time elapsed between a Stopwatch timestamp and the current Stopwatch timestamp.
static TimeSpan GetElapsedTime(long startingTimestamp)
{
    var now = Stopwatch.GetTimestamp();
    return GetElapsedTime(startingTimestamp, now);
}

// Time elapsed between two Stopwatch timestamps, converted to a TimeSpan.
static TimeSpan GetElapsedTime(long startingTimestamp, long endingTimestamp)
{
    var stopwatchTicks = endingTimestamp - startingTimestamp;
    var timeSpanTicks = (long)(stopwatchTicks * tickFrequency);
    return new TimeSpan(timeSpanTicks);
}
132+
133+
#pragma warning disable PS0018 // Clear should not be cancellable
// Resets this entry so that the cached subscriber list is null and the timestamp is zero.
// Takes fetchSemaphore first, so the reset cannot interleave with code holding the semaphore.
public async ValueTask Clear()
#pragma warning restore PS0018
{
    // Acquire outside the try block: if the wait itself fails, the semaphore was never taken
    // and the finally block must not release it.
    await fetchSemaphore.WaitAsync(CancellationToken.None).ConfigureAwait(false);

    try
    {
        cachedSubscriptions = null;
        cachedAtTimestamp = 0;
    }
    finally
    {
        fetchSemaphore.Release();
    }
}
149+
150+
// Disposes the semaphore owned by this cache entry.
public void Dispose()
{
    fetchSemaphore.Dispose();
}
67151
}
68152
}
69153
}

src/NServiceBus.Transport.SqlServer/Receiving/MessageReceiver.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ public async Task ChangeConcurrency(PushRuntimeSettings newLimitations, Cancella
124124

125125
public async Task StopReceive(CancellationToken cancellationToken = default)
126126
{
127+
if (messageReceivingCancellationTokenSource == null)
128+
{
129+
// already stopped or never started
130+
return;
131+
}
132+
127133
messageReceivingCancellationTokenSource?.Cancel();
128134

129135
using (cancellationToken.Register(() => messageProcessingCancellationTokenSource?.Cancel()))
@@ -145,6 +151,7 @@ public async Task StopReceive(CancellationToken cancellationToken = default)
145151
messageProcessingCircuitBreaker.Dispose();
146152
concurrencyLimiter.Dispose();
147153
messageReceivingCancellationTokenSource?.Dispose();
154+
messageReceivingCancellationTokenSource = null;
148155
messageProcessingCancellationTokenSource?.Dispose();
149156
}
150157

src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,22 @@ public void ConfigureSendInfrastructure()
295295
connectionFactory);
296296
}
297297

298-
public override Task Shutdown(CancellationToken cancellationToken = default)
298+
public override async Task Shutdown(CancellationToken cancellationToken = default)
299299
{
300-
return dueDelayedMessageProcessor?.Stop(cancellationToken) ?? Task.FromResult(0);
300+
try
301+
{
302+
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
303+
.ConfigureAwait(false);
304+
305+
if (dueDelayedMessageProcessor != null)
306+
{
307+
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
308+
}
309+
}
310+
finally
311+
{
312+
(subscriptionStore as IDisposable)?.Dispose();
313+
}
301314
}
302315

303316
#pragma warning disable CS0618 // Type or member is obsolete

0 commit comments

Comments
 (0)