diff --git a/src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs b/src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs
index 613a840a9..b8fa11f24 100644
--- a/src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs
+++ b/src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs
@@ -200,7 +200,7 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
// If either the receiving or processing circuit breakers are triggered, start only one message processing task at a time.
var maximumConcurrentProcessing =
- messageProcessingCircuitBreaker.Triggered || messageReceivingCircuitBreaker.Triggered
+ messageProcessingCircuitBreaker.IsTriggered || messageReceivingCircuitBreaker.IsTriggered
? 1
: messageCount;
diff --git a/src/NServiceBus.Transport.Sql.Shared/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/NServiceBus.Transport.Sql.Shared/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs
index 9412c7192..5655fb0c4 100644
--- a/src/NServiceBus.Transport.Sql.Shared/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs
+++ b/src/NServiceBus.Transport.Sql.Shared/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs
@@ -1,101 +1,210 @@
-namespace NServiceBus.Transport.Sql.Shared;
+#nullable enable
+
+namespace NServiceBus.Transport.Sql.Shared;
using System;
using System.Threading;
using System.Threading.Tasks;
-using NServiceBus.Logging;
-
-class RepeatedFailuresOverTimeCircuitBreaker : IDisposable
+using Logging;
+
+///
+/// A circuit breaker that is armed on a failure and disarmed on success. After in the
+/// armed state, the will fire. The and allow
+/// changing other state when the circuit breaker is armed or disarmed.
+///
+sealed class RepeatedFailuresOverTimeCircuitBreaker
{
- public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, Action triggerAction)
+ ///
+ /// A circuit breaker that is armed on a failure and disarmed on success. After in the
+ /// armed state, the will fire. The and allow
+ /// changing other state when the circuit breaker is armed or disarmed.
+ ///
+ /// A name that is output in log messages when the circuit breaker changes states.
+ /// The time to wait after the first failure before triggering.
+ /// The action to take when the circuit breaker is triggered.
+ /// The action to execute on the first failure.
+ /// Warning: This action is also invoked from within a lock. Any long-running, blocking, or I/O-bound code should be avoided
+ /// within this action, as it can prevent other threads from proceeding, potentially leading to contention or performance bottlenecks.
+ ///
+ /// The action to execute when a success disarms the circuit breaker.
+ /// Warning: This action is also invoked from within a lock. Any long-running, blocking, or I/O-bound code should be avoided
+ /// within this action, as it can prevent other threads from proceeding, potentially leading to contention or performance bottlenecks.
+ ///
+ /// How long to delay on each failure when in the Triggered state. Defaults to 10 seconds.
+ /// How long to delay on each failure when in the Armed state. Defaults to 1 second.
+ ///
+ /// The and are invoked from within a lock to ensure that arming and disarming
+ /// actions are serialized and do not execute concurrently. As a result, care must be taken to ensure that these actions do not
+ /// introduce delays or deadlocks by performing lengthy operations or synchronously waiting on external resources.
+ ///
+ /// Best practice: If the logic inside these actions involves blocking or long-running tasks, consider offloading
+ /// the work to a background task or thread that doesn't hold the lock.
+ ///
+ public RepeatedFailuresOverTimeCircuitBreaker(
+ string name,
+ TimeSpan timeToWaitBeforeTriggering,
+ Action triggerAction,
+ Action? armedAction = null,
+ Action? disarmedAction = null,
+ TimeSpan? timeToWaitWhenTriggered = default,
+ TimeSpan? timeToWaitWhenArmed = default)
{
this.name = name;
this.triggerAction = triggerAction;
+ this.armedAction = armedAction ?? (static () => { });
+ this.disarmedAction = disarmedAction ?? (static () => { });
this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;
+ this.timeToWaitWhenTriggered = timeToWaitWhenTriggered ?? TimeSpan.FromSeconds(10);
+ this.timeToWaitWhenArmed = timeToWaitWhenArmed ?? TimeSpan.FromSeconds(1);
timer = new Timer(CircuitBreakerTriggered);
}
- public bool Triggered => triggered;
-
+ ///
+ /// Log a success, disarming the circuit breaker if it was previously armed.
+ ///
public void Success()
{
- var oldValue = Interlocked.Exchange(ref failureCount, 0);
-
- if (oldValue == 0)
+ // Check the status of the circuit breaker, exiting early outside the lock if already disarmed
+ if (Volatile.Read(ref circuitBreakerState) == Disarmed)
{
return;
}
- timer.Change(Timeout.Infinite, Timeout.Infinite);
- triggered = false;
- Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
+ lock (stateLock)
+ {
+ // Recheck state after obtaining the lock
+ if (circuitBreakerState == Disarmed)
+ {
+ return;
+ }
+
+ circuitBreakerState = Disarmed;
+
+ _ = timer.Change(Timeout.Infinite, Timeout.Infinite);
+ Logger.InfoFormat("The circuit breaker for '{0}' is now disarmed.", name);
+ try
+ {
+ disarmedAction();
+ }
+ catch (Exception ex)
+ {
+ Logger.Error($"The circuit breaker for '{name}' was unable to execute the disarm action.", ex);
+ throw;
+ }
+ }
}
+ ///
+ /// Log a failure, arming the circuit breaker if it was previously disarmed.
+ ///
+ /// The exception that caused the failure.
+ /// A cancellation token.
public Task Failure(Exception exception, CancellationToken cancellationToken = default)
{
- lastException = exception;
- var newValue = Interlocked.Increment(ref failureCount);
+ // Atomically store the exception that caused the circuit breaker to trip
+ _ = Interlocked.Exchange(ref lastException, exception);
- if (newValue == 1)
+ var previousState = Volatile.Read(ref circuitBreakerState);
+ if (previousState is Armed or Triggered)
{
- timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
- Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name);
+ return Delay();
}
- var delay = Triggered ? ThrottledDelay : NonThrottledDelay;
- return Task.Delay(delay, cancellationToken);
- }
-
- void CircuitBreakerTriggered(object state)
- {
- if (Interlocked.Read(ref failureCount) > 0)
+ lock (stateLock)
{
- triggered = true;
- Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name);
+ // Recheck state after obtaining the lock
+ previousState = circuitBreakerState;
+ if (previousState is Armed or Triggered)
+ {
+ return Delay();
+ }
+
+ circuitBreakerState = Armed;
try
{
- triggerAction(lastException);
+ // Executing the action first before starting the timer to ensure that the action is executed before the timer fires
+ // and the time of the action is not included in the time to wait before triggering.
+ armedAction();
}
catch (Exception ex)
{
- Logger.Error($"Error invoking trigger action for circuit breaker {name}", ex);
+ Logger.Error($"The circuit breaker for '{name}' was unable to execute the arm action.", new AggregateException(ex, exception));
+ throw;
+ }
+
+ _ = timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
+ Logger.WarnFormat("The circuit breaker for '{0}' is now in the armed state due to '{1}' and might trigger in '{2}' when not disarmed.", name, exception, timeToWaitBeforeTriggering);
+ }
+
+ return Delay();
+
+ Task Delay()
+ {
+ var timeToWait = previousState == Triggered ? timeToWaitWhenTriggered : timeToWaitWhenArmed;
+ if (Logger.IsDebugEnabled)
+ {
+ Logger.DebugFormat("The circuit breaker for '{0}' is delaying the operation by '{1}'.", name, timeToWait);
}
+ return Task.Delay(timeToWait, cancellationToken);
}
}
- protected virtual void Dispose(bool disposing)
+ ///
+ /// Disposes the resources associated with the circuit breaker.
+ ///
+ public void Dispose() => timer.Dispose();
+
+ void CircuitBreakerTriggered(object? state)
{
- if (!disposed)
+ var previousState = Volatile.Read(ref circuitBreakerState);
+ if (previousState == Disarmed)
+ {
+ return;
+ }
+
+ lock (stateLock)
{
- if (disposing)
+ // Recheck state after obtaining the lock
+ if (circuitBreakerState == Disarmed)
{
- timer?.Dispose();
+ return;
}
- disposed = true;
+ circuitBreakerState = Triggered;
+ Logger.WarnFormat("The circuit breaker for '{0}' will now be triggered with exception '{1}'.", name, lastException);
+
+ try
+ {
+ triggerAction(lastException!);
+ }
+ catch (Exception ex)
+ {
+ Logger.Fatal($"The circuit breaker for '{name}' was unable to execute the trigger action.", new AggregateException(ex, lastException!));
+ }
}
}
- public void Dispose()
- {
- // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
- Dispose(disposing: true);
- GC.SuppressFinalize(this);
- }
+ public bool IsTriggered => circuitBreakerState == Triggered;
+
+ int circuitBreakerState = Disarmed;
+ Exception? lastException;
+
+ readonly string name;
+ readonly Timer timer;
+ readonly TimeSpan timeToWaitBeforeTriggering;
+ readonly Action triggerAction;
+ readonly Action armedAction;
+ readonly Action disarmedAction;
+ readonly TimeSpan timeToWaitWhenTriggered;
+ readonly TimeSpan timeToWaitWhenArmed;
+ readonly object stateLock = new();
+
+ const int Disarmed = 0;
+ const int Armed = 1;
+ const int Triggered = 2;
- string name;
- TimeSpan timeToWaitBeforeTriggering;
- Timer timer;
- Action triggerAction;
- long failureCount;
- Exception lastException;
- volatile bool triggered;
- bool disposed;
-
- static TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
- static ILog Logger = LogManager.GetLogger();
- static TimeSpan NonThrottledDelay = TimeSpan.FromSeconds(1);
- static TimeSpan ThrottledDelay = TimeSpan.FromSeconds(10);
+ static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
+ static readonly ILog Logger = LogManager.GetLogger();
}
\ No newline at end of file
diff --git a/src/NServiceBus.Transport.SqlServer.UnitTests/.editorconfig b/src/NServiceBus.Transport.SqlServer.UnitTests/.editorconfig
index 6ba33f0ef..0f61b0e11 100644
--- a/src/NServiceBus.Transport.SqlServer.UnitTests/.editorconfig
+++ b/src/NServiceBus.Transport.SqlServer.UnitTests/.editorconfig
@@ -1,4 +1,7 @@
[*.cs]
+# Justification: Test project
+dotnet_diagnostic.CA2007.severity = none
+
# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken
dotnet_diagnostic.NSB0002.severity = suggestion
diff --git a/src/NServiceBus.Transport.SqlServer.UnitTests/Receiving/RepeatedFailuresOverTimeCircuitBreakerTests.cs b/src/NServiceBus.Transport.SqlServer.UnitTests/Receiving/RepeatedFailuresOverTimeCircuitBreakerTests.cs
new file mode 100644
index 000000000..f494c2ccc
--- /dev/null
+++ b/src/NServiceBus.Transport.SqlServer.UnitTests/Receiving/RepeatedFailuresOverTimeCircuitBreakerTests.cs
@@ -0,0 +1,223 @@
+namespace NServiceBus.Transport.SqlServer.UnitTests.Receiving
+{
+ using System;
+ using System.Diagnostics;
+ using System.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using NUnit.Framework;
+ using NServiceBus.Transport.Sql.Shared;
+
+ // Ideally the circuit breaker would use a time provider to allow for easier testing but that would require a significant refactor
+ // and we want keep the changes to a minimum for now to allow backporting to older versions.
+ [TestFixture]
+ public class RepeatedFailuresOverTimeCircuitBreakerTests
+ {
+ [Test]
+ public async Task Should_disarm_on_success()
+ {
+ var armedActionCalled = false;
+ var disarmedActionCalled = false;
+
+ var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+ "TestCircuitBreaker",
+ TimeSpan.FromMilliseconds(100),
+ ex => { },
+ () => armedActionCalled = true,
+ () => disarmedActionCalled = true,
+ TimeSpan.Zero,
+ TimeSpan.Zero
+ );
+
+ await circuitBreaker.Failure(new Exception("Test Exception"));
+ circuitBreaker.Success();
+
+ Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
+ Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
+ }
+
+ [Test]
+ public async Task Should_rethrow_exception_on_success()
+ {
+ var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+ "TestCircuitBreaker",
+ TimeSpan.FromMilliseconds(100),
+ ex => { },
+ () => { },
+ () => throw new Exception("Exception from disarmed action"),
+ timeToWaitWhenTriggered: TimeSpan.Zero,
+ timeToWaitWhenArmed: TimeSpan.Zero
+ );
+
+ await circuitBreaker.Failure(new Exception("Test Exception"));
+
+ var ex = Assert.Throws(() => circuitBreaker.Success());
+ Assert.That(ex.Message, Is.EqualTo("Exception from disarmed action"));
+ }
+
+ [Test]
+ public async Task Should_trigger_after_failure_timeout()
+ {
+ var triggerActionCalled = false;
+ Exception lastTriggerException = null;
+
+ var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+ "TestCircuitBreaker",
+ TimeSpan.Zero,
+ ex => { triggerActionCalled = true; lastTriggerException = ex; },
+ timeToWaitWhenTriggered: TimeSpan.Zero,
+ timeToWaitWhenArmed: TimeSpan.FromMilliseconds(100)
+ );
+
+ await circuitBreaker.Failure(new Exception("Test Exception"));
+
+ Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after timeout.");
+ Assert.That(lastTriggerException, Is.Not.Null, "The exception passed to the trigger action should not be null.");
+ }
+
+ [Test]
+ public void Should_rethrow_exception_on_failure()
+ {
+ var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+ "TestCircuitBreaker",
+ TimeSpan.FromMilliseconds(100),
+ ex => { },
+ () => throw new Exception("Exception from armed action"),
+ () => { },
+ timeToWaitWhenTriggered: TimeSpan.Zero,
+ timeToWaitWhenArmed: TimeSpan.Zero
+ );
+
+ var ex = Assert.ThrowsAsync(async () => await circuitBreaker.Failure(new Exception("Test Exception")));
+ Assert.That(ex.Message, Is.EqualTo("Exception from armed action"));
+ }
+
+ [Test]
+ public async Task Should_delay_after_trigger_failure()
+ {
+ var timeToWaitWhenTriggered = TimeSpan.FromMilliseconds(50);
+ var timeToWaitWhenArmed = TimeSpan.FromMilliseconds(100);
+
+ var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+ "TestCircuitBreaker",
+ TimeSpan.Zero,
+ _ => { },
+ timeToWaitWhenTriggered: timeToWaitWhenTriggered,
+ timeToWaitWhenArmed: timeToWaitWhenArmed
+ );
+
+ var stopWatch = Stopwatch.StartNew();
+
+ await circuitBreaker.Failure(new Exception("Test Exception"));
+ await circuitBreaker.Failure(new Exception("Test Exception After Trigger"));
+
+ stopWatch.Stop();
+
+ Assert.That(stopWatch.ElapsedMilliseconds, Is.GreaterThanOrEqualTo(timeToWaitWhenTriggered.Add(timeToWaitWhenArmed).TotalMilliseconds).Within(20), "The circuit breaker should delay after a triggered failure.");
+ }
+
+ [Test]
+ public async Task Should_not_trigger_if_disarmed_before_timeout()
+ {
+ var triggerActionCalled = false;
+
+ var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+ "TestCircuitBreaker",
+ TimeSpan.FromMilliseconds(100),
+ ex => triggerActionCalled = true,
+ timeToWaitWhenTriggered: TimeSpan.Zero,
+ timeToWaitWhenArmed: TimeSpan.Zero
+ );
+
+ await circuitBreaker.Failure(new Exception("Test Exception"));
+ circuitBreaker.Success();
+
+ Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if the circuit breaker was disarmed.");
+ }
+
+ [Test]
+ public async Task Should_handle_concurrent_failure_and_success()
+ {
+ var armedActionCalled = false;
+ var disarmedActionCalled = false;
+ var triggerActionCalled = false;
+
+ var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+ "TestCircuitBreaker",
+ TimeSpan.FromMilliseconds(100),
+ ex => triggerActionCalled = true,
+ () => armedActionCalled = true,
+ () => disarmedActionCalled = true,
+ TimeSpan.Zero,
+ TimeSpan.Zero
+ );
+
+ var failureTask = circuitBreaker.Failure(new Exception("Test Exception"));
+ var successTask = Task.Run(() =>
+ {
+ Thread.Sleep(50); // Simulate some delay before success
+ circuitBreaker.Success();
+ });
+
+ await Task.WhenAll(failureTask, successTask);
+
+ Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
+ Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
+ Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if success occurred before timeout.");
+ }
+
+ [Test]
+ public async Task Should_handle_high_concurrent_failure_and_success()
+ {
+ var armedActionCalled = 0;
+ var disarmedActionCalled = 0;
+ var triggerActionCalled = 0;
+
+ var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+ "TestCircuitBreaker",
+ TimeSpan.FromSeconds(5),
+ ex => Interlocked.Increment(ref triggerActionCalled),
+ () => Interlocked.Increment(ref armedActionCalled),
+ () => Interlocked.Increment(ref disarmedActionCalled),
+ TimeSpan.Zero,
+ TimeSpan.FromMilliseconds(25)
+ );
+
+ var tasks = Enumerable.Range(0, 1000)
+ .Select(
+ i => i % 2 == 0 ?
+ circuitBreaker.Failure(new Exception($"Test Exception {i}")) :
+ Task.Run(() =>
+ {
+ Thread.Sleep(25); // Simulate some delay before success
+ circuitBreaker.Success();
+ })
+ ).ToArray();
+
+ await Task.WhenAll(tasks);
+
+ Assert.That(armedActionCalled, Is.EqualTo(1), "The armed action should be called.");
+ Assert.That(disarmedActionCalled, Is.EqualTo(1), "The disarmed action should be called.");
+ Assert.That(triggerActionCalled, Is.Zero, "The trigger action should not be called if success occurred before timeout.");
+ }
+
+ [Test]
+ public async Task Should_trigger_after_multiple_failures_and_timeout()
+ {
+ var triggerActionCalled = false;
+
+ var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+ "TestCircuitBreaker",
+ TimeSpan.FromMilliseconds(50),
+ ex => triggerActionCalled = true,
+ timeToWaitWhenTriggered: TimeSpan.FromMilliseconds(50),
+ timeToWaitWhenArmed: TimeSpan.FromMilliseconds(50)
+ );
+
+ await circuitBreaker.Failure(new Exception("Test Exception"));
+ await circuitBreaker.Failure(new Exception("Another Exception After Trigger"));
+
+ Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after repeated failures and timeout.");
+ }
+ }
+}
\ No newline at end of file