Skip to content

Commit c842599

Browse files
authored
Merge pull request #1557 from Particular/cherry-pick-1372-to-7.0
Fix race condition in RepeatedFailuresOverTimeCircuitBreaker
2 parents 6e46db8 + 2f404eb commit c842599

File tree

9 files changed

+410
-62
lines changed

9 files changed

+410
-62
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
include:
2121
- os: windows-2019
2222
name: Windows
23-
- os: ubuntu-20.04
23+
- os: ubuntu-22.04
2424
name: Linux
2525
fail-fast: false
2626
steps:

.github/workflows/release.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ defaults:
1111
shell: pwsh
1212
jobs:
1313
release:
14-
runs-on: ubuntu-20.04
14+
runs-on: ubuntu-22.04
1515
steps:
1616
- name: Checkout
1717
uses: actions/[email protected]
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
<?xml version="1.0" encoding="utf-8"?>
22
<Weavers GenerateXsd="false">
3-
<Janitor />
43
<Obsolete />
54
</Weavers>

src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
<ItemGroup>
2525
<PackageReference Include="Fody" Version="6.9.1" PrivateAssets="All" />
26-
<PackageReference Include="Janitor.Fody" Version="1.9.0" PrivateAssets="All" />
2726
<PackageReference Include="Obsolete.Fody" Version="5.3.0" PrivateAssets="All" />
2827
<PackageReference Include="Particular.Packaging" Version="4.2.0" PrivateAssets="All" />
2928
</ItemGroup>
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
namespace NServiceBus.Transport.SqlServer.UnitTests.Receiving
2+
{
3+
using System;
4+
using System.Diagnostics;
5+
using System.Linq;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using NUnit.Framework;
9+
using NServiceBus.Transport.SqlServer;
10+
11+
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
12+
13+
// Ideally the circuit breaker would use a time provider to allow for easier testing but that would require a significant refactor
14+
// and we want keep the changes to a minimum for now to allow backporting to older versions.
15+
[TestFixture]
16+
public class RepeatedFailuresOverTimeCircuitBreakerTests
17+
{
18+
[Test]
19+
public async Task Should_disarm_on_success()
20+
{
21+
var armedActionCalled = false;
22+
var disarmedActionCalled = false;
23+
24+
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
25+
"TestCircuitBreaker",
26+
TimeSpan.FromMilliseconds(100),
27+
ex => { },
28+
() => armedActionCalled = true,
29+
() => disarmedActionCalled = true,
30+
TimeSpan.Zero,
31+
TimeSpan.Zero
32+
);
33+
34+
await circuitBreaker.Failure(new Exception("Test Exception"));
35+
circuitBreaker.Success();
36+
37+
Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
38+
Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
39+
}
40+
41+
[Test]
42+
public async Task Should_rethrow_exception_on_success()
43+
{
44+
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
45+
"TestCircuitBreaker",
46+
TimeSpan.FromMilliseconds(100),
47+
_ => { },
48+
() => { },
49+
() => throw new Exception("Exception from disarmed action"),
50+
timeToWaitWhenTriggered: TimeSpan.Zero,
51+
timeToWaitWhenArmed: TimeSpan.Zero
52+
);
53+
54+
await circuitBreaker.Failure(new Exception("Test Exception"));
55+
56+
var ex = Assert.Throws<Exception>(() => circuitBreaker.Success());
57+
Assert.That(ex.Message, Is.EqualTo("Exception from disarmed action"));
58+
}
59+
60+
[Test]
61+
public async Task Should_trigger_after_failure_timeout()
62+
{
63+
var triggerActionCalled = false;
64+
Exception lastTriggerException = null;
65+
66+
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
67+
"TestCircuitBreaker",
68+
TimeSpan.Zero,
69+
ex => { triggerActionCalled = true; lastTriggerException = ex; },
70+
timeToWaitWhenTriggered: TimeSpan.Zero,
71+
timeToWaitWhenArmed: TimeSpan.FromMilliseconds(100)
72+
);
73+
74+
await circuitBreaker.Failure(new Exception("Test Exception"));
75+
76+
Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after timeout.");
77+
Assert.That(lastTriggerException, Is.Not.Null, "The exception passed to the trigger action should not be null.");
78+
}
79+
80+
[Test]
81+
public void Should_rethrow_exception_on_failure()
82+
{
83+
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
84+
"TestCircuitBreaker",
85+
TimeSpan.FromMilliseconds(100),
86+
_ => { },
87+
() => throw new Exception("Exception from armed action"),
88+
() => { },
89+
timeToWaitWhenTriggered: TimeSpan.Zero,
90+
timeToWaitWhenArmed: TimeSpan.Zero
91+
);
92+
93+
var ex = Assert.ThrowsAsync<Exception>(async () => await circuitBreaker.Failure(new Exception("Test Exception")));
94+
Assert.That(ex.Message, Is.EqualTo("Exception from armed action"));
95+
}
96+
97+
[Test]
98+
public async Task Should_delay_after_trigger_failure()
99+
{
100+
var timeToWaitWhenTriggered = TimeSpan.FromMilliseconds(50);
101+
var timeToWaitWhenArmed = TimeSpan.FromMilliseconds(100);
102+
103+
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
104+
"TestCircuitBreaker",
105+
TimeSpan.Zero,
106+
_ => { },
107+
timeToWaitWhenTriggered: timeToWaitWhenTriggered,
108+
timeToWaitWhenArmed: timeToWaitWhenArmed
109+
);
110+
111+
var stopWatch = Stopwatch.StartNew();
112+
113+
await circuitBreaker.Failure(new Exception("Test Exception"));
114+
await circuitBreaker.Failure(new Exception("Test Exception After Trigger"));
115+
116+
stopWatch.Stop();
117+
118+
Assert.That(stopWatch.ElapsedMilliseconds, Is.GreaterThanOrEqualTo(timeToWaitWhenTriggered.Add(timeToWaitWhenArmed).TotalMilliseconds).Within(20), "The circuit breaker should delay after a triggered failure.");
119+
}
120+
121+
[Test]
122+
public async Task Should_not_trigger_if_disarmed_before_timeout()
123+
{
124+
var triggerActionCalled = false;
125+
126+
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
127+
"TestCircuitBreaker",
128+
TimeSpan.FromMilliseconds(100),
129+
ex => triggerActionCalled = true,
130+
timeToWaitWhenTriggered: TimeSpan.Zero,
131+
timeToWaitWhenArmed: TimeSpan.Zero
132+
);
133+
134+
await circuitBreaker.Failure(new Exception("Test Exception"));
135+
circuitBreaker.Success();
136+
137+
Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if the circuit breaker was disarmed.");
138+
}
139+
140+
[Test]
141+
public async Task Should_handle_concurrent_failure_and_success()
142+
{
143+
var armedActionCalled = false;
144+
var disarmedActionCalled = false;
145+
var triggerActionCalled = false;
146+
147+
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
148+
"TestCircuitBreaker",
149+
TimeSpan.FromMilliseconds(100),
150+
ex => triggerActionCalled = true,
151+
() => armedActionCalled = true,
152+
() => disarmedActionCalled = true,
153+
TimeSpan.Zero,
154+
TimeSpan.Zero
155+
);
156+
157+
var failureTask = circuitBreaker.Failure(new Exception("Test Exception"));
158+
var successTask = Task.Run(() =>
159+
{
160+
Thread.Sleep(50); // Simulate some delay before success
161+
circuitBreaker.Success();
162+
});
163+
164+
await Task.WhenAll(failureTask, successTask);
165+
166+
Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
167+
Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
168+
Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if success occurred before timeout.");
169+
}
170+
171+
[Test]
172+
public async Task Should_handle_high_concurrent_failure_and_success()
173+
{
174+
var armedActionCalled = 0;
175+
var disarmedActionCalled = 0;
176+
var triggerActionCalled = 0;
177+
178+
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
179+
"TestCircuitBreaker",
180+
TimeSpan.FromSeconds(5),
181+
ex => Interlocked.Increment(ref triggerActionCalled),
182+
() => Interlocked.Increment(ref armedActionCalled),
183+
() => Interlocked.Increment(ref disarmedActionCalled),
184+
TimeSpan.Zero,
185+
TimeSpan.FromMilliseconds(25)
186+
);
187+
188+
var tasks = Enumerable.Range(0, 1000)
189+
.Select(
190+
i => i % 2 == 0 ?
191+
circuitBreaker.Failure(new Exception($"Test Exception {i}")) :
192+
Task.Run(() =>
193+
{
194+
Thread.Sleep(25); // Simulate some delay before success
195+
circuitBreaker.Success();
196+
})
197+
).ToArray();
198+
199+
await Task.WhenAll(tasks);
200+
201+
Assert.That(armedActionCalled, Is.EqualTo(1), "The armed action should be called.");
202+
Assert.That(disarmedActionCalled, Is.EqualTo(1), "The disarmed action should be called.");
203+
Assert.That(triggerActionCalled, Is.Zero, "The trigger action should not be called if success occurred before timeout.");
204+
}
205+
206+
[Test]
207+
public async Task Should_trigger_after_multiple_failures_and_timeout()
208+
{
209+
var triggerActionCalled = false;
210+
211+
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
212+
"TestCircuitBreaker",
213+
TimeSpan.FromMilliseconds(50),
214+
ex => triggerActionCalled = true,
215+
timeToWaitWhenTriggered: TimeSpan.FromMilliseconds(50),
216+
timeToWaitWhenArmed: TimeSpan.FromMilliseconds(50)
217+
);
218+
219+
await circuitBreaker.Failure(new Exception("Test Exception"));
220+
await circuitBreaker.Failure(new Exception("Another Exception After Trigger"));
221+
222+
Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after repeated failures and timeout.");
223+
}
224+
}
225+
}
226+
#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
<?xml version="1.0" encoding="utf-8"?>
22
<Weavers GenerateXsd="false">
3-
<Janitor />
43
<Obsolete />
54
</Weavers>

