Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
include:
- os: windows-2019
name: Windows
- os: ubuntu-20.04
- os: ubuntu-22.04
name: Linux
fail-fast: false
steps:
Expand Down
1 change: 0 additions & 1 deletion src/NServiceBus.SqlServer/FodyWeavers.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<Weavers GenerateXsd="false">
<Janitor />
<Obsolete />
</Weavers>
1 change: 0 additions & 1 deletion src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

<ItemGroup>
<PackageReference Include="Fody" Version="6.9.1" PrivateAssets="All" />
<PackageReference Include="Janitor.Fody" Version="1.9.0" PrivateAssets="All" />
<PackageReference Include="Obsolete.Fody" Version="5.3.0" PrivateAssets="All" />
<PackageReference Include="Particular.Packaging" Version="4.2.0" PrivateAssets="All" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
namespace NServiceBus.Transport.SqlServer.UnitTests.Receiving
{
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using NServiceBus.Transport.SqlServer;

#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task

// Ideally the circuit breaker would use a time provider to allow for easier testing but that would require a significant refactor
// and we want keep the changes to a minimum for now to allow backporting to older versions.
[TestFixture]
public class RepeatedFailuresOverTimeCircuitBreakerTests
{
[Test]
public async Task Should_disarm_on_success()
{
var armedActionCalled = false;
var disarmedActionCalled = false;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
ex => { },
() => armedActionCalled = true,
() => disarmedActionCalled = true,
TimeSpan.Zero,
TimeSpan.Zero
);

await circuitBreaker.Failure(new Exception("Test Exception"));
circuitBreaker.Success();

Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
}

[Test]
public async Task Should_rethrow_exception_on_success()
{
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
_ => { },
() => { },
() => throw new Exception("Exception from disarmed action"),
timeToWaitWhenTriggered: TimeSpan.Zero,
timeToWaitWhenArmed: TimeSpan.Zero
);

await circuitBreaker.Failure(new Exception("Test Exception"));

var ex = Assert.Throws<Exception>(() => circuitBreaker.Success());
Assert.That(ex.Message, Is.EqualTo("Exception from disarmed action"));
}

[Test]
public async Task Should_trigger_after_failure_timeout()
{
var triggerActionCalled = false;
Exception lastTriggerException = null;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.Zero,
ex => { triggerActionCalled = true; lastTriggerException = ex; },
timeToWaitWhenTriggered: TimeSpan.Zero,
timeToWaitWhenArmed: TimeSpan.FromMilliseconds(100)
);

await circuitBreaker.Failure(new Exception("Test Exception"));

Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after timeout.");
Assert.That(lastTriggerException, Is.Not.Null, "The exception passed to the trigger action should not be null.");
}

[Test]
public void Should_rethrow_exception_on_failure()
{
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
_ => { },
() => throw new Exception("Exception from armed action"),
() => { },
timeToWaitWhenTriggered: TimeSpan.Zero,
timeToWaitWhenArmed: TimeSpan.Zero
);

var ex = Assert.ThrowsAsync<Exception>(async () => await circuitBreaker.Failure(new Exception("Test Exception")));
Assert.That(ex.Message, Is.EqualTo("Exception from armed action"));
}

[Test]
public async Task Should_delay_after_trigger_failure()
{
var timeToWaitWhenTriggered = TimeSpan.FromMilliseconds(50);
var timeToWaitWhenArmed = TimeSpan.FromMilliseconds(100);

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.Zero,
_ => { },
timeToWaitWhenTriggered: timeToWaitWhenTriggered,
timeToWaitWhenArmed: timeToWaitWhenArmed
);

var stopWatch = Stopwatch.StartNew();

await circuitBreaker.Failure(new Exception("Test Exception"));
await circuitBreaker.Failure(new Exception("Test Exception After Trigger"));

stopWatch.Stop();

Assert.That(stopWatch.ElapsedMilliseconds, Is.GreaterThanOrEqualTo(timeToWaitWhenTriggered.Add(timeToWaitWhenArmed).TotalMilliseconds).Within(20), "The circuit breaker should delay after a triggered failure.");
}

