Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions .github/workflows/wf_build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,12 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/cache@v4
- name: Setup dotnet
uses: actions/setup-dotnet@v4
with:
path: |
~/.nuget/packages
~/.local/share/NuGet/v3-cache
key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj') }}
restore-keys: |
${{ runner.os }}-v0-nuget-
dotnet-version: |
6.0.x
8.0.x
- name: Build (Debug)
run: dotnet build ${{ github.workspace }}/Build.csproj
- name: Verify
Expand Down
96 changes: 50 additions & 46 deletions Tests/ConnectionRecoveryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ public void Reset()
public int CurrentAttempt => 1;
}

public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper)
public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
{
private readonly ITestOutputHelper _testOutputHelper = testOutputHelper;

/// <summary>
/// The normal close the status should be correct and error null
/// The test records the status change when the connection is closed normally.
Expand All @@ -59,12 +57,13 @@ public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper)
[InlineData(false)]
public async Task NormalCloseTheStatusShouldBeCorrectAndErrorNull(bool activeRecovery)
{
string containerId = Guid.NewGuid().ToString();
string localContainerId = $"{_containerId}_normal-close-connection-name";

IConnection connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create().ContainerId(containerId).RecoveryConfiguration(
ConnectionSettingBuilder.Create().ContainerId(localContainerId).RecoveryConfiguration(
RecoveryConfiguration.Create().Activated(activeRecovery).Topology(false)).Build());

TaskCompletionSource<bool> connectionClosedStateTcs = CreateTaskCompletionSource();
TaskCompletionSource<bool> connectionClosedStateTcs = CreateTaskCompletionSource<bool>();
var listFromStatus = new List<State>();
var listToStatus = new List<State>();
var listError = new List<Error?>();
Expand Down Expand Up @@ -105,14 +104,16 @@ public async Task NormalCloseTheStatusShouldBeCorrectAndErrorNull(bool activeRec
[Fact]
public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull()
{
const string containerId = "unexpected-close-connection-name";

string localContainerId = $"{_containerId}_unexpected-close-connection-name";

IConnection connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create().ContainerId(containerId).RecoveryConfiguration(
ConnectionSettingBuilder.Create().ContainerId(localContainerId).RecoveryConfiguration(
RecoveryConfiguration.Create().Activated(true).Topology(false)
.BackOffDelayPolicy(new FakeFastBackOffDelay())).Build());

TaskCompletionSource<bool> listErrorCountGreaterThanOrEqualToTwoTcs = CreateTaskCompletionSource();
TaskCompletionSource<bool> listErrorCountGreaterThanOrEqualToFourTcs = CreateTaskCompletionSource();
TaskCompletionSource<bool> listErrorCountGreaterThanOrEqualToTwoTcs = CreateTaskCompletionSource<bool>();
TaskCompletionSource<bool> listErrorCountGreaterThanOrEqualToFourTcs = CreateTaskCompletionSource<bool>();

var listFromStatus = new List<State>();
var listToStatus = new List<State>();
Expand All @@ -129,6 +130,7 @@ public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull()
// Note: must use try since it'll be called again
listErrorCountGreaterThanOrEqualToTwoTcs.TrySetResult(true);
}

if (listError.Count >= 4)
{
listErrorCountGreaterThanOrEqualToFourTcs.SetResult(true);
Expand All @@ -142,7 +144,7 @@ public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull()
};

Assert.Equal(State.Open, connection.State);
await SystemUtils.WaitUntilConnectionIsKilled(containerId);
await SystemUtils.WaitUntilConnectionIsKilled(localContainerId);
await listErrorCountGreaterThanOrEqualToTwoTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));

