Skip to content

Commit 2c350db

Browse files
committed
Fix race condition in RepeatedFailuresOverTimeCircuitBreaker
1 parent 0528800 commit 2c350db

File tree

3 files changed

+384
-53
lines changed

3 files changed

+384
-53
lines changed

src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,6 @@ public async Task StopReceive(CancellationToken cancellationToken = default)
148148
}
149149
}
150150

151-
messageReceivingCircuitBreaker.Dispose();
152-
messageProcessingCircuitBreaker.Dispose();
153151
concurrencyLimiter.Dispose();
154152
messageReceivingCancellationTokenSource.Dispose();
155153
messageReceivingCancellationTokenSource = null;
@@ -200,7 +198,7 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
200198

201199
// If either the receiving or processing circuit breakers are triggered, start only one message processing task at a time.
202200
var maximumConcurrentProcessing =
203-
messageProcessingCircuitBreaker.Triggered || messageReceivingCircuitBreaker.Triggered
201+
messageProcessingCircuitBreaker.IsTriggered || messageReceivingCircuitBreaker.IsTriggered
204202
? 1
205203
: messageCount;
206204

src/NServiceBus.Transport.Sql.Shared/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs

Lines changed: 157 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3,99 +3,206 @@
33
using System;
44
using System.Threading;
55
using System.Threading.Tasks;
6-
using NServiceBus.Logging;
7-
8-
class RepeatedFailuresOverTimeCircuitBreaker : IDisposable
6+
using Logging;
7+
8+
/// <summary>
9+
/// A circuit breaker that is armed on a failure and disarmed on success. After <see cref="timeToWaitBeforeTriggering"/> in the
10+
/// armed state, the <see cref="triggerAction"/> will fire. The <see cref="armedAction"/> and <see cref="disarmedAction"/> allow
11+
/// changing other state when the circuit breaker is armed or disarmed.
12+
/// </summary>
13+
sealed class RepeatedFailuresOverTimeCircuitBreaker
914
{
10-
public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, Action<Exception> triggerAction)
15+
/// <summary>
16+
/// A circuit breaker that is armed on a failure and disarmed on success. After <see cref="timeToWaitBeforeTriggering"/> in the
17+
/// armed state, the <see cref="triggerAction"/> will fire. The <see cref="armedAction"/> and <see cref="disarmedAction"/> allow
18+
/// changing other state when the circuit breaker is armed or disarmed.
19+
/// </summary>
20+
/// <param name="name">A name that is output in log messages when the circuit breaker changes states.</param>
21+
/// <param name="timeToWaitBeforeTriggering">The time to wait after the first failure before triggering.</param>
22+
/// <param name="triggerAction">The action to take when the circuit breaker is triggered.</param>
23+
/// <param name="armedAction">The action to execute on the first failure.
24+
/// <b>Warning:</b> This action is also invoked from within a lock. Any long-running, blocking, or I/O-bound code should be avoided
25+
/// within this action, as it can prevent other threads from proceeding, potentially leading to contention or performance bottlenecks.
26+
/// </param>
27+
/// <param name="disarmedAction">The action to execute when a success disarms the circuit breaker.
28+
/// <b>Warning:</b> This action is also invoked from within a lock. Any long-running, blocking, or I/O-bound code should be avoided
29+
/// within this action, as it can prevent other threads from proceeding, potentially leading to contention or performance bottlenecks.
30+
/// </param>
31+
/// <param name="timeToWaitWhenTriggered">How long to delay on each failure when in the Triggered state. Defaults to 10 seconds.</param>
32+
/// <param name="timeToWaitWhenArmed">How long to delay on each failure when in the Armed state. Defaults to 1 second.</param>
33+
/// <remarks>
34+
/// The <see cref="armedAction"/> and <see cref="disarmedAction"/> are invoked from within a lock to ensure that arming and disarming
35+
/// actions are serialized and do not execute concurrently. As a result, care must be taken to ensure that these actions do not
36+
/// introduce delays or deadlocks by performing lengthy operations or synchronously waiting on external resources.
37+
///
38+
/// <b>Best practice:</b> If the logic inside these actions involves blocking or long-running tasks, consider offloading
39+
/// the work to a background task or thread that doesn't hold the lock.
40+
/// </remarks>
41+
public RepeatedFailuresOverTimeCircuitBreaker(
42+
string name,
43+
TimeSpan timeToWaitBeforeTriggering,
44+
Action<Exception> triggerAction,
45+
Action armedAction = null,
46+
Action disarmedAction = null,
47+
TimeSpan timeToWaitWhenTriggered = default,
48+
TimeSpan timeToWaitWhenArmed = default)
1149
{
1250
this.name = name;
1351
this.triggerAction = triggerAction;
52+
this.armedAction = armedAction ?? (static () => { });
53+
this.disarmedAction = disarmedAction ?? (static () => { });
1454
this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;
55+
this.timeToWaitWhenTriggered = timeToWaitWhenTriggered == TimeSpan.MinValue ? TimeSpan.FromSeconds(10) : timeToWaitWhenTriggered;
56+
this.timeToWaitWhenArmed = timeToWaitWhenArmed == TimeSpan.MinValue ? TimeSpan.FromSeconds(1) : timeToWaitWhenArmed;
1557

1658
timer = new Timer(CircuitBreakerTriggered);
1759
}
1860