src/NServiceBus.Transport.SqlServer/NServiceBus.Transport.SqlServer.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
<ItemGroup>
2020
<PackageReference Include="Fody" Version="6.9.1" PrivateAssets="All" />
21-
<PackageReference Include="Janitor.Fody" Version="1.9.0" PrivateAssets="All" />
2221
<PackageReference Include="Obsolete.Fody" Version="5.3.0" PrivateAssets="All" />
2322
<PackageReference Include="Particular.Packaging" Version="4.2.0" PrivateAssets="All" />
2423
</ItemGroup>

src/NServiceBus.Transport.SqlServer/Receiving/MessageReceiver.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,10 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
188188
var stopBatchCancellationSource = new CancellationTokenSource();
189189

190190
// If either the receiving or processing circuit breakers are triggered, start only one message processing task at a time.
191-
var maximumConcurrentProcessing = messageProcessingCircuitBreaker.Triggered || messageReceivingCircuitBreaker.Triggered ? 1 : messageCount;
191+
var maximumConcurrentProcessing =
192+
messageProcessingCircuitBreaker.IsTriggered || messageReceivingCircuitBreaker.IsTriggered
193+
? 1
194+
: messageCount;
192195

193196
for (var i = 0; i < maximumConcurrentProcessing; i++)
194197
{

0 commit comments

Comments
 (0)