Assert.Equal(State.Open, listFromStatus[0]);
Expand Down Expand Up @@ -175,15 +177,16 @@ public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull()
[Fact]
public async Task OverrideTheBackOffWithBackOffDisabled()
{
string containerId = Guid.NewGuid().ToString();

string localContainerId = $"{_containerId}_override-backoff-disabled-connection-name";
IConnection connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create().ContainerId(containerId).RecoveryConfiguration(
ConnectionSettingBuilder.Create().ContainerId(localContainerId).RecoveryConfiguration(
RecoveryConfiguration.Create().Activated(true).Topology(false).BackOffDelayPolicy(
new FakeBackOffDelayPolicyDisabled())).Build());

var listFromStatus = new List<State>();
TaskCompletionSource<bool> listFromStatusCountGreaterOrEqualToTwo = CreateTaskCompletionSource();
TaskCompletionSource<bool> listErrorCountGreaterOrEqualToTwo = CreateTaskCompletionSource();
TaskCompletionSource<bool> listFromStatusCountGreaterOrEqualToTwo = CreateTaskCompletionSource<bool>();
TaskCompletionSource<bool> listErrorCountGreaterOrEqualToTwo = CreateTaskCompletionSource<bool>();

var listToStatus = new List<State>();
var listError = new List<Error>();
Expand All @@ -195,18 +198,20 @@ public async Task OverrideTheBackOffWithBackOffDisabled()
{
listError.Add(error);
}

if (listFromStatus.Count >= 2)
{
listFromStatusCountGreaterOrEqualToTwo.TrySetResult(true);
}

if (listError.Count >= 2)
{
listErrorCountGreaterOrEqualToTwo.SetResult(true);
}
};

Assert.Equal(State.Open, connection.State);
await SystemUtils.WaitUntilConnectionIsKilled(containerId);
await SystemUtils.WaitUntilConnectionIsKilled(localContainerId);

await listFromStatusCountGreaterOrEqualToTwo.Task.WaitAsync(TimeSpan.FromSeconds(5));

