Skip to content

Commit 2a7875b

Browse files
authored
Fix race condition in RepeatedFailuresOverTimeCircuitBreaker (#5015)
1 parent a50af24 commit 2a7875b

File tree

5 files changed, with 432 additions and 154 deletions.
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
#nullable enable
2+
3+
namespace NServiceBus;
4+
5+
using System;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Logging;
9+
10+
/// <summary>
/// A circuit breaker that is armed on a failure and disarmed on success. After <see cref="timeToWaitBeforeTriggering"/> in the
/// armed state, the <see cref="triggerAction"/> will fire. The <see cref="armedAction"/> and <see cref="disarmedAction"/> allow
/// changing other state when the circuit breaker is armed or disarmed.
/// </summary>
/// <remarks>
/// The breaker moves between three states: Disarmed, Armed and Triggered. State transitions happen under a private lock;
/// the current state is additionally read outside the lock as a fast path so that repeated calls in an unchanged state
/// do not contend on the lock.
/// </remarks>
public sealed class RepeatedFailuresOverTimeCircuitBreaker : IDisposable
{
    /// <summary>
    /// Creates a circuit breaker that is armed on a failure and disarmed on success. After
    /// <paramref name="timeToWaitBeforeTriggering"/> in the armed state, the <paramref name="triggerAction"/> will fire.
    /// The <paramref name="armedAction"/> and <paramref name="disarmedAction"/> allow changing other state when the
    /// circuit breaker is armed or disarmed.
    /// </summary>
    /// <param name="name">A name that is output in log messages when the circuit breaker changes states.</param>
    /// <param name="timeToWaitBeforeTriggering">The time to wait after the first failure before triggering.</param>
    /// <param name="triggerAction">The action to take when the circuit breaker is triggered.</param>
    /// <param name="armedAction">The action to execute on the first failure.
    /// <b>Warning:</b> This action is also invoked from within a lock. Any long-running, blocking, or I/O-bound code should be avoided
    /// within this action, as it can prevent other threads from proceeding, potentially leading to contention or performance bottlenecks.
    /// </param>
    /// <param name="disarmedAction">The action to execute when a success disarms the circuit breaker.
    /// <b>Warning:</b> This action is also invoked from within a lock. Any long-running, blocking, or I/O-bound code should be avoided
    /// within this action, as it can prevent other threads from proceeding, potentially leading to contention or performance bottlenecks.
    /// </param>
    /// <param name="timeToWaitWhenTriggered">How long to delay on each failure when in the Triggered state. Defaults to 10 seconds.</param>
    /// <param name="timeToWaitWhenArmed">How long to delay on each failure when in the Armed state. Defaults to 1 second.</param>
    /// <remarks>
    /// The <paramref name="armedAction"/> and <paramref name="disarmedAction"/> are invoked from within a lock to ensure that arming and disarming
    /// actions are serialized and do not execute concurrently. As a result, care must be taken to ensure that these actions do not
    /// introduce delays or deadlocks by performing lengthy operations or synchronously waiting on external resources.
    ///
    /// <b>Best practice:</b> If the logic inside these actions involves blocking or long-running tasks, consider offloading
    /// the work to a background task or thread that doesn't hold the lock.
    /// </remarks>
    public RepeatedFailuresOverTimeCircuitBreaker(
        string name,
        TimeSpan timeToWaitBeforeTriggering,
        Action<Exception> triggerAction,
        Action? armedAction = null,
        Action? disarmedAction = null,
        TimeSpan? timeToWaitWhenTriggered = default,
        TimeSpan? timeToWaitWhenArmed = default)
    {
        this.name = name;
        this.triggerAction = triggerAction;
        // Substitute no-op delegates so the call sites never need a null check.
        this.armedAction = armedAction ?? (static () => { });
        this.disarmedAction = disarmedAction ?? (static () => { });
        this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;
        this.timeToWaitWhenTriggered = timeToWaitWhenTriggered ?? TimeSpan.FromSeconds(10);
        this.timeToWaitWhenArmed = timeToWaitWhenArmed ?? TimeSpan.FromSeconds(1);

        // The timer is created without a due time; it is only scheduled by Failure when the breaker gets armed.
        timer = new Timer(CircuitBreakerTriggered);
    }

    /// <summary>
    /// Log a success, disarming the circuit breaker if it was previously armed.
    /// </summary>
    /// <remarks>
    /// When the breaker leaves the Armed or Triggered state, the pending timer is cancelled and the disarmed action
    /// is invoked while the lock is held. An exception thrown by the disarmed action is logged and rethrown to the
    /// caller; the breaker is already in the Disarmed state at that point.
    /// </remarks>
    public void Success()
    {
        // Check the status of the circuit breaker, exiting early outside the lock if already disarmed
        if (Volatile.Read(ref circuitBreakerState) == Disarmed)
        {
            return;
        }

        lock (stateLock)
        {
            // Recheck state after obtaining the lock
            if (circuitBreakerState == Disarmed)
            {
                return;
            }

            circuitBreakerState = Disarmed;

            // Cancel any pending trigger.
            _ = timer.Change(Timeout.Infinite, Timeout.Infinite);
            Logger.InfoFormat("The circuit breaker for '{0}' is now disarmed.", name);
            try
            {
                disarmedAction();
            }
            catch (Exception ex)
            {
                Logger.Error($"The circuit breaker for '{name}' was unable to execute the disarm action.", ex);
                throw;
            }
        }
    }

    /// <summary>
    /// Log a failure, arming the circuit breaker if it was previously disarmed.
    /// </summary>
    /// <param name="exception">The exception that caused the failure.</param>
    /// <param name="cancellationToken">A cancellation token.</param>
    /// <returns>
    /// A task that completes after a delay: the triggered delay when the breaker was already in the Triggered state
    /// when this call observed it, otherwise the armed delay. The delay observes <paramref name="cancellationToken"/>.
    /// </returns>
    /// <remarks>
    /// An exception thrown by the armed action is logged and rethrown synchronously from this method, not through the returned task.
    /// </remarks>
    public Task Failure(Exception exception, CancellationToken cancellationToken = default)
    {
        // Atomically store the exception that caused the circuit breaker to trip
        _ = Interlocked.Exchange(ref lastException, exception);

        var previousState = Volatile.Read(ref circuitBreakerState);
        if (previousState is Armed or Triggered)
        {
            return Delay();
        }

        lock (stateLock)
        {
            // Recheck state after obtaining the lock
            previousState = circuitBreakerState;
            if (previousState is Armed or Triggered)
            {
                return Delay();
            }

            circuitBreakerState = Armed;

            try
            {
                // Executing the action first before starting the timer to ensure that the action is executed before the timer fires
                // and the time of the action is not included in the time to wait before triggering.
                armedAction();
            }
            catch (Exception ex)
            {
                // NOTE(review): at this point the state is already Armed but the timer below has not been started,
                // so no trigger is scheduled until a Success() disarms the breaker and a later failure re-arms it.
                // Confirm this is the intended behaviour when the armed action fails.
                Logger.Error($"The circuit breaker for '{name}' was unable to execute the arm action.", new AggregateException(ex, exception));
                throw;
            }

            // One-shot timer: fires once after timeToWaitBeforeTriggering unless Success() cancels it first.
            _ = timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
            Logger.WarnFormat("The circuit breaker for '{0}' is now in the armed state due to '{1}' and might trigger in '{2}' when not disarmed.", name, exception, timeToWaitBeforeTriggering);
        }

        return Delay();

        // Picks the delay based on the state observed by this call (captured in previousState).
        Task Delay()
        {
            var timeToWait = previousState == Triggered ? timeToWaitWhenTriggered : timeToWaitWhenArmed;
            if (Logger.IsDebugEnabled)
            {
                Logger.DebugFormat("The circuit breaker for '{0}' is delaying the operation by '{1}'.", name, timeToWait);
            }
            return Task.Delay(timeToWait, cancellationToken);
        }
    }

    /// <summary>
    /// Disposes the resources associated with the circuit breaker.
    /// </summary>
    public void Dispose() => timer.Dispose();

    /// <summary>
    /// Timer callback. Moves the breaker to the Triggered state and invokes the trigger action with the most recently
    /// recorded exception, unless the breaker has been disarmed in the meantime.
    /// </summary>
    /// <param name="state">Unused timer state.</param>
    void CircuitBreakerTriggered(object? state)
    {
        var previousState = Volatile.Read(ref circuitBreakerState);
        if (previousState == Disarmed)
        {
            return;
        }

        lock (stateLock)
        {
            // Recheck state after obtaining the lock
            if (circuitBreakerState == Disarmed)
            {
                return;
            }

            circuitBreakerState = Triggered;

            // Failure() replaces lastException without taking the lock, so take a single snapshot here. This guarantees
            // that the exception that is logged is the same instance that is handed to the trigger action.
            var exception = Volatile.Read(ref lastException);
            Logger.WarnFormat("The circuit breaker for '{0}' will now be triggered with exception '{1}'.", name, exception);

            try
            {
                triggerAction(exception!);
            }
            catch (Exception ex)
            {
                // Exceptions are not rethrown here because this runs as a timer callback.
                Logger.Fatal($"The circuit breaker for '{name}' was unable to execute the trigger action.", new AggregateException(ex, exception!));
            }
        }
    }

    // Current state; one of Disarmed, Armed or Triggered. Written under stateLock, read via Volatile.Read on the fast paths.
    int circuitBreakerState = Disarmed;
    // The exception passed to the most recent Failure call; handed to triggerAction when the breaker triggers.
    Exception? lastException;

    readonly string name;
    readonly Timer timer;
    readonly TimeSpan timeToWaitBeforeTriggering;
    readonly Action<Exception> triggerAction;
    readonly Action armedAction;
    readonly Action disarmedAction;
    readonly TimeSpan timeToWaitWhenTriggered;
    readonly TimeSpan timeToWaitWhenArmed;
    readonly object stateLock = new();

    const int Disarmed = 0;
    const int Armed = 1;
    const int Triggered = 2;

    // A period of -1 milliseconds disables periodic signalling, making the timer one-shot.
    static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
    static readonly ILog Logger = LogManager.GetLogger<RepeatedFailuresOverTimeCircuitBreaker>();
}

src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public ExternalIntegrationRequestsDataStore(RavenPersisterSettings settings, IRa
3333
"EventDispatcher",
3434
timeToWait,
3535
ex => criticalError.Raise("Repeated failures when dispatching external integration events.", ex),
36-
delayAfterFailure
36+
timeToWaitWhenArmed: delayAfterFailure
3737
);
3838
}
3939

@@ -98,7 +98,7 @@ async Task StartDispatcherTask(CancellationToken cancellationToken)
9898
catch (Exception ex)
9999
{
100100
Logger.Error("An exception occurred when dispatching external integration events", ex);
101-
await circuitBreaker.Failure(ex);
101+
await circuitBreaker.Failure(ex, cancellationToken);
102102

103103
if (!tokenSource.IsCancellationRequested)
104104
{
@@ -192,7 +192,6 @@ public async ValueTask DisposeAsync()
192192
}
193193

194194
tokenSource?.Dispose();
195-
circuitBreaker?.Dispose();
196195
}
197196

198197
readonly RavenPersisterSettings settings;

src/ServiceControl.Persistence.RavenDB/RepeatedFailuresOverTimeCircuitBreaker.cs

Lines changed: 0 additions & 76 deletions
This file was deleted.

0 commit comments

Comments
 (0)