19-
public bool Triggered => triggered;
20-
61+
/// <summary>
62+
/// Log a success, disarming the circuit breaker if it was previously armed.
63+
/// </summary>
2164
public void Success()
2265
{
23-
var oldValue = Interlocked.Exchange(ref failureCount, 0);
24-
25-
if (oldValue == 0)
66+
// Check the status of the circuit breaker, exiting early outside the lock if already disarmed
67+
if (Volatile.Read(ref circuitBreakerState) == Disarmed)
2668
{
2769
return;
2870
}
2971

30-
timer.Change(Timeout.Infinite, Timeout.Infinite);
31-
triggered = false;
32-
Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
72+
lock (stateLock)
73+
{
74+
// Recheck state after obtaining the lock
75+
if (circuitBreakerState == Disarmed)
76+
{
77+
return;
78+
}
79+
80+
circuitBreakerState = Disarmed;
81+
82+
_ = timer.Change(Timeout.Infinite, Timeout.Infinite);
83+
Logger.InfoFormat("The circuit breaker for '{0}' is now disarmed.", name);
84+
try
85+
{
86+
disarmedAction();
87+
}
88+
catch (Exception ex)
89+
{
90+
Logger.Error($"The circuit breaker for '{name}' was unable to execute the disarm action.", ex);
91+
throw;
92+
}
93+
}
3394
}
3495

96+
/// <summary>
97+
/// Log a failure, arming the circuit breaker if it was previously disarmed.
98+
/// </summary>
99+
/// <param name="exception">The exception that caused the failure.</param>
100+
/// <param name="cancellationToken">A cancellation token.</param>
35101
public Task Failure(Exception exception, CancellationToken cancellationToken = default)
36102
{
37-
lastException = exception;
38-
var newValue = Interlocked.Increment(ref failureCount);
103+
// Atomically store the exception that caused the circuit breaker to trip
104+
_ = Interlocked.Exchange(ref lastException, exception);
39105

40-
if (newValue == 1)
106+
var previousState = Volatile.Read(ref circuitBreakerState);
107+
if (previousState is Armed or Triggered)
41108
{
42-
timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
43-
Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name);
109+
return Delay();
44110
}
45111