[Test]
public async Task Should_not_trigger_if_disarmed_before_timeout()
{
var triggerActionCalled = false;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
ex => triggerActionCalled = true,
timeToWaitWhenTriggered: TimeSpan.Zero,
timeToWaitWhenArmed: TimeSpan.Zero
);

await circuitBreaker.Failure(new Exception("Test Exception"));
circuitBreaker.Success();

Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if the circuit breaker was disarmed.");
}

[Test]
public async Task Should_handle_concurrent_failure_and_success()
{
var armedActionCalled = false;
var disarmedActionCalled = false;
var triggerActionCalled = false;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
ex => triggerActionCalled = true,
() => armedActionCalled = true,
() => disarmedActionCalled = true,
TimeSpan.Zero,
TimeSpan.Zero
);

var failureTask = circuitBreaker.Failure(new Exception("Test Exception"));
var successTask = Task.Run(() =>
{
Thread.Sleep(50); // Simulate some delay before success
circuitBreaker.Success();
});

await Task.WhenAll(failureTask, successTask);

Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if success occurred before timeout.");
}

[Test]
public async Task Should_handle_high_concurrent_failure_and_success()
{
var armedActionCalled = 0;
var disarmedActionCalled = 0;
var triggerActionCalled = 0;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromSeconds(5),
ex => Interlocked.Increment(ref triggerActionCalled),
() => Interlocked.Increment(ref armedActionCalled),
() => Interlocked.Increment(ref disarmedActionCalled),
TimeSpan.Zero,
TimeSpan.FromMilliseconds(25)
);

var tasks = Enumerable.Range(0, 1000)
.Select(
i => i % 2 == 0 ?
circuitBreaker.Failure(new Exception($"Test Exception {i}")) :
Task.Run(() =>
{
Thread.Sleep(25); // Simulate some delay before success
circuitBreaker.Success();
})
).ToArray();

await Task.WhenAll(tasks);

Assert.That(armedActionCalled, Is.EqualTo(1), "The armed action should be called.");
Assert.That(disarmedActionCalled, Is.EqualTo(1), "The disarmed action should be called.");
Assert.That(triggerActionCalled, Is.Zero, "The trigger action should not be called if success occurred before timeout.");
}

[Test]
public async Task Should_trigger_after_multiple_failures_and_timeout()
{
var triggerActionCalled = false;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(50),
ex => triggerActionCalled = true,
timeToWaitWhenTriggered: TimeSpan.FromMilliseconds(50),
timeToWaitWhenArmed: TimeSpan.FromMilliseconds(50)
);

await circuitBreaker.Failure(new Exception("Test Exception"));
await circuitBreaker.Failure(new Exception("Another Exception After Trigger"));

Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after repeated failures and timeout.");
}
}
}
#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task
1 change: 0 additions & 1 deletion src/NServiceBus.Transport.SqlServer/FodyWeavers.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<Weavers GenerateXsd="false">
<Janitor />
<Obsolete />
</Weavers>
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

<ItemGroup>
<PackageReference Include="Fody" Version="6.9.1" PrivateAssets="All" />
<PackageReference Include="Janitor.Fody" Version="1.9.0" PrivateAssets="All" />
<PackageReference Include="Obsolete.Fody" Version="5.3.0" PrivateAssets="All" />
<PackageReference Include="Particular.Packaging" Version="4.2.0" PrivateAssets="All" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
var stopBatchCancellationSource = new CancellationTokenSource();

// If either the receiving or processing circuit breakers are triggered, start only one message processing task at a time.
var maximumConcurrentProcessing = messageProcessingCircuitBreaker.Triggered || messageReceivingCircuitBreaker.Triggered ? 1 : messageCount;
var maximumConcurrentProcessing =
messageProcessingCircuitBreaker.IsTriggered || messageReceivingCircuitBreaker.IsTriggered
? 1
: messageCount;

for (var i = 0; i < maximumConcurrentProcessing; i++)
{
Expand Down
Loading
Loading