Expand Down Expand Up @@ -234,16 +239,16 @@ public async Task OverrideTheBackOffWithBackOffDisabled()
[Fact]
public async Task RecoveryTopologyShouldRecoverTheTempQueues()
{
string localContainerId = $"{_containerId}_temp-queue-should-recover-connection-name";
string queueName = $"temp-queue-should-recover-{true}";
const string containerId = "temp-queue-should-recover-connection-name";
var connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create()
.RecoveryConfiguration(RecoveryConfiguration.Create()
.BackOffDelayPolicy(new FakeFastBackOffDelay())
.Topology(true))
.ContainerId(containerId)
.ContainerId(localContainerId)
.Build());
TaskCompletionSource<bool> twoRecoveryEventsSeenTcs = CreateTaskCompletionSource();
TaskCompletionSource<bool> twoRecoveryEventsSeenTcs = CreateTaskCompletionSource<bool>();
int recoveryEvents = 0;
connection.ChangeState += (sender, from, to, error) =>
{
Expand All @@ -257,7 +262,7 @@ public async Task RecoveryTopologyShouldRecoverTheTempQueues()
await management.Queue().Name(queueName).AutoDelete(true).Exclusive(true).DeclareAsync();
Assert.Equal(1, topologyListener.QueueCount());

await SystemUtils.WaitUntilConnectionIsKilled(containerId);
await SystemUtils.WaitUntilConnectionIsKilled(localContainerId);
await twoRecoveryEventsSeenTcs.Task.WaitAsync(TimeSpan.FromSeconds(10));
await SystemUtils.WaitUntilFuncAsync(() => recoveryEvents == 2);

Expand All @@ -281,15 +286,15 @@ public async Task RecoveryTopologyShouldRecoverTheTempQueues()
public async Task RecoveryTopologyShouldNotRecoverTheTempQueues()
{
string queueName = $"temp-queue-should-recover-{false}";
const string containerId = "temp-queue-should-not-recover-connection-name";
string localContainerId = $"{_containerId}_temp-queue-should-not-recover-connection-name";
var connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create()
.RecoveryConfiguration(RecoveryConfiguration.Create()
.BackOffDelayPolicy(new FakeFastBackOffDelay())
.Topology(false))
.ContainerId(containerId)
.ContainerId(localContainerId)
.Build());
TaskCompletionSource<bool> oneRecoveryEventSeenTcs = CreateTaskCompletionSource();
TaskCompletionSource<bool> oneRecoveryEventSeenTcs = CreateTaskCompletionSource<bool>();
int recoveryEvents = 0;
connection.ChangeState += (sender, from, to, error) =>
{
Expand All @@ -303,7 +308,7 @@ public async Task RecoveryTopologyShouldNotRecoverTheTempQueues()
await management.Queue().Name(queueName).AutoDelete(true).Exclusive(true).DeclareAsync();
Assert.Equal(1, topologyListener.QueueCount());

await SystemUtils.WaitUntilConnectionIsKilled(containerId);
await SystemUtils.WaitUntilConnectionIsKilled(localContainerId);
await oneRecoveryEventSeenTcs.Task.WaitAsync(TimeSpan.FromSeconds(10));

await SystemUtils.WaitUntilQueueDeletedAsync(queueName);
Expand All @@ -318,15 +323,15 @@ public async Task RecoveryTopologyShouldNotRecoverTheTempQueues()
public async Task RecoveryTopologyShouldRecoverExchanges(bool topologyEnabled)
{
const string exchangeName = "exchange-should-recover";
const string containerId = nameof(RecoveryTopologyShouldRecoverExchanges);
string localContainerId = $"{_containerId}_exchange-should-recover-connection-name";
IConnection connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create()
.RecoveryConfiguration(RecoveryConfiguration.Create()
.BackOffDelayPolicy(new FakeFastBackOffDelay())
.Topology(topologyEnabled))
.ContainerId(containerId)
.ContainerId(localContainerId)
.Build());
TaskCompletionSource<bool> twoRecoveryEventsSeenTcs = CreateTaskCompletionSource();
TaskCompletionSource<bool> twoRecoveryEventsSeenTcs = CreateTaskCompletionSource<bool>();
int recoveryEvents = 0;
connection.ChangeState += (sender, from, to, error) =>
{
Expand All @@ -346,7 +351,7 @@ public async Task RecoveryTopologyShouldRecoverExchanges(bool topologyEnabled)
// the exchange is recovered.
await SystemUtils.DeleteExchangeAsync("exchange-should-recover");

await SystemUtils.WaitUntilConnectionIsKilled(containerId);
await SystemUtils.WaitUntilConnectionIsKilled(localContainerId);

await twoRecoveryEventsSeenTcs.Task.WaitAsync(TimeSpan.FromSeconds(10));

Expand Down Expand Up @@ -374,15 +379,15 @@ public async Task RecoveryTopologyShouldRecoverExchanges(bool topologyEnabled)
[InlineData(false)]
public async Task RecoveryTopologyShouldRecoverBindings(bool topologyEnabled)
{
const string containerId = "binding-should-recover-connection-name";
string localContainerId = $"{_containerId}_binding-should-recover-connection-name";
var connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create()
.RecoveryConfiguration(RecoveryConfiguration.Create()
.BackOffDelayPolicy(new FakeFastBackOffDelay())
.Topology(topologyEnabled))
.ContainerId(containerId)
.ContainerId(localContainerId)
.Build());
TaskCompletionSource<bool> twoRecoveryEventsSeenTcs = CreateTaskCompletionSource();
TaskCompletionSource<bool> twoRecoveryEventsSeenTcs = CreateTaskCompletionSource<bool>();
int recoveryEvents = 0;
connection.ChangeState += (sender, from, to, error) =>
{
Expand Down Expand Up @@ -410,7 +415,7 @@ public async Task RecoveryTopologyShouldRecoverBindings(bool topologyEnabled)
await SystemUtils.DeleteExchangeAsync("exchange-should-recover-binding");

// The queue will be deleted due of the auto-delete flag
await SystemUtils.WaitUntilConnectionIsKilled(containerId);
await SystemUtils.WaitUntilConnectionIsKilled(localContainerId);
await twoRecoveryEventsSeenTcs.Task.WaitAsync(TimeSpan.FromSeconds(10));

if (topologyEnabled)
Expand Down Expand Up @@ -453,13 +458,14 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync("exchan
[Fact]
public async Task RemoveAQueueShouldRemoveTheBindings()
{
const string containerId = nameof(RemoveAQueueShouldRemoveTheBindings);
string localContainerId = $"{_containerId}_remove-queue-should-remove-binding-connection-name";

IConnection connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create()
.RecoveryConfiguration(RecoveryConfiguration.Create()
.BackOffDelayPolicy(new FakeFastBackOffDelay())
.Topology(true))
.ContainerId(containerId)
.ContainerId(localContainerId)
.Build());

IManagement management = connection.Management();
Expand Down Expand Up @@ -513,13 +519,13 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync("e-remo
[Fact]
public async Task RemoveAnExchangeShouldRemoveTheBindings()
{
const string containerId = "remove-exchange-should-remove-binding-connection-name";
string localContainerId = $"{_containerId}_remove-exchange-should-remove-binding-connection-name";
var connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create()
.RecoveryConfiguration(RecoveryConfiguration.Create()
.BackOffDelayPolicy(new FakeFastBackOffDelay())
.Topology(true))
.ContainerId(containerId)
.ContainerId(localContainerId)
.Build());

IManagement management = connection.Management();
Expand Down Expand Up @@ -578,13 +584,13 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(
[Fact]
public async Task RemoveAnExchangeBoundToAnotherExchangeShouldRemoveTheBindings()
{
const string containerId = nameof(RemoveAnExchangeBoundToAnotherExchangeShouldRemoveTheBindings);
string localContainerId = $"{_containerId}_remove-exchange-bound-to-another-exchange-should-remove-binding-connection-name";
var connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create()
.RecoveryConfiguration(RecoveryConfiguration.Create()
.BackOffDelayPolicy(new FakeFastBackOffDelay())
.Topology(true))
.ContainerId(containerId)
.ContainerId(localContainerId)
.Build());

IManagement management = connection.Management();
Expand All @@ -595,7 +601,8 @@ public async Task RemoveAnExchangeBoundToAnotherExchangeShouldRemoveTheBindings(

await exSpec.DeclareAsync();

var exSpecDestination = management.Exchange().Name("e-remove-exchange-bound-to-another-exchange-should-remove-binding-destination")
var exSpecDestination = management.Exchange()
.Name("e-remove-exchange-bound-to-another-exchange-should-remove-binding-destination")
.Type(ExchangeType.DIRECT);

await exSpecDestination.DeclareAsync();
Expand All @@ -606,7 +613,8 @@ await management.Binding().SourceExchange(exSpec)
.DestinationExchange(exSpecDestination).Key($"key_{i}").BindAsync();
}

await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync("e-remove-exchange-bound-to-another-exchange-should-remove-binding",
await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(
"e-remove-exchange-bound-to-another-exchange-should-remove-binding",
"e-remove-exchange-bound-to-another-exchange-should-remove-binding-destination");

Assert.Equal(10, topologyListener.BindingCount());
Expand All @@ -616,16 +624,12 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync("e-remov

await exSpec.DeleteAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync("e-remove-exchange-bound-to-another-exchange-should-remove-binding",
await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(
"e-remove-exchange-bound-to-another-exchange-should-remove-binding",
"e-remove-exchange-bound-to-another-exchange-should-remove-binding-destination");

Assert.Equal(0, topologyListener.ExchangeCount());

await connection.CloseAsync();
}

private static TaskCompletionSource<bool> CreateTaskCompletionSource()
{
return new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}
}
4 changes: 3 additions & 1 deletion Tests/IntegrationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ public IntegrationTest(ITestOutputHelper testOutputHelper,
{
_testOutputHelper = testOutputHelper;
_setupConnectionAndManagement = setupConnectionAndManagement;

_queueName = $"{_testDisplayName}-queue-{Now}";
_exchangeName = $"{_testDisplayName}-exchange-{Now}";

_testDisplayName = InitTestDisplayName();
_containerId = $"{_testDisplayName}:{Now}";

testOutputHelper.WriteLine($"Running test: {_testDisplayName}");
_connectionSettingBuilder = InitConnectionSettingsBuilder();
}

Expand All @@ -63,6 +63,7 @@ public virtual async Task InitializeAsync()

public virtual async Task DisposeAsync()
{
_testOutputHelper.WriteLine($"Disposing test: {_testDisplayName}");
if (_management is not null && _management.State == State.Open)
{
try
Expand All @@ -72,6 +73,7 @@ public virtual async Task DisposeAsync()
}
catch
{

}

try
Expand Down
4 changes: 3 additions & 1 deletion Tests/SystemUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ public static Task WaitUntilConnectionIsClosed(string containerId)
public static async Task WaitUntilConnectionIsKilled(string containerId)
{
await WaitUntilConnectionIsOpen(containerId);
await WaitUntilAsync(async () => await s_httpApiClient.KillConnectionAsync(containerId) == 1);
await WaitUntilAsync(async () =>

await s_httpApiClient.KillConnectionAsync(containerId) == 1);
}

public static async Task WaitUntilConnectionIsKilledAndOpen(string containerId)
Expand Down