46-
var delay = Triggered ? ThrottledDelay : NonThrottledDelay;
47-
return Task.Delay(delay, cancellationToken);
48-
}
49-
50-
void CircuitBreakerTriggered(object state)
51-
{
52-
if (Interlocked.Read(ref failureCount) > 0)
112+
lock (stateLock)
53113
{
54-
triggered = true;
55-
Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name);
114+
// Recheck state after obtaining the lock
115+
previousState = circuitBreakerState;
116+
if (previousState is Armed or Triggered)
117+
{
118+
return Delay();
119+
}
120+
121+
circuitBreakerState = Armed;
56122

57123
try
58124
{
59-
triggerAction(lastException);
125+
// Executing the action first before starting the timer to ensure that the action is executed before the timer fires
126+
// and the time of the action is not included in the time to wait before triggering.
127+
armedAction();
60128
}
61129
catch (Exception ex)
62130
{
63-
Logger.Error($"Error invoking trigger action for circuit breaker {name}", ex);
131+
Logger.Error($"The circuit breaker for '{name}' was unable to execute the arm action.", new AggregateException(ex, exception));
132+
throw;
64133
}
134+
135+
_ = timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
136+
Logger.WarnFormat("The circuit breaker for '{0}' is now in the armed state due to '{1}' and might trigger in '{2}' when not disarmed.", name, exception, timeToWaitBeforeTriggering);
137+
}
138+
139+
return Delay();
140+
141+
Task Delay()
142+
{
143+
var timeToWait = previousState == Triggered ? timeToWaitWhenTriggered : timeToWaitWhenArmed;
144+
if (Logger.IsDebugEnabled)
145+
{
146+
Logger.DebugFormat("The circuit breaker for '{0}' is delaying the operation by '{1}'.", name, timeToWait);
147+
}
148+
return Task.Delay(timeToWait, cancellationToken);
65149
}
66150
}
67151

68-
protected virtual void Dispose(bool disposing)
152+
/// <summary>
153+
/// Disposes the resources associated with the circuit breaker.
154+
/// </summary>
155+
public void Dispose() => timer.Dispose();
156+
157+
void CircuitBreakerTriggered(object state)
69158
{
70-
if (!disposed)
159+
var previousState = Volatile.Read(ref circuitBreakerState);
160+
if (previousState == Disarmed)
161+
{
162+
return;
163+
}
164+
165+
lock (stateLock)
71166
{
72-
if (disposing)
167+
// Recheck state after obtaining the lock
168+
if (circuitBreakerState == Disarmed)
73169
{
74-
timer?.Dispose();
170+
return;
75171
}
76172

77-
disposed = true;
173+
circuitBreakerState = Triggered;
174+
Logger.WarnFormat("The circuit breaker for '{0}' will now be triggered with exception '{1}'.", name, lastException);
175+
176+
try
177+
{
178+
triggerAction(lastException!);
179+
}
180+
catch (Exception ex)
181+
{
182+
Logger.Fatal($"The circuit breaker for '{name}' was unable to execute the trigger action.", new AggregateException(ex, lastException!));
183+
}
78184
}
79185
}
80186

81-
public void Dispose()
82-
{
83-
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
84-
Dispose(disposing: true);
85-
GC.SuppressFinalize(this);
86-
}
187+
public bool IsTriggered => circuitBreakerState == Triggered;
87188

88-
string name;
89-
TimeSpan timeToWaitBeforeTriggering;
90-
Timer timer;
91-
Action<Exception> triggerAction;
92-
long failureCount;
189+
int circuitBreakerState = Disarmed;
93190
Exception lastException;
94-
volatile bool triggered;
95-
bool disposed;
96191

97-
static TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
98-
static ILog Logger = LogManager.GetLogger<RepeatedFailuresOverTimeCircuitBreaker>();
99-
static TimeSpan NonThrottledDelay = TimeSpan.FromSeconds(1);
100-
static TimeSpan ThrottledDelay = TimeSpan.FromSeconds(10);
192+
readonly string name;
193+
readonly Timer timer;
194+
readonly TimeSpan timeToWaitBeforeTriggering;
195+
readonly Action<Exception> triggerAction;
196+
readonly Action armedAction;
197+
readonly Action disarmedAction;
198+
readonly TimeSpan timeToWaitWhenTriggered;
199+
readonly TimeSpan timeToWaitWhenArmed;
200+
readonly object stateLock = new();
201+
202+
const int Disarmed = 0;
203+
const int Armed = 1;
204+
const int Triggered = 2;
205+
206+
static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
207+
static readonly ILog Logger = LogManager.GetLogger<RepeatedFailuresOverTimeCircuitBreaker>();
101208
}

0 commit comments

Comments
 (0)