diff --git a/.github/workflows/wf_build-and-test.yaml b/.github/workflows/wf_build-and-test.yaml index 548d067a..76c588f5 100644 --- a/.github/workflows/wf_build-and-test.yaml +++ b/.github/workflows/wf_build-and-test.yaml @@ -35,9 +35,16 @@ jobs: run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1 - name: Test timeout-minutes: 25 + # --diag ${{ github.workspace}}\diag run: dotnet test ${{ github.workspace }}\Build.csproj --no-restore --no-build --logger 'console;verbosity=detailed' - name: Check for errors in RabbitMQ logs run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1 + # - name: Upload dotnet test diag logs (on failure) + # if: failure() + # uses: actions/upload-artifact@v4 + # with: + # name: dotnet-test-diag-win32 + # path: ${{ github.workspace }}/diag/ - name: Maybe upload RabbitMQ logs if: failure() uses: actions/upload-artifact@v4 @@ -57,12 +64,19 @@ jobs: run: ${{ github.workspace }}/.ci/ubuntu/cluster/gha-setup.sh - name: Test timeout-minutes: 25 + # --diag ${{ github.workspace}}/diag run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed" - name: Check for errors in RabbitMQ logs run: ${{ github.workspace}}/.ci/ubuntu/cluster/gha-logs.sh check - name: Collect RabbitMQ logs (on failure) if: failure() run: ${{ github.workspace}}/.ci/ubuntu/cluster/gha-logs.sh + # - name: Upload dotnet test diag logs (on failure) + # if: failure() + # uses: actions/upload-artifact@v4 + # with: + # name: dotnet-test-diag-ubuntu + # path: ${{ github.workspace }}/diag/ - name: Upload RabbitMQ logs (on failure) if: failure() uses: actions/upload-artifact@v4 diff --git a/.gitignore b/.gitignore index aad7dc79..1a4833da 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ InternalTrace* *.lock.json nunit-agent* *.pyc +test-diag.log test-output.log TestResults.xml TestResult.xml @@ -24,6 +25,7 @@ gensrc/ .ionide/ NuGet/ tmp/ +diag/ .vscode/ ################# diff --git a/RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs b/RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs index 2eebb0f8..d3ea59d3 100644 --- a/RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs +++ b/RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs @@ -47,7 +47,7 @@ protected void ThrowIfClosed() case State.Closing: throw new AmqpNotOpenException("Resource is closing"); case State.Reconnecting: - throw new AmqpNotOpenException("Resource is Reconnecting"); + throw new AmqpNotOpenException("Resource is reconnecting"); case State.Open: break; default: diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs index f784059d..4da109bf 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs @@ -59,7 +59,7 @@ public override async Task OpenAsync() Attach attach = Utils.CreateAttach(_configuration.Address, DeliveryMode.AtLeastOnce, _id, _configuration.Filters); - void onAttached(ILink argLink, Attach argAttach) + void OnAttached(ILink argLink, Attach argAttach) { if (argLink is ReceiverLink link) { @@ -69,30 +69,25 @@ void onAttached(ILink argLink, Attach argAttach) { // TODO create "internal bug" exception type? var ex = new InvalidOperationException( - "invalid link in onAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); + "invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); attachCompletedTcs.SetException(ex); } } - ReceiverLink? tmpReceiverLink = null; - Task receiverLinkTask = Task.Run(async () => - { - Session session = await _amqpConnection._nativePubSubSessions.GetOrCreateSessionAsync() - .ConfigureAwait(false); - tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, onAttached); - }); + Session session = await _amqpConnection._nativePubSubSessions.GetOrCreateSessionAsync() + .ConfigureAwait(false); - // TODO configurable timeout - TimeSpan waitSpan = TimeSpan.FromSeconds(5); + var tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, OnAttached); + // TODO configurable timeout + var waitSpan = TimeSpan.FromSeconds(5); _receiverLink = await attachCompletedTcs.Task.WaitAsync(waitSpan) .ConfigureAwait(false); - await receiverLinkTask.WaitAsync(waitSpan) - .ConfigureAwait(false); - - System.Diagnostics.Debug.Assert(tmpReceiverLink != null); - System.Diagnostics.Debug.Assert(object.ReferenceEquals(_receiverLink, tmpReceiverLink)); + if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink)) + { + // TODO log this case? + } if (_receiverLink is null) { diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs b/RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs index 4c694447..d05af221 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs @@ -46,14 +46,14 @@ public async Task CreateConnectionAsync(IConnectionSettings connect return c; } - public async Task CreateConnectionAsync() + public Task CreateConnectionAsync() { - if (ConnectionSettings != null) + if (ConnectionSettings is null) { - return await CreateConnectionAsync(ConnectionSettings).ConfigureAwait(false); + throw new ConnectionException("Connection settings are not set"); } - throw new ConnectionException("Connection settings are not set"); + return CreateConnectionAsync(ConnectionSettings); } public ReadOnlyCollection GetConnections() => diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs b/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs index e68c9b27..dc727e36 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs @@ -4,7 +4,6 @@ using System; using System.Collections.Concurrent; -using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -270,24 +269,38 @@ private async Task EnsureReceiverLinkAsync() new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), }, }; - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _receiverLink = new ReceiverLink( + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var tmpReceiverLink = new ReceiverLink( _managementSession, LinkPairName, receiveAttach, (ILink link, Attach attach) => { - Debug.Assert(Object.ReferenceEquals(_receiverLink, link)); - tcs.SetResult(true); + if (link is ReceiverLink receiverLink) + { + tcs.SetResult(receiverLink); + } + else + { + // TODO create "internal bug" exception type? + var ex = new InvalidOperationException( + "invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); + tcs.SetException(ex); + } }); - await tcs.Task + _receiverLink = await tcs.Task .ConfigureAwait(false); + if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink)) + { + // TODO log this case? + } + // TODO // using a credit of 1 can result in AmqpExceptions in ProcessResponses _receiverLink.SetCredit(100); } } - private Task EnsureSenderLinkAsync() + private async Task EnsureSenderLinkAsync() { if (_senderLink == null || _senderLink.IsClosed) { @@ -315,18 +328,30 @@ private Task EnsureSenderLinkAsync() }, }; - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _senderLink = new SenderLink( + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var tmpSenderLink = new SenderLink( _managementSession, LinkPairName, senderAttach, (ILink link, Attach attach) => { - Debug.Assert(Object.ReferenceEquals(_senderLink, link)); - tcs.SetResult(true); + if (link is SenderLink senderLink) + { + tcs.SetResult(senderLink); + } + else + { + // TODO create "internal bug" exception type? + var ex = new InvalidOperationException( + "invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); + tcs.SetException(ex); + } }); - return tcs.Task; - } - else - { - return Task.CompletedTask; + + _senderLink = await tcs.Task + .ConfigureAwait(false); + + if (false == Object.ReferenceEquals(_senderLink, tmpSenderLink)) + { + // TODO log this case? + } } } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs index f815f743..8a8cfdfa 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs @@ -38,7 +38,7 @@ public override async Task OpenAsync() Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id); - void onAttached(ILink argLink, Attach argAttach) + void OnAttached(ILink argLink, Attach argAttach) { if (argLink is SenderLink link) { @@ -53,25 +53,19 @@ void onAttached(ILink argLink, Attach argAttach) } } - SenderLink? tmpSenderLink = null; - Task senderLinkTask = Task.Run(async () => - { - Session session = await _connection._nativePubSubSessions.GetOrCreateSessionAsync() - .ConfigureAwait(false); - tmpSenderLink = new SenderLink(session, _id.ToString(), attach, onAttached); - }); + Session session = await _connection._nativePubSubSessions.GetOrCreateSessionAsync() + .ConfigureAwait(false); + var tmpSenderLink = new SenderLink(session, _id.ToString(), attach, OnAttached); // TODO configurable timeout - TimeSpan waitSpan = TimeSpan.FromSeconds(5); - + var waitSpan = TimeSpan.FromSeconds(5); _senderLink = await attachCompletedTcs.Task.WaitAsync(waitSpan) .ConfigureAwait(false); - await senderLinkTask.WaitAsync(waitSpan) - .ConfigureAwait(false); - - System.Diagnostics.Debug.Assert(tmpSenderLink != null); - System.Diagnostics.Debug.Assert(Object.ReferenceEquals(_senderLink, tmpSenderLink)); + if (false == Object.ReferenceEquals(_senderLink, tmpSenderLink)) + { + // TODO log this case? + } if (_senderLink is null) { @@ -125,10 +119,13 @@ public async Task PublishAsync(IMessage message, CancellationToke void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state) { - System.Diagnostics.Debug.Assert(object.ReferenceEquals(this, state)); - System.Diagnostics.Debug.Assert(object.ReferenceEquals(_senderLink, sender)); // Note: sometimes `message` is null 🤔 - // System.Diagnostics.Debug.Assert(Object.ReferenceEquals(nativeMessage, message)); + System.Diagnostics.Debug.Assert(Object.ReferenceEquals(this, state)); + + if (false == Object.ReferenceEquals(_senderLink, sender)) + { + // TODO log this case? + } PublishOutcome publishOutcome; switch (outcome) diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs b/RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs index b245b883..78c1a393 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs @@ -41,8 +41,8 @@ void OnBegin(ISession session, Begin peerBegin) } rv = new Session(_amqpConnection.NativeConnection, GetDefaultBegin(), OnBegin); + // TODO cancellation token ISession awaitedSession = await sessionBeginTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); - System.Diagnostics.Debug.Assert(Object.ReferenceEquals(rv, awaitedSession)); _sessions.Add(rv); } diff --git a/Tests/ConnectionRecoveryTests.cs b/Tests/ConnectionRecoveryTests.cs index 984353eb..be216e76 100644 --- a/Tests/ConnectionRecoveryTests.cs +++ b/Tests/ConnectionRecoveryTests.cs @@ -8,12 +8,11 @@ using System.Threading.Tasks; using RabbitMQ.AMQP.Client; using RabbitMQ.AMQP.Client.Impl; +using Xunit; using Xunit.Abstractions; namespace Tests; -using Xunit; - internal class FakeBackOffDelayPolicyDisabled : IBackOffDelayPolicy { public int Delay() @@ -44,7 +43,8 @@ public void Reset() public int CurrentAttempt => 1; } -public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) +public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper) + : IntegrationTest(testOutputHelper, setupConnectionAndManagement: false) { /// /// The normal close the status should be correct and error null @@ -57,10 +57,11 @@ public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper) : Integ [InlineData(false)] public async Task NormalCloseTheStatusShouldBeCorrectAndErrorNull(bool activeRecovery) { - string localContainerId = $"{_containerId}_normal-close-connection-name"; + Assert.Null(_connection); + Assert.Null(_management); IConnection connection = await AmqpConnection.CreateAsync( - ConnectionSettingBuilder.Create().ContainerId(localContainerId).RecoveryConfiguration( + ConnectionSettingBuilder.Create().ContainerId(_containerId).RecoveryConfiguration( RecoveryConfiguration.Create().Activated(activeRecovery).Topology(false)).Build()); TaskCompletionSource connectionClosedStateTcs = CreateTaskCompletionSource(); @@ -104,11 +105,11 @@ public async Task NormalCloseTheStatusShouldBeCorrectAndErrorNull(bool activeRec [Fact] public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull() { - - string localContainerId = $"{_containerId}_unexpected-close-connection-name"; + Assert.Null(_connection); + Assert.Null(_management); IConnection connection = await AmqpConnection.CreateAsync( - ConnectionSettingBuilder.Create().ContainerId(localContainerId).RecoveryConfiguration( + ConnectionSettingBuilder.Create().ContainerId(_containerId).RecoveryConfiguration( RecoveryConfiguration.Create().Activated(true).Topology(false) .BackOffDelayPolicy(new FakeFastBackOffDelay())).Build()); @@ -144,7 +145,7 @@ public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull() }; Assert.Equal(State.Open, connection.State); - await SystemUtils.WaitUntilConnectionIsKilled(localContainerId); + await SystemUtils.WaitUntilConnectionIsKilled(_containerId); await listErrorCountGreaterThanOrEqualToTwoTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.Equal(State.Open, listFromStatus[0]); @@ -177,10 +178,11 @@ public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull() [Fact] public async Task OverrideTheBackOffWithBackOffDisabled() { + Assert.Null(_connection); + Assert.Null(_management); - string localContainerId = $"{_containerId}_override-backoff-disabled-connection-name"; IConnection connection = await AmqpConnection.CreateAsync( - ConnectionSettingBuilder.Create().ContainerId(localContainerId).RecoveryConfiguration( + ConnectionSettingBuilder.Create().ContainerId(_containerId).RecoveryConfiguration( RecoveryConfiguration.Create().Activated(true).Topology(false).BackOffDelayPolicy( new FakeBackOffDelayPolicyDisabled())).Build()); @@ -211,7 +213,7 @@ public async Task OverrideTheBackOffWithBackOffDisabled() }; Assert.Equal(State.Open, connection.State); - await SystemUtils.WaitUntilConnectionIsKilled(localContainerId); + await SystemUtils.WaitUntilConnectionIsKilled(_containerId); await listFromStatusCountGreaterOrEqualToTwo.Task.WaitAsync(TimeSpan.FromSeconds(5)); @@ -239,14 +241,16 @@ public async Task OverrideTheBackOffWithBackOffDisabled() [Fact] public async Task RecoveryTopologyShouldRecoverTheTempQueues() { - string localContainerId = $"{_containerId}_temp-queue-should-recover-connection-name"; + Assert.Null(_connection); + Assert.Null(_management); + string queueName = $"temp-queue-should-recover-{true}"; - var connection = await AmqpConnection.CreateAsync( + IConnection connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(true)) - .ContainerId(localContainerId) + .ContainerId(_containerId) .Build()); TaskCompletionSource twoRecoveryEventsSeenTcs = CreateTaskCompletionSource(); int recoveryEvents = 0; @@ -262,7 +266,7 @@ public async Task RecoveryTopologyShouldRecoverTheTempQueues() await management.Queue().Name(queueName).AutoDelete(true).Exclusive(true).DeclareAsync(); Assert.Equal(1, topologyListener.QueueCount()); - await SystemUtils.WaitUntilConnectionIsKilled(localContainerId); + await SystemUtils.WaitUntilConnectionIsKilled(_containerId); await twoRecoveryEventsSeenTcs.Task.WaitAsync(TimeSpan.FromSeconds(10)); await SystemUtils.WaitUntilFuncAsync(() => recoveryEvents == 2); @@ -285,14 +289,16 @@ public async Task RecoveryTopologyShouldRecoverTheTempQueues() [Fact] public async Task RecoveryTopologyShouldNotRecoverTheTempQueues() { + Assert.Null(_connection); + Assert.Null(_management); + string queueName = $"temp-queue-should-recover-{false}"; - string localContainerId = $"{_containerId}_temp-queue-should-not-recover-connection-name"; - var connection = await AmqpConnection.CreateAsync( + IConnection connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(false)) - .ContainerId(localContainerId) + .ContainerId(_containerId) .Build()); TaskCompletionSource oneRecoveryEventSeenTcs = CreateTaskCompletionSource(); int recoveryEvents = 0; @@ -308,7 +314,7 @@ public async Task RecoveryTopologyShouldNotRecoverTheTempQueues() await management.Queue().Name(queueName).AutoDelete(true).Exclusive(true).DeclareAsync(); Assert.Equal(1, topologyListener.QueueCount()); - await SystemUtils.WaitUntilConnectionIsKilled(localContainerId); + await SystemUtils.WaitUntilConnectionIsKilled(_containerId); await oneRecoveryEventSeenTcs.Task.WaitAsync(TimeSpan.FromSeconds(10)); await SystemUtils.WaitUntilQueueDeletedAsync(queueName); @@ -322,14 +328,16 @@ public async Task RecoveryTopologyShouldNotRecoverTheTempQueues() [InlineData(false)] public async Task RecoveryTopologyShouldRecoverExchanges(bool topologyEnabled) { + Assert.Null(_connection); + Assert.Null(_management); + const string exchangeName = "exchange-should-recover"; - string localContainerId = $"{_containerId}_exchange-should-recover-connection-name"; IConnection connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(topologyEnabled)) - .ContainerId(localContainerId) + .ContainerId(_containerId) .Build()); TaskCompletionSource twoRecoveryEventsSeenTcs = CreateTaskCompletionSource(); int recoveryEvents = 0; @@ -351,7 +359,7 @@ public async Task RecoveryTopologyShouldRecoverExchanges(bool topologyEnabled) // the exchange is recovered. await SystemUtils.DeleteExchangeAsync("exchange-should-recover"); - await SystemUtils.WaitUntilConnectionIsKilled(localContainerId); + await SystemUtils.WaitUntilConnectionIsKilled(_containerId); await twoRecoveryEventsSeenTcs.Task.WaitAsync(TimeSpan.FromSeconds(10)); @@ -379,13 +387,15 @@ public async Task RecoveryTopologyShouldRecoverExchanges(bool topologyEnabled) [InlineData(false)] public async Task RecoveryTopologyShouldRecoverBindings(bool topologyEnabled) { - string localContainerId = $"{_containerId}_binding-should-recover-connection-name"; - var connection = await AmqpConnection.CreateAsync( + Assert.Null(_connection); + Assert.Null(_management); + + IConnection connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(topologyEnabled)) - .ContainerId(localContainerId) + .ContainerId(_containerId) .Build()); TaskCompletionSource twoRecoveryEventsSeenTcs = CreateTaskCompletionSource(); int recoveryEvents = 0; @@ -398,14 +408,14 @@ public async Task RecoveryTopologyShouldRecoverBindings(bool topologyEnabled) }; IManagement management = connection.Management(); ITopologyListener topologyListener = ((IManagementTopology)management).TopologyListener(); - var exSpec = management.Exchange().Name("exchange-should-recover-binding").AutoDelete(true) + IExchangeSpecification exSpec = management.Exchange().Name("exchange-should-recover-binding").AutoDelete(true) .Type(ExchangeType.DIRECT); await exSpec.DeclareAsync(); Assert.Equal(1, topologyListener.ExchangeCount()); - var queueSpec = management.Queue().Name("queue-should-recover-binding").AutoDelete(true).Exclusive(true); + IQueueSpecification queueSpec = management.Queue().Name("queue-should-recover-binding").AutoDelete(true).Exclusive(true); await queueSpec.DeclareAsync(); Assert.Equal(1, topologyListener.QueueCount()); - var bindingSpec = + IBindingSpecification bindingSpec = management.Binding().SourceExchange(exSpec).DestinationQueue(queueSpec).Key("key"); await bindingSpec.BindAsync(); Assert.Equal(1, topologyListener.BindingCount()); @@ -415,7 +425,7 @@ public async Task RecoveryTopologyShouldRecoverBindings(bool topologyEnabled) await SystemUtils.DeleteExchangeAsync("exchange-should-recover-binding"); // The queue will be deleted due of the auto-delete flag - await SystemUtils.WaitUntilConnectionIsKilled(localContainerId); + await SystemUtils.WaitUntilConnectionIsKilled(_containerId); await twoRecoveryEventsSeenTcs.Task.WaitAsync(TimeSpan.FromSeconds(10)); if (topologyEnabled) @@ -458,28 +468,29 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync("exchan [Fact] public async Task RemoveAQueueShouldRemoveTheBindings() { - string localContainerId = $"{_containerId}_remove-queue-should-remove-binding-connection-name"; + Assert.Null(_connection); + Assert.Null(_management); IConnection connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(true)) - .ContainerId(localContainerId) + .ContainerId(_containerId) .Build()); IManagement management = connection.Management(); ITopologyListener topologyListener = ((IManagementTopology)management).TopologyListener(); - var exSpec = management.Exchange().Name("e-remove-a-should-remove-binding") + IExchangeSpecification exSpec = management.Exchange().Name("e-remove-a-should-remove-binding") .Type(ExchangeType.DIRECT); await exSpec.DeclareAsync(); - var queueSpec = management.Queue().Name("q-remove-a-should-remove-binding") + IQueueSpecification queueSpec = management.Queue().Name("q-remove-a-should-remove-binding") .AutoDelete(true).Exclusive(true); await queueSpec.DeclareAsync(); - var queueSpecWontDeleted = management.Queue().Name("q-remove-a-should-remove-binding-wont-delete") + IQueueSpecification queueSpecWontDeleted = management.Queue().Name("q-remove-a-should-remove-binding-wont-delete") .AutoDelete(true).Exclusive(true); await queueSpecWontDeleted.DeclareAsync(); @@ -519,28 +530,30 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync("e-remo [Fact] public async Task RemoveAnExchangeShouldRemoveTheBindings() { - string localContainerId = $"{_containerId}_remove-exchange-should-remove-binding-connection-name"; - var connection = await AmqpConnection.CreateAsync( + Assert.Null(_connection); + Assert.Null(_management); + + IConnection connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(true)) - .ContainerId(localContainerId) + .ContainerId(_containerId) .Build()); IManagement management = connection.Management(); ITopologyListener topologyListener = ((IManagementTopology)management).TopologyListener(); - var exSpec = management.Exchange().Name("e-remove-exchange-should-remove-binding") + IExchangeSpecification exSpec = management.Exchange().Name("e-remove-exchange-should-remove-binding") .Type(ExchangeType.DIRECT); await exSpec.DeclareAsync(); - var exSpecWontDeleted = management.Exchange().Name("e-remove-exchange-should-remove-binding-wont-delete") + IExchangeSpecification exSpecWontDeleted = management.Exchange().Name("e-remove-exchange-should-remove-binding-wont-delete") .Type(ExchangeType.DIRECT); await exSpecWontDeleted.DeclareAsync(); - var queueSpec = management.Queue().Name("q-remove-exchange-should-remove-binding") + IQueueSpecification queueSpec = management.Queue().Name("q-remove-exchange-should-remove-binding") .AutoDelete(true).Exclusive(true); await queueSpec.DeclareAsync(); @@ -584,24 +597,26 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync( [Fact] public async Task RemoveAnExchangeBoundToAnotherExchangeShouldRemoveTheBindings() { - string localContainerId = $"{_containerId}_remove-exchange-bound-to-another-exchange-should-remove-binding-connection-name"; - var connection = await AmqpConnection.CreateAsync( + Assert.Null(_connection); + Assert.Null(_management); + + IConnection connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(true)) - .ContainerId(localContainerId) + .ContainerId(_containerId) .Build()); IManagement management = connection.Management(); ITopologyListener topologyListener = ((IManagementTopology)management).TopologyListener(); - var exSpec = management.Exchange().Name("e-remove-exchange-bound-to-another-exchange-should-remove-binding") + IExchangeSpecification exSpec = management.Exchange().Name("e-remove-exchange-bound-to-another-exchange-should-remove-binding") .Type(ExchangeType.DIRECT); await exSpec.DeclareAsync(); - var exSpecDestination = management.Exchange() + IExchangeSpecification exSpecDestination = management.Exchange() .Name("e-remove-exchange-bound-to-another-exchange-should-remove-binding-destination") .Type(ExchangeType.DIRECT); diff --git a/Tests/Consumer/ConsumerPauseTests.cs b/Tests/Consumer/ConsumerPauseTests.cs index 6bc6d3c3..23a0316a 100644 --- a/Tests/Consumer/ConsumerPauseTests.cs +++ b/Tests/Consumer/ConsumerPauseTests.cs @@ -13,11 +13,15 @@ namespace Tests.Consumer; -public class ConsumerPauseTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper), IDisposable +public class ConsumerPauseTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) { private readonly HttpApiClient _httpApiClient = new(); - public void Dispose() => _httpApiClient.Dispose(); + public override Task DisposeAsync() + { + _httpApiClient.Dispose(); + return base.DisposeAsync(); + } [Fact] public async Task PauseShouldStopMessageArrivalUnpauseShouldResumeIt() diff --git a/Tests/IntegrationTest.cs b/Tests/IntegrationTest.cs index 7baa1339..7d39ae87 100644 --- a/Tests/IntegrationTest.cs +++ b/Tests/IntegrationTest.cs @@ -44,14 +44,18 @@ public IntegrationTest(ITestOutputHelper testOutputHelper, { _testOutputHelper = testOutputHelper; _setupConnectionAndManagement = setupConnectionAndManagement; - _queueName = $"{_testDisplayName}-queue-{Now}"; - _exchangeName = $"{_testDisplayName}-exchange-{Now}"; _testDisplayName = InitTestDisplayName(); + + _queueName = $"{_testDisplayName}-queue-{Now}"; + _exchangeName = $"{_testDisplayName}-exchange-{Now}"; _containerId = $"{_testDisplayName}:{Now}"; - // TODO only if verbose - // testOutputHelper.WriteLine($"Running test: {_testDisplayName}"); + if (SystemUtils.IsVerbose) + { + _testOutputHelper.WriteLine("{0} [DEBUG] [START] {1}", DateTime.Now, _testDisplayName); + } + _connectionSettingBuilder = InitConnectionSettingsBuilder(); } @@ -80,8 +84,11 @@ public virtual async Task InitializeAsync() public virtual async Task DisposeAsync() { - // TODO only if verbose - // _testOutputHelper.WriteLine($"Disposing test: {_testDisplayName}"); + if (SystemUtils.IsVerbose) + { + _testOutputHelper.WriteLine("{0} [DEBUG] [START DISPOSE] {1}", DateTime.Now, _testDisplayName); + } + if (_management is not null && _management.State == State.Open) { try @@ -114,6 +121,11 @@ public virtual async Task DisposeAsync() Assert.Equal(State.Closed, _connection.State); _connection.Dispose(); } + + if (SystemUtils.IsVerbose) + { + _testOutputHelper.WriteLine("{0} [DEBUG] [END DISPOSE] {1}", DateTime.Now, _testDisplayName); + } } protected static Random S_Random diff --git a/Tests/Recovery/CustomPublisherConsumerRecoveryTests.cs b/Tests/Recovery/CustomPublisherConsumerRecoveryTests.cs index b31b9956..43691db7 100644 --- a/Tests/Recovery/CustomPublisherConsumerRecoveryTests.cs +++ b/Tests/Recovery/CustomPublisherConsumerRecoveryTests.cs @@ -12,7 +12,8 @@ namespace Tests.Recovery; -public class CustomPublisherConsumerRecoveryTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper, false) +public class CustomPublisherConsumerRecoveryTests(ITestOutputHelper testOutputHelper) + : IntegrationTest(testOutputHelper, setupConnectionAndManagement: false) { /// /// The consumer and the publisher should not restart if the recovery is disabled diff --git a/Tests/Rpc/RecoveryRPCTests.cs b/Tests/Rpc/RecoveryRPCTests.cs index d84ecc84..455dcafa 100644 --- a/Tests/Rpc/RecoveryRPCTests.cs +++ b/Tests/Rpc/RecoveryRPCTests.cs @@ -1,3 +1,7 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + using System; using System.Threading; using System.Threading.Tasks; @@ -9,10 +13,14 @@ namespace Tests.Rpc { public class RecoveryRpcTests(ITestOutputHelper testOutputHelper) + : IntegrationTest(testOutputHelper, setupConnectionAndManagement: false) { [Fact] public async Task RpcServerAndClientShouldRecoverAfterKillConnection() { + Assert.Null(_connection); + Assert.Null(_management); + string containerId = $"rpc-server-client-recovery-{DateTime.Now}"; IConnection connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create() .ContainerId(containerId).RecoveryConfiguration(RecoveryConfiguration.Create().Topology(true)).Build()); @@ -25,11 +33,11 @@ public async Task RpcServerAndClientShouldRecoverAfterKillConnection() int messagesReceived = 0; IRpcServer rpcServer = await connection.RpcServerBuilder() .RequestQueue(rpcRequestQueueName) - .Handler(async (context, message) => + .Handler((context, message) => { Interlocked.Increment(ref messagesReceived); var reply = context.Message("pong"); - return await Task.FromResult(reply); + return Task.FromResult(reply); }) .BuildAsync(); @@ -54,9 +62,13 @@ public async Task RpcServerAndClientShouldRecoverAfterKillConnection() messagesConfirmed++; Assert.Equal("pong", response.Body()); } + catch (AmqpNotOpenException) + { + await Task.Delay(700); + } catch (Exception e) { - testOutputHelper.WriteLine($"Error sending message: {e.Message}"); + _testOutputHelper.WriteLine($"[ERROR] unexpected exception while sending message: {e.Message}"); await Task.Delay(700); } diff --git a/Tests/SystemUtils.cs b/Tests/SystemUtils.cs index 0c1a1361..2256be6f 100644 --- a/Tests/SystemUtils.cs +++ b/Tests/SystemUtils.cs @@ -11,12 +11,14 @@ namespace Tests; +// TODO this could be merged into IntegrationTest public static class SystemUtils { const string DefaultRabbitMqHost = "localhost"; private static readonly HttpApiClient s_httpApiClient = new(); private static readonly string s_rabbitMqHost = InitRabbitMqHost(); private static readonly bool s_isRunningInCI = InitIsRunningInCI(); + private static readonly bool s_isVerbose = InitIsVerbose(); private static readonly bool s_isCluster; private static readonly ushort s_clusterSize; private static readonly TimeSpan s_initialDelaySpan = TimeSpan.FromMilliseconds(100); @@ -25,6 +27,7 @@ public static class SystemUtils public static string RabbitMqHost => s_rabbitMqHost; public static bool IsRunningInCI => s_isRunningInCI; + public static bool IsVerbose => s_isVerbose; public static bool IsCluster => s_isCluster; public static ushort ClusterSize => s_clusterSize; @@ -307,4 +310,14 @@ private static bool InitIsRunningInCI() return false; } + + private static bool InitIsVerbose() + { + if (bool.TryParse(Environment.GetEnvironmentVariable("RABBITMQ_CLIENT_TESTS_VERBOSE"), out bool isVerbose)) + { + return isVerbose; + } + + return false; + } } diff --git a/build.ps1 b/build.ps1 index 49f104a0..047f7c0a 100755 --- a/build.ps1 +++ b/build.ps1 @@ -1,8 +1,12 @@ [CmdletBinding(PositionalBinding=$false)] param( - [switch]$RunTests + [switch]$RunTests, + [switch]$RunTestsUntilFailure ) +New-Variable -Name verbose -Option Constant ` + -Value ($VerbosePreference -ne 'SilentlyContinue') + $ErrorActionPreference = 'Stop' Set-StrictMode -Version Latest $PSNativeCommandUseErrorActionPreference = $true @@ -18,17 +22,35 @@ New-Variable -Name build_csproj_file -Option Constant ` -Value (Join-Path -Path $PSScriptRoot -ChildPath 'Build.csproj') dotnet build $build_csproj_file -Write-Host "Done building." -ForegroundColor "Green" -if ($RunTests) { - Write-Host "Running tests: Build.csproj traversal (all frameworks)" -ForegroundColor "Magenta" - dotnet test $build_csproj_file --no-build --logger 'console;verbosity=detailed' - if ($LastExitCode -ne 0) +Write-Host "[INFO] done building." -ForegroundColor "Green" + +$ErrorActionPreference = 'Continue' +$PSNativeCommandUseErrorActionPreference = $true + +if ($RunTests -or $RunTestsUntilFailure) +{ + Do { - Write-Host "Error with tests, aborting build." -Foreground "Red" - Exit 1 - } - Write-Host "Tests passed!" -ForegroundColor "Green" + Write-Host "Running tests: Build.csproj traversal (all frameworks)" -ForegroundColor "Magenta" + if ($verbose) + { + # dotnet test $build_csproj_file --diag $(Join-Path -Path $PSScriptRoot -ChildPath diag.log) --environment=RABBITMQ_CLIENT_TESTS_VERBOSE=true --no-build --logger 'console;verbosity=detailed' + dotnet test $build_csproj_file --diag $(Join-Path -Path $PSScriptRoot -ChildPath diag | Join-Path -ChildPath diag.log) --no-build --logger 'console;verbosity=detailed' + } + else + { + dotnet test $build_csproj_file --no-build --logger 'console;verbosity=detailed' + } + if ($LASTEXITCODE -ne 0) + { + Write-Host "[ERROR] FYI, tests errored" -Foreground "Red" + # Write-Host "[ERROR] tests errored, exiting" -Foreground "Red" + # Exit 1 + } + else + { + Write-Host "[INFO] tests passed" -ForegroundColor "Green" + } + } While ($RunTestsUntilFailure) } - -Write-Host "Done."