Skip to content

Commit ee97e4c

Browse files
authored
Merge pull request #1556 from Particular/cherry-pick-1372-to-8.1
Fix race condition in RepeatedFailuresOverTimeCircuitBreaker
2 parents 53746f5 + de103b5 commit ee97e4c

File tree

8 files changed

+393
-47
lines changed

8 files changed

+393
-47
lines changed
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
<?xml version="1.0" encoding="utf-8"?>
22
<Weavers GenerateXsd="false">
3-
<Janitor />
43
<Obsolete />
54
</Weavers>

src/NServiceBus.Transport.PostgreSql/NServiceBus.Transport.PostgreSql.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
<ItemGroup>
1919
<PackageReference Include="Fody" Version="6.9.1" PrivateAssets="All" />
20-
<PackageReference Include="Janitor.Fody" Version="1.9.0" PrivateAssets="All" />
2120
<PackageReference Include="Obsolete.Fody" Version="5.3.0" PrivateAssets="All" />
2221
<PackageReference Include="Particular.Packaging" Version="4.1.0" PrivateAssets="All" />
2322
</ItemGroup>

src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
193193

194194
// If either the receiving or processing circuit breakers are triggered, start only one message processing task at a time.
195195
var maximumConcurrentProcessing =
196-
messageProcessingCircuitBreaker.Triggered || messageReceivingCircuitBreaker.Triggered
196+
messageProcessingCircuitBreaker.IsTriggered || messageReceivingCircuitBreaker.IsTriggered
197197
? 1
198198
: messageCount;
199199

Lines changed: 166 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,210 @@
1-
namespace NServiceBus.Transport.Sql.Shared;
1+
#nullable enable
2+
3+
namespace NServiceBus.Transport.Sql.Shared;
24

35
using System;
46
using System.Threading;
57
using System.Threading.Tasks;
6-
using NServiceBus.Logging;
7-
8-
class RepeatedFailuresOverTimeCircuitBreaker : IDisposable
8+
using Logging;
9+
10+
/// <summary>
11+
/// A circuit breaker that is armed on a failure and disarmed on success. After <see cref="timeToWaitBeforeTriggering"/> in the
12+
/// armed state, the <see cref="triggerAction"/> will fire. The <see cref="armedAction"/> and <see cref="disarmedAction"/> allow
13+
/// changing other state when the circuit breaker is armed or disarmed.
14+
/// </summary>
15+
sealed class RepeatedFailuresOverTimeCircuitBreaker
916
{
10-
public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, Action<Exception> triggerAction)
17+
/// <summary>
18+
/// A circuit breaker that is armed on a failure and disarmed on success. After <see cref="timeToWaitBeforeTriggering"/> in the
19+
/// armed state, the <see cref="triggerAction"/> will fire. The <see cref="armedAction"/> and <see cref="disarmedAction"/> allow
20+
/// changing other state when the circuit breaker is armed or disarmed.
21+
/// </summary>
22+
/// <param name="name">A name that is output in log messages when the circuit breaker changes states.</param>
23+
/// <param name="timeToWaitBeforeTriggering">The time to wait after the first failure before triggering.</param>
24+
/// <param name="triggerAction">The action to take when the circuit breaker is triggered.</param>
25+
/// <param name="armedAction">The action to execute on the first failure.
26+
/// <b>Warning:</b> This action is also invoked from within a lock. Any long-running, blocking, or I/O-bound code should be avoided
27+
/// within this action, as it can prevent other threads from proceeding, potentially leading to contention or performance bottlenecks.
28+
/// </param>
29+
/// <param name="disarmedAction">The action to execute when a success disarms the circuit breaker.
30+
/// <b>Warning:</b> This action is also invoked from within a lock. Any long-running, blocking, or I/O-bound code should be avoided
31+
/// within this action, as it can prevent other threads from proceeding, potentially leading to contention or performance bottlenecks.
32+
/// </param>
33+
/// <param name="timeToWaitWhenTriggered">How long to delay on each failure when in the Triggered state. Defaults to 10 seconds.</param>
34+
/// <param name="timeToWaitWhenArmed">How long to delay on each failure when in the Armed state. Defaults to 1 second.</param>
35+
/// <remarks>
36+
/// The <see cref="armedAction"/> and <see cref="disarmedAction"/> are invoked from within a lock to ensure that arming and disarming
37+
/// actions are serialized and do not execute concurrently. As a result, care must be taken to ensure that these actions do not
38+
/// introduce delays or deadlocks by performing lengthy operations or synchronously waiting on external resources.
39+
///
40+
/// <b>Best practice:</b> If the logic inside these actions involves blocking or long-running tasks, consider offloading
41+
/// the work to a background task or thread that doesn't hold the lock.
42+
/// </remarks>
43+
public RepeatedFailuresOverTimeCircuitBreaker(
44+
string name,
45+
TimeSpan timeToWaitBeforeTriggering,
46+
Action<Exception> triggerAction,
47+
Action? armedAction = null,
48+
Action? disarmedAction = null,
49+
TimeSpan? timeToWaitWhenTriggered = default,
50+
TimeSpan? timeToWaitWhenArmed = default)
1151
{
1252
this.name = name;
1353
this.triggerAction = triggerAction;
54+
this.armedAction = armedAction ?? (static () => { });
55+
this.disarmedAction = disarmedAction ?? (static () => { });
1456
this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;
57+
this.timeToWaitWhenTriggered = timeToWaitWhenTriggered ?? TimeSpan.FromSeconds(10);
58+
this.timeToWaitWhenArmed = timeToWaitWhenArmed ?? TimeSpan.FromSeconds(1);
1559

1660
timer = new Timer(CircuitBreakerTriggered);
1761
}
1862

19-
public bool Triggered => triggered;
20-
21-
public void Dispose()
22-
{
23-
//Injected
24-
}
25-
63+
/// <summary>
64+
/// Log a success, disarming the circuit breaker if it was previously armed.
65+
/// </summary>
2666
public void Success()
2767
{
28-
var oldValue = Interlocked.Exchange(ref failureCount, 0);
29-
30-
if (oldValue == 0)
68+
// Check the status of the circuit breaker, exiting early outside the lock if already disarmed
69+
if (Volatile.Read(ref circuitBreakerState) == Disarmed)
3170
{
3271
return;
3372
}
3473

35-
timer.Change(Timeout.Infinite, Timeout.Infinite);
36-
triggered = false;
37-
Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
74+
lock (stateLock)
75+
{
76+
// Recheck state after obtaining the lock
77+
if (circuitBreakerState == Disarmed)
78+
{
79+
return;
80+
}
81+
82+
circuitBreakerState = Disarmed;
83+
84+
_ = timer.Change(Timeout.Infinite, Timeout.Infinite);
85+
Logger.InfoFormat("The circuit breaker for '{0}' is now disarmed.", name);
86+
try
87+
{
88+
disarmedAction();
89+
}
90+
catch (Exception ex)
91+
{
92+
Logger.Error($"The circuit breaker for '{name}' was unable to execute the disarm action.", ex);
93+
throw;
94+
}
95+
}
3896
}
3997

98+
/// <summary>
99+
/// Log a failure, arming the circuit breaker if it was previously disarmed.
100+
/// </summary>
101+
/// <param name="exception">The exception that caused the failure.</param>
102+
/// <param name="cancellationToken">A cancellation token.</param>
40103
public Task Failure(Exception exception, CancellationToken cancellationToken = default)
41104
{
42-
lastException = exception;
43-
var newValue = Interlocked.Increment(ref failureCount);
105+
// Atomically store the exception that caused the circuit breaker to trip
106+
_ = Interlocked.Exchange(ref lastException, exception);
107+
108+
var previousState = Volatile.Read(ref circuitBreakerState);
109+
if (previousState is Armed or Triggered)
110+
{
111+
return Delay();
112+
}
44113

45-
if (newValue == 1)
114+
lock (stateLock)
46115
{
47-
timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
48-
Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name);
116+
// Recheck state after obtaining the lock
117+
previousState = circuitBreakerState;
118+
if (previousState is Armed or Triggered)
119+
{
120+
return Delay();
121+
}
122+
123+
circuitBreakerState = Armed;
124+
125+
try
126+
{
127+
// Executing the action first before starting the timer to ensure that the action is executed before the timer fires
128+
// and the time of the action is not included in the time to wait before triggering.
129+
armedAction();
130+
}
131+
catch (Exception ex)
132+
{
133+
Logger.Error($"The circuit breaker for '{name}' was unable to execute the arm action.", new AggregateException(ex, exception));
134+
throw;
135+
}
136+
137+
_ = timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
138+
Logger.WarnFormat("The circuit breaker for '{0}' is now in the armed state due to '{1}' and might trigger in '{2}' when not disarmed.", name, exception, timeToWaitBeforeTriggering);
49139
}
50140

51-
var delay = Triggered ? ThrottledDelay : NonThrottledDelay;
52-
return Task.Delay(delay, cancellationToken);
141+
return Delay();
142+
143+
Task Delay()
144+
{
145+
var timeToWait = previousState == Triggered ? timeToWaitWhenTriggered : timeToWaitWhenArmed;
146+
if (Logger.IsDebugEnabled)
147+
{
148+
Logger.DebugFormat("The circuit breaker for '{0}' is delaying the operation by '{1}'.", name, timeToWait);
149+
}
150+
return Task.Delay(timeToWait, cancellationToken);
151+
}
53152
}
54153

55-
void CircuitBreakerTriggered(object state)
154+
/// <summary>
155+
/// Disposes the resources associated with the circuit breaker.
156+
/// </summary>
157+
public void Dispose() => timer.Dispose();
158+
159+
void CircuitBreakerTriggered(object? state)
56160
{
57-
if (Interlocked.Read(ref failureCount) > 0)
161+
var previousState = Volatile.Read(ref circuitBreakerState);
162+
if (previousState == Disarmed)
163+
{
164+
return;
165+
}
166+
167+
lock (stateLock)
58168
{
59-
triggered = true;
60-
Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name);
169+
// Recheck state after obtaining the lock
170+
if (circuitBreakerState == Disarmed)
171+
{
172+
return;
173+
}
174+
175+
circuitBreakerState = Triggered;
176+
Logger.WarnFormat("The circuit breaker for '{0}' will now be triggered with exception '{1}'.", name, lastException);
61177

62178
try
63179
{
64-
triggerAction(lastException);
180+
triggerAction(lastException!);
65181
}
66182
catch (Exception ex)
67183
{
68-
Logger.Error($"Error invoking trigger action for circuit breaker {name}", ex);
184+
Logger.Fatal($"The circuit breaker for '{name}' was unable to execute the trigger action.", new AggregateException(ex, lastException!));
69185
}
70186
}
71187
}
72188

189+
public bool IsTriggered => circuitBreakerState == Triggered;
190+
191+
int circuitBreakerState = Disarmed;
192+
Exception? lastException;
193+
194+
readonly string name;
195+
readonly Timer timer;
196+
readonly TimeSpan timeToWaitBeforeTriggering;
197+
readonly Action<Exception> triggerAction;
198+
readonly Action armedAction;
199+
readonly Action disarmedAction;
200+
readonly TimeSpan timeToWaitWhenTriggered;
201+
readonly TimeSpan timeToWaitWhenArmed;
202+
readonly object stateLock = new();
73203

74-
string name;
75-
TimeSpan timeToWaitBeforeTriggering;
76-
Timer timer;
77-
Action<Exception> triggerAction;
78-
long failureCount;
79-
Exception lastException;
80-
volatile bool triggered;
204+
const int Disarmed = 0;
205+
const int Armed = 1;
206+
const int Triggered = 2;
81207

82-
static TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
83-
static ILog Logger = LogManager.GetLogger<RepeatedFailuresOverTimeCircuitBreaker>();
84-
static TimeSpan NonThrottledDelay = TimeSpan.FromSeconds(1);
85-
static TimeSpan ThrottledDelay = TimeSpan.FromSeconds(10);
208+
static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
209+
static readonly ILog Logger = LogManager.GetLogger<RepeatedFailuresOverTimeCircuitBreaker>();
86210
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
[*.cs]
22

3+
# Justification: Test project
4+
dotnet_diagnostic.CA2007.severity = none
5+
36
# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken
47
dotnet_diagnostic.NSB0002.severity = suggestion

0 commit comments

Comments
 (0)