From 801f4665b3059b9303b39f3079180972267e056d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 15 Oct 2024 10:27:25 +0200 Subject: [PATCH 1/8] ci fail Signed-off-by: Gabriele Santomaggio --- .github/workflows/wf_build-and-test.yaml | 78 ++++++++++++------------ Tests/Rpc/RpcServerTests.cs | 12 ++-- 2 files changed, 45 insertions(+), 45 deletions(-) diff --git a/.github/workflows/wf_build-and-test.yaml b/.github/workflows/wf_build-and-test.yaml index dd7ed263..33ed2782 100644 --- a/.github/workflows/wf_build-and-test.yaml +++ b/.github/workflows/wf_build-and-test.yaml @@ -5,45 +5,45 @@ on: jobs: - build-win32: - runs-on: windows-latest - # https://github.com/NuGet/Home/issues/11548 - env: - NUGET_CERT_REVOCATION_MODE: offline - steps: - - uses: actions/checkout@v4 - - uses: actions/cache@v4 - with: - # Note: the cache path is relative to the workspace directory - # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action - path: ~/installers - key: ${{ runner.os }}-v0-${{ hashFiles('.ci/windows/versions.json') }} - - uses: actions/cache@v4 - with: - path: | - ~/.nuget/packages - ~/AppData/Local/NuGet/v3-cache - key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj') }} - restore-keys: | - ${{ runner.os }}-v0-nuget- - - name: Build (Debug) - run: dotnet build ${{ github.workspace }}\Build.csproj - - name: Verify - run: dotnet format ${{ github.workspace }}\rabbitmq-amqp-dotnet-client.sln --no-restore --verify-no-changes - - name: Install and Start RabbitMQ - id: install-start-rabbitmq - run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1 - - name: Test - timeout-minutes: 25 - run: dotnet test ${{ github.workspace }}\Build.csproj --no-restore --no-build --logger 'console;verbosity=detailed' - - name: Check for errors in RabbitMQ logs - run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1 - - name: Maybe upload RabbitMQ logs - if: failure() - uses: actions/upload-artifact@v4 - with: - name: rabbitmq-logs-integration-win32 - path: ~/AppData/Roaming/RabbitMQ/log/ + # build-win32: + # runs-on: windows-latest + # # https://github.com/NuGet/Home/issues/11548 + # env: + # NUGET_CERT_REVOCATION_MODE: offline + # steps: + # - uses: actions/checkout@v4 + # - uses: actions/cache@v4 + # with: + # # Note: the cache path is relative to the workspace directory + # # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action + # path: ~/installers + # key: ${{ runner.os }}-v0-${{ hashFiles('.ci/windows/versions.json') }} + # - uses: actions/cache@v4 + # with: + # path: | + # ~/.nuget/packages + # ~/AppData/Local/NuGet/v3-cache + # key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj') }} + # restore-keys: | + # ${{ runner.os }}-v0-nuget- + # - name: Build (Debug) + # run: dotnet build ${{ github.workspace }}\Build.csproj + # - name: Verify + # run: dotnet format ${{ github.workspace }}\rabbitmq-amqp-dotnet-client.sln --no-restore --verify-no-changes + # - name: Install and Start RabbitMQ + # id: install-start-rabbitmq + # run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1 + # - name: Test + # timeout-minutes: 25 + # run: dotnet test ${{ github.workspace }}\Build.csproj --no-restore --no-build --logger 'console;verbosity=detailed' + # - name: Check for errors in RabbitMQ logs + # run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1 + # - name: Maybe upload RabbitMQ logs + # if: failure() + # uses: actions/upload-artifact@v4 + # with: + # name: rabbitmq-logs-integration-win32 + # path: ~/AppData/Roaming/RabbitMQ/log/ build-ubuntu: runs-on: ubuntu-latest steps: diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index 8fee5c32..6ae44022 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -11,7 +11,7 @@ namespace Tests.Rpc { public class RpcServerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) { - [Fact] + [Fact (Skip = "ci fail")] public async Task MockRpcServerPingPong() { Assert.NotNull(_connection); @@ -33,7 +33,7 @@ public async Task MockRpcServerPingPong() await rpcServer.CloseAsync(); } - [Fact] + [Fact (Skip = "ci fail")] public async Task RpcServerValidateStateChange() { Assert.NotNull(_connection); @@ -67,7 +67,7 @@ public async Task RpcServerValidateStateChange() /// /// Simulate RPC communication with a publisher /// - [Fact] + [Fact (Skip = "ci fail")] public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() { Assert.NotNull(_connection); @@ -113,7 +113,7 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() /// In this test the client has to create a reply queue since is not provided by the user /// with the ReplyToQueue method /// - [Fact] + [Fact (Skip = "ci fail")] public async Task RpcServerClientPingPongWithDefault() { Assert.NotNull(_connection); @@ -143,7 +143,7 @@ public async Task RpcServerClientPingPongWithDefault() /// /// In this test the client has to use the ReplyToQueue provided by the user /// - [Fact] + [Fact (Skip = "ci fail")] public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdSupplier() { Assert.NotNull(_connection); @@ -195,7 +195,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS /// /// - [Fact] + [Fact (Skip = "ci fail")] public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor() { Assert.NotNull(_connection); From efdef212e4a236d2f52202c54cb05cc426d1a44e Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 15 Oct 2024 10:29:01 +0200 Subject: [PATCH 2/8] ci fail Signed-off-by: Gabriele Santomaggio --- Tests/Rpc/RpcServerTests.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index 6ae44022..708cf5c0 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -11,7 +11,7 @@ namespace Tests.Rpc { public class RpcServerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) { - [Fact (Skip = "ci fail")] + [Fact(Skip = "ci fail")] public async Task MockRpcServerPingPong() { Assert.NotNull(_connection); @@ -33,7 +33,7 @@ public async Task MockRpcServerPingPong() await rpcServer.CloseAsync(); } - [Fact (Skip = "ci fail")] + [Fact(Skip = "ci fail")] public async Task RpcServerValidateStateChange() { Assert.NotNull(_connection); @@ -67,7 +67,7 @@ public async Task RpcServerValidateStateChange() /// /// Simulate RPC communication with a publisher /// - [Fact (Skip = "ci fail")] + [Fact(Skip = "ci fail")] public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() { Assert.NotNull(_connection); @@ -113,7 +113,7 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() /// In this test the client has to create a reply queue since is not provided by the user /// with the ReplyToQueue method /// - [Fact (Skip = "ci fail")] + [Fact(Skip = "ci fail")] public async Task RpcServerClientPingPongWithDefault() { Assert.NotNull(_connection); @@ -143,7 +143,7 @@ public async Task RpcServerClientPingPongWithDefault() /// /// In this test the client has to use the ReplyToQueue provided by the user /// - [Fact (Skip = "ci fail")] + [Fact(Skip = "ci fail")] public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdSupplier() { Assert.NotNull(_connection); @@ -195,7 +195,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS /// /// - [Fact (Skip = "ci fail")] + [Fact(Skip = "ci fail")] public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor() { Assert.NotNull(_connection); From 5647f0f79f3ccda9d20712205f9bac3e131a893d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 15 Oct 2024 10:36:04 +0200 Subject: [PATCH 3/8] ci fail Signed-off-by: Gabriele Santomaggio --- Tests/Rpc/RpcServerTests.cs | 442 ++++++++++++++++++------------------ 1 file changed, 221 insertions(+), 221 deletions(-) diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index 708cf5c0..2e96a2cb 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -11,7 +11,7 @@ namespace Tests.Rpc { public class RpcServerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) { - [Fact(Skip = "ci fail")] + [Fact] public async Task MockRpcServerPingPong() { Assert.NotNull(_connection); @@ -33,225 +33,225 @@ public async Task MockRpcServerPingPong() await rpcServer.CloseAsync(); } - [Fact(Skip = "ci fail")] - public async Task RpcServerValidateStateChange() - { - Assert.NotNull(_connection); - Assert.NotNull(_management); - List<(State, State)> states = []; - await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync(); - TaskCompletionSource tsc = new(TaskCreationOptions.RunContinuationsAsynchronously); - IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => - { - var m = context.Message(request.Body()); - return Task.FromResult(m); - }).RequestQueue(_queueName).BuildAsync(); - rpcServer.ChangeState += (sender, fromState, toState, e) => - { - states.Add((fromState, toState)); - if (states.Count == 2) - { - tsc.SetResult(states.Count); - } - }; - Assert.NotNull(rpcServer); - await rpcServer.CloseAsync(); - int count = await tsc.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.Equal(2, count); - Assert.Equal(State.Open, states[0].Item1); - Assert.Equal(State.Closing, states[0].Item2); - Assert.Equal(State.Closing, states[1].Item1); - Assert.Equal(State.Closed, states[1].Item2); - } - - /// - /// Simulate RPC communication with a publisher - /// - [Fact(Skip = "ci fail")] - public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() - { - Assert.NotNull(_connection); - Assert.NotNull(_management); - string requestQueue = _queueName; - await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); - IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => - { - var reply = context.Message("pong"); - return Task.FromResult(reply); - }).RequestQueue(requestQueue).BuildAsync(); - - Assert.NotNull(rpcServer); - string queueReplyTo = $"queueReplyTo-{Now}"; - IQueueSpecification spec = _management.Queue(queueReplyTo).Exclusive(true).AutoDelete(true); - await spec.DeclareAsync(); - TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); - - IConsumer consumer = await _connection.ConsumerBuilder().Queue(queueReplyTo).MessageHandler( - async (context, message) => - { - await context.AcceptAsync(); - tcs.SetResult(message); - }).BuildAndStartAsync(); - - IPublisher publisher = await _connection.PublisherBuilder().Queue(requestQueue).BuildAsync(); - Assert.NotNull(publisher); - AddressBuilder addressBuilder = new(); - - IMessage message = new AmqpMessage("test").ReplyTo(addressBuilder.Queue(queueReplyTo).Address()); - PublishResult pr = await publisher.PublishAsync(message); - Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); - - IMessage m = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.Equal("pong", m.Body()); - - await rpcServer.CloseAsync(); - await consumer.CloseAsync(); - await publisher.CloseAsync(); - } - - /// - /// In this test the client has to create a reply queue since is not provided by the user - /// with the ReplyToQueue method - /// - [Fact(Skip = "ci fail")] - public async Task RpcServerClientPingPongWithDefault() - { - Assert.NotNull(_connection); - Assert.NotNull(_management); - string requestQueue = _queueName; - await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); - IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => - { - var reply = context.Message("pong"); - return Task.FromResult(reply); - }).RequestQueue(_queueName).BuildAsync(); - Assert.NotNull(rpcServer); - - IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() - .Queue(requestQueue) - .RpcClient() - .BuildAsync(); - - IMessage message = new AmqpMessage("ping"); - - IMessage response = await rpcClient.PublishAsync(message); - Assert.Equal("pong", response.Body()); - await rpcClient.CloseAsync(); - await rpcServer.CloseAsync(); - } - - /// - /// In this test the client has to use the ReplyToQueue provided by the user - /// - [Fact(Skip = "ci fail")] - public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdSupplier() - { - Assert.NotNull(_connection); - Assert.NotNull(_management); - string requestQueue = _queueName; - await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); - IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => - { - var reply = context.Message("pong"); - return Task.FromResult(reply); - }).RequestQueue(_queueName) - .BuildAsync(); - Assert.NotNull(rpcServer); - - // custom replyTo queue - IQueueInfo replyTo = - await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync(); - - // custom correlationId supplier - const string correlationId = "my-correlation-id"; - - IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() - .Queue(requestQueue) - .RpcClient() - .CorrelationIdSupplier(() => correlationId) - .CorrelationIdExtractor(message => message.CorrelationId()) - .ReplyToQueue(replyTo.Name()) - .BuildAsync(); - - IMessage message = new AmqpMessage("ping"); - - IMessage response = await rpcClient.PublishAsync(message); - Assert.Equal("pong", response.Body()); - Assert.Equal(correlationId, response.CorrelationId()); - await rpcClient.CloseAsync(); - await rpcServer.CloseAsync(); - } - - /// - /// This test combine all the features with the overriding of the request and response post processor - /// the correlation id supplier and the extraction of the correlationId. - /// Here the client uses the replyTo queue provided by the user and the correlationId supplier - /// the field "Subject" is used as correlationId - /// The server uses the field "GroupId" as correlationId - /// Both use the extraction correlationId to get the correlationId - /// - /// The fields "Subject" and "GroupId" are used ONLY for test. - /// You should not use these fields for this purpose. - /// - /// - - [Fact(Skip = "ci fail")] - public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor() - { - Assert.NotNull(_connection); - Assert.NotNull(_management); - string requestQueue = _queueName; - await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); - IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => - { - var reply = context.Message("pong"); - return Task.FromResult(reply); - }).RequestQueue(_queueName) - //come from the client - .CorrelationIdExtractor(message => message.Subject()) - // replace the correlation id location with GroupId - .ReplyPostProcessor((reply, replyCorrelationId) => reply.GroupId( - replyCorrelationId.ToString() ?? throw new InvalidOperationException())) - .BuildAsync(); - Assert.NotNull(rpcServer); - - IQueueInfo replyTo = - await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync(); - - // custom correlationId supplier - const string correlationId = "my-correlation-id"; - int correlationIdCounter = 0; - - IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() - .Queue(requestQueue) - .RpcClient() - .ReplyToQueue(replyTo.Name()) - // replace the correlation id creation with a custom function - .CorrelationIdSupplier(() => $"{correlationId}_{Interlocked.Increment(ref correlationIdCounter)}") - // The server will reply with the correlation id in the groupId - // This is only for testing. You should not use the groupId for this. - .CorrelationIdExtractor(message => message.GroupId()) - // The client will use Subject to store the correlation id - // this is only for testing. You should not use Subject for this. - .RequestPostProcessor((request, requestCorrelationId) - => request.ReplyTo(AddressBuilderHelper.AddressBuilder().Queue(replyTo.Name()).Address()) - .Subject(requestCorrelationId.ToString() ?? throw new InvalidOperationException())) - .BuildAsync(); - - IMessage message = new AmqpMessage("ping"); - - int i = 1; - while (i < 30) - { - IMessage response = await rpcClient.PublishAsync(message); - Assert.Equal("pong", response.Body()); - // the server replies with the correlation id in the GroupId field - Assert.Equal($"{correlationId}_{i}", response.GroupId()); - i++; - } - - await rpcClient.CloseAsync(); - await rpcServer.CloseAsync(); - } + // [Fact] + // public async Task RpcServerValidateStateChange() + // { + // Assert.NotNull(_connection); + // Assert.NotNull(_management); + // List<(State, State)> states = []; + // await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync(); + // TaskCompletionSource tsc = new(TaskCreationOptions.RunContinuationsAsynchronously); + // IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + // { + // var m = context.Message(request.Body()); + // return Task.FromResult(m); + // }).RequestQueue(_queueName).BuildAsync(); + // rpcServer.ChangeState += (sender, fromState, toState, e) => + // { + // states.Add((fromState, toState)); + // if (states.Count == 2) + // { + // tsc.SetResult(states.Count); + // } + // }; + // Assert.NotNull(rpcServer); + // await rpcServer.CloseAsync(); + // int count = await tsc.Task.WaitAsync(TimeSpan.FromSeconds(5)); + // Assert.Equal(2, count); + // Assert.Equal(State.Open, states[0].Item1); + // Assert.Equal(State.Closing, states[0].Item2); + // Assert.Equal(State.Closing, states[1].Item1); + // Assert.Equal(State.Closed, states[1].Item2); + // } + // + // /// + // /// Simulate RPC communication with a publisher + // /// + // [Fact] + // public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() + // { + // Assert.NotNull(_connection); + // Assert.NotNull(_management); + // string requestQueue = _queueName; + // await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + // IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + // { + // var reply = context.Message("pong"); + // return Task.FromResult(reply); + // }).RequestQueue(requestQueue).BuildAsync(); + // + // Assert.NotNull(rpcServer); + // string queueReplyTo = $"queueReplyTo-{Now}"; + // IQueueSpecification spec = _management.Queue(queueReplyTo).Exclusive(true).AutoDelete(true); + // await spec.DeclareAsync(); + // TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + // + // IConsumer consumer = await _connection.ConsumerBuilder().Queue(queueReplyTo).MessageHandler( + // async (context, message) => + // { + // await context.AcceptAsync(); + // tcs.SetResult(message); + // }).BuildAndStartAsync(); + // + // IPublisher publisher = await _connection.PublisherBuilder().Queue(requestQueue).BuildAsync(); + // Assert.NotNull(publisher); + // AddressBuilder addressBuilder = new(); + // + // IMessage message = new AmqpMessage("test").ReplyTo(addressBuilder.Queue(queueReplyTo).Address()); + // PublishResult pr = await publisher.PublishAsync(message); + // Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + // + // IMessage m = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + // Assert.Equal("pong", m.Body()); + // + // await rpcServer.CloseAsync(); + // await consumer.CloseAsync(); + // await publisher.CloseAsync(); + // } + // + // /// + // /// In this test the client has to create a reply queue since is not provided by the user + // /// with the ReplyToQueue method + // /// + // [Fact] + // public async Task RpcServerClientPingPongWithDefault() + // { + // Assert.NotNull(_connection); + // Assert.NotNull(_management); + // string requestQueue = _queueName; + // await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + // IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + // { + // var reply = context.Message("pong"); + // return Task.FromResult(reply); + // }).RequestQueue(_queueName).BuildAsync(); + // Assert.NotNull(rpcServer); + // + // IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + // .Queue(requestQueue) + // .RpcClient() + // .BuildAsync(); + // + // IMessage message = new AmqpMessage("ping"); + // + // IMessage response = await rpcClient.PublishAsync(message); + // Assert.Equal("pong", response.Body()); + // await rpcClient.CloseAsync(); + // await rpcServer.CloseAsync(); + // } + // + // /// + // /// In this test the client has to use the ReplyToQueue provided by the user + // /// + // [Fact] + // public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdSupplier() + // { + // Assert.NotNull(_connection); + // Assert.NotNull(_management); + // string requestQueue = _queueName; + // await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + // IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + // { + // var reply = context.Message("pong"); + // return Task.FromResult(reply); + // }).RequestQueue(_queueName) + // .BuildAsync(); + // Assert.NotNull(rpcServer); + // + // // custom replyTo queue + // IQueueInfo replyTo = + // await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync(); + // + // // custom correlationId supplier + // const string correlationId = "my-correlation-id"; + // + // IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + // .Queue(requestQueue) + // .RpcClient() + // .CorrelationIdSupplier(() => correlationId) + // .CorrelationIdExtractor(message => message.CorrelationId()) + // .ReplyToQueue(replyTo.Name()) + // .BuildAsync(); + // + // IMessage message = new AmqpMessage("ping"); + // + // IMessage response = await rpcClient.PublishAsync(message); + // Assert.Equal("pong", response.Body()); + // Assert.Equal(correlationId, response.CorrelationId()); + // await rpcClient.CloseAsync(); + // await rpcServer.CloseAsync(); + // } + // + // /// + // /// This test combine all the features with the overriding of the request and response post processor + // /// the correlation id supplier and the extraction of the correlationId. + // /// Here the client uses the replyTo queue provided by the user and the correlationId supplier + // /// the field "Subject" is used as correlationId + // /// The server uses the field "GroupId" as correlationId + // /// Both use the extraction correlationId to get the correlationId + // /// + // /// The fields "Subject" and "GroupId" are used ONLY for test. + // /// You should not use these fields for this purpose. + // /// + // /// + // + // [Fact] + // public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor() + // { + // Assert.NotNull(_connection); + // Assert.NotNull(_management); + // string requestQueue = _queueName; + // await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + // IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + // { + // var reply = context.Message("pong"); + // return Task.FromResult(reply); + // }).RequestQueue(_queueName) + // //come from the client + // .CorrelationIdExtractor(message => message.Subject()) + // // replace the correlation id location with GroupId + // .ReplyPostProcessor((reply, replyCorrelationId) => reply.GroupId( + // replyCorrelationId.ToString() ?? throw new InvalidOperationException())) + // .BuildAsync(); + // Assert.NotNull(rpcServer); + // + // IQueueInfo replyTo = + // await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync(); + // + // // custom correlationId supplier + // const string correlationId = "my-correlation-id"; + // int correlationIdCounter = 0; + // + // IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + // .Queue(requestQueue) + // .RpcClient() + // .ReplyToQueue(replyTo.Name()) + // // replace the correlation id creation with a custom function + // .CorrelationIdSupplier(() => $"{correlationId}_{Interlocked.Increment(ref correlationIdCounter)}") + // // The server will reply with the correlation id in the groupId + // // This is only for testing. You should not use the groupId for this. + // .CorrelationIdExtractor(message => message.GroupId()) + // // The client will use Subject to store the correlation id + // // this is only for testing. You should not use Subject for this. + // .RequestPostProcessor((request, requestCorrelationId) + // => request.ReplyTo(AddressBuilderHelper.AddressBuilder().Queue(replyTo.Name()).Address()) + // .Subject(requestCorrelationId.ToString() ?? throw new InvalidOperationException())) + // .BuildAsync(); + // + // IMessage message = new AmqpMessage("ping"); + // + // int i = 1; + // while (i < 30) + // { + // IMessage response = await rpcClient.PublishAsync(message); + // Assert.Equal("pong", response.Body()); + // // the server replies with the correlation id in the GroupId field + // Assert.Equal($"{correlationId}_{i}", response.GroupId()); + // i++; + // } + // + // await rpcClient.CloseAsync(); + // await rpcServer.CloseAsync(); + // } } } From 79f0af82e5b977321db803f18e9868bfd9faef70 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 15 Oct 2024 11:24:50 +0200 Subject: [PATCH 4/8] change install dotnet Signed-off-by: Gabriele Santomaggio --- .github/workflows/wf_build-and-test.yaml | 12 +- Tests/Rpc/RpcServerTests.cs | 440 +++++++++++------------ 2 files changed, 225 insertions(+), 227 deletions(-) diff --git a/.github/workflows/wf_build-and-test.yaml b/.github/workflows/wf_build-and-test.yaml index 33ed2782..a08c6f53 100644 --- a/.github/workflows/wf_build-and-test.yaml +++ b/.github/workflows/wf_build-and-test.yaml @@ -48,14 +48,12 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: actions/cache@v4 + - name: Setup dotnet + uses: actions/setup-dotnet@v4 with: - path: | - ~/.nuget/packages - ~/.local/share/NuGet/v3-cache - key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj') }} - restore-keys: | - ${{ runner.os }}-v0-nuget- + dotnet-version: | + 6.0.x + 8.0.x - name: Build (Debug) run: dotnet build ${{ github.workspace }}/Build.csproj - name: Verify diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index 2e96a2cb..8fee5c32 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -33,225 +33,225 @@ public async Task MockRpcServerPingPong() await rpcServer.CloseAsync(); } - // [Fact] - // public async Task RpcServerValidateStateChange() - // { - // Assert.NotNull(_connection); - // Assert.NotNull(_management); - // List<(State, State)> states = []; - // await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync(); - // TaskCompletionSource tsc = new(TaskCreationOptions.RunContinuationsAsynchronously); - // IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => - // { - // var m = context.Message(request.Body()); - // return Task.FromResult(m); - // }).RequestQueue(_queueName).BuildAsync(); - // rpcServer.ChangeState += (sender, fromState, toState, e) => - // { - // states.Add((fromState, toState)); - // if (states.Count == 2) - // { - // tsc.SetResult(states.Count); - // } - // }; - // Assert.NotNull(rpcServer); - // await rpcServer.CloseAsync(); - // int count = await tsc.Task.WaitAsync(TimeSpan.FromSeconds(5)); - // Assert.Equal(2, count); - // Assert.Equal(State.Open, states[0].Item1); - // Assert.Equal(State.Closing, states[0].Item2); - // Assert.Equal(State.Closing, states[1].Item1); - // Assert.Equal(State.Closed, states[1].Item2); - // } - // - // /// - // /// Simulate RPC communication with a publisher - // /// - // [Fact] - // public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() - // { - // Assert.NotNull(_connection); - // Assert.NotNull(_management); - // string requestQueue = _queueName; - // await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); - // IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => - // { - // var reply = context.Message("pong"); - // return Task.FromResult(reply); - // }).RequestQueue(requestQueue).BuildAsync(); - // - // Assert.NotNull(rpcServer); - // string queueReplyTo = $"queueReplyTo-{Now}"; - // IQueueSpecification spec = _management.Queue(queueReplyTo).Exclusive(true).AutoDelete(true); - // await spec.DeclareAsync(); - // TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); - // - // IConsumer consumer = await _connection.ConsumerBuilder().Queue(queueReplyTo).MessageHandler( - // async (context, message) => - // { - // await context.AcceptAsync(); - // tcs.SetResult(message); - // }).BuildAndStartAsync(); - // - // IPublisher publisher = await _connection.PublisherBuilder().Queue(requestQueue).BuildAsync(); - // Assert.NotNull(publisher); - // AddressBuilder addressBuilder = new(); - // - // IMessage message = new AmqpMessage("test").ReplyTo(addressBuilder.Queue(queueReplyTo).Address()); - // PublishResult pr = await publisher.PublishAsync(message); - // Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); - // - // IMessage m = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - // Assert.Equal("pong", m.Body()); - // - // await rpcServer.CloseAsync(); - // await consumer.CloseAsync(); - // await publisher.CloseAsync(); - // } - // - // /// - // /// In this test the client has to create a reply queue since is not provided by the user - // /// with the ReplyToQueue method - // /// - // [Fact] - // public async Task RpcServerClientPingPongWithDefault() - // { - // Assert.NotNull(_connection); - // Assert.NotNull(_management); - // string requestQueue = _queueName; - // await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); - // IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => - // { - // var reply = context.Message("pong"); - // return Task.FromResult(reply); - // }).RequestQueue(_queueName).BuildAsync(); - // Assert.NotNull(rpcServer); - // - // IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() - // .Queue(requestQueue) - // .RpcClient() - // .BuildAsync(); - // - // IMessage message = new AmqpMessage("ping"); - // - // IMessage response = await rpcClient.PublishAsync(message); - // Assert.Equal("pong", response.Body()); - // await rpcClient.CloseAsync(); - // await rpcServer.CloseAsync(); - // } - // - // /// - // /// In this test the client has to use the ReplyToQueue provided by the user - // /// - // [Fact] - // public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdSupplier() - // { - // Assert.NotNull(_connection); - // Assert.NotNull(_management); - // string requestQueue = _queueName; - // await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); - // IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => - // { - // var reply = context.Message("pong"); - // return Task.FromResult(reply); - // }).RequestQueue(_queueName) - // .BuildAsync(); - // Assert.NotNull(rpcServer); - // - // // custom replyTo queue - // IQueueInfo replyTo = - // await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync(); - // - // // custom correlationId supplier - // const string correlationId = "my-correlation-id"; - // - // IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() - // .Queue(requestQueue) - // .RpcClient() - // .CorrelationIdSupplier(() => correlationId) - // .CorrelationIdExtractor(message => message.CorrelationId()) - // .ReplyToQueue(replyTo.Name()) - // .BuildAsync(); - // - // IMessage message = new AmqpMessage("ping"); - // - // IMessage response = await rpcClient.PublishAsync(message); - // Assert.Equal("pong", response.Body()); - // Assert.Equal(correlationId, response.CorrelationId()); - // await rpcClient.CloseAsync(); - // await rpcServer.CloseAsync(); - // } - // - // /// - // /// This test combine all the features with the overriding of the request and response post processor - // /// the correlation id supplier and the extraction of the correlationId. - // /// Here the client uses the replyTo queue provided by the user and the correlationId supplier - // /// the field "Subject" is used as correlationId - // /// The server uses the field "GroupId" as correlationId - // /// Both use the extraction correlationId to get the correlationId - // /// - // /// The fields "Subject" and "GroupId" are used ONLY for test. - // /// You should not use these fields for this purpose. - // /// - // /// - // - // [Fact] - // public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor() - // { - // Assert.NotNull(_connection); - // Assert.NotNull(_management); - // string requestQueue = _queueName; - // await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); - // IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => - // { - // var reply = context.Message("pong"); - // return Task.FromResult(reply); - // }).RequestQueue(_queueName) - // //come from the client - // .CorrelationIdExtractor(message => message.Subject()) - // // replace the correlation id location with GroupId - // .ReplyPostProcessor((reply, replyCorrelationId) => reply.GroupId( - // replyCorrelationId.ToString() ?? throw new InvalidOperationException())) - // .BuildAsync(); - // Assert.NotNull(rpcServer); - // - // IQueueInfo replyTo = - // await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync(); - // - // // custom correlationId supplier - // const string correlationId = "my-correlation-id"; - // int correlationIdCounter = 0; - // - // IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() - // .Queue(requestQueue) - // .RpcClient() - // .ReplyToQueue(replyTo.Name()) - // // replace the correlation id creation with a custom function - // .CorrelationIdSupplier(() => $"{correlationId}_{Interlocked.Increment(ref correlationIdCounter)}") - // // The server will reply with the correlation id in the groupId - // // This is only for testing. You should not use the groupId for this. - // .CorrelationIdExtractor(message => message.GroupId()) - // // The client will use Subject to store the correlation id - // // this is only for testing. You should not use Subject for this. - // .RequestPostProcessor((request, requestCorrelationId) - // => request.ReplyTo(AddressBuilderHelper.AddressBuilder().Queue(replyTo.Name()).Address()) - // .Subject(requestCorrelationId.ToString() ?? throw new InvalidOperationException())) - // .BuildAsync(); - // - // IMessage message = new AmqpMessage("ping"); - // - // int i = 1; - // while (i < 30) - // { - // IMessage response = await rpcClient.PublishAsync(message); - // Assert.Equal("pong", response.Body()); - // // the server replies with the correlation id in the GroupId field - // Assert.Equal($"{correlationId}_{i}", response.GroupId()); - // i++; - // } - // - // await rpcClient.CloseAsync(); - // await rpcServer.CloseAsync(); - // } + [Fact] + public async Task RpcServerValidateStateChange() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + List<(State, State)> states = []; + await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync(); + TaskCompletionSource tsc = new(TaskCreationOptions.RunContinuationsAsynchronously); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var m = context.Message(request.Body()); + return Task.FromResult(m); + }).RequestQueue(_queueName).BuildAsync(); + rpcServer.ChangeState += (sender, fromState, toState, e) => + { + states.Add((fromState, toState)); + if (states.Count == 2) + { + tsc.SetResult(states.Count); + } + }; + Assert.NotNull(rpcServer); + await rpcServer.CloseAsync(); + int count = await tsc.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal(2, count); + Assert.Equal(State.Open, states[0].Item1); + Assert.Equal(State.Closing, states[0].Item2); + Assert.Equal(State.Closing, states[1].Item1); + Assert.Equal(State.Closed, states[1].Item2); + } + + /// + /// Simulate RPC communication with a publisher + /// + [Fact] + public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var reply = context.Message("pong"); + return Task.FromResult(reply); + }).RequestQueue(requestQueue).BuildAsync(); + + Assert.NotNull(rpcServer); + string queueReplyTo = $"queueReplyTo-{Now}"; + IQueueSpecification spec = _management.Queue(queueReplyTo).Exclusive(true).AutoDelete(true); + await spec.DeclareAsync(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + + IConsumer consumer = await _connection.ConsumerBuilder().Queue(queueReplyTo).MessageHandler( + async (context, message) => + { + await context.AcceptAsync(); + tcs.SetResult(message); + }).BuildAndStartAsync(); + + IPublisher publisher = await _connection.PublisherBuilder().Queue(requestQueue).BuildAsync(); + Assert.NotNull(publisher); + AddressBuilder addressBuilder = new(); + + IMessage message = new AmqpMessage("test").ReplyTo(addressBuilder.Queue(queueReplyTo).Address()); + PublishResult pr = await publisher.PublishAsync(message); + Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + + IMessage m = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal("pong", m.Body()); + + await rpcServer.CloseAsync(); + await consumer.CloseAsync(); + await publisher.CloseAsync(); + } + + /// + /// In this test the client has to create a reply queue since is not provided by the user + /// with the ReplyToQueue method + /// + [Fact] + public async Task RpcServerClientPingPongWithDefault() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var reply = context.Message("pong"); + return Task.FromResult(reply); + }).RequestQueue(_queueName).BuildAsync(); + Assert.NotNull(rpcServer); + + IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + .Queue(requestQueue) + .RpcClient() + .BuildAsync(); + + IMessage message = new AmqpMessage("ping"); + + IMessage response = await rpcClient.PublishAsync(message); + Assert.Equal("pong", response.Body()); + await rpcClient.CloseAsync(); + await rpcServer.CloseAsync(); + } + + /// + /// In this test the client has to use the ReplyToQueue provided by the user + /// + [Fact] + public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdSupplier() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var reply = context.Message("pong"); + return Task.FromResult(reply); + }).RequestQueue(_queueName) + .BuildAsync(); + Assert.NotNull(rpcServer); + + // custom replyTo queue + IQueueInfo replyTo = + await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync(); + + // custom correlationId supplier + const string correlationId = "my-correlation-id"; + + IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + .Queue(requestQueue) + .RpcClient() + .CorrelationIdSupplier(() => correlationId) + .CorrelationIdExtractor(message => message.CorrelationId()) + .ReplyToQueue(replyTo.Name()) + .BuildAsync(); + + IMessage message = new AmqpMessage("ping"); + + IMessage response = await rpcClient.PublishAsync(message); + Assert.Equal("pong", response.Body()); + Assert.Equal(correlationId, response.CorrelationId()); + await rpcClient.CloseAsync(); + await rpcServer.CloseAsync(); + } + + /// + /// This test combine all the features with the overriding of the request and response post processor + /// the correlation id supplier and the extraction of the correlationId. + /// Here the client uses the replyTo queue provided by the user and the correlationId supplier + /// the field "Subject" is used as correlationId + /// The server uses the field "GroupId" as correlationId + /// Both use the extraction correlationId to get the correlationId + /// + /// The fields "Subject" and "GroupId" are used ONLY for test. + /// You should not use these fields for this purpose. + /// + /// + + [Fact] + public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var reply = context.Message("pong"); + return Task.FromResult(reply); + }).RequestQueue(_queueName) + //come from the client + .CorrelationIdExtractor(message => message.Subject()) + // replace the correlation id location with GroupId + .ReplyPostProcessor((reply, replyCorrelationId) => reply.GroupId( + replyCorrelationId.ToString() ?? throw new InvalidOperationException())) + .BuildAsync(); + Assert.NotNull(rpcServer); + + IQueueInfo replyTo = + await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync(); + + // custom correlationId supplier + const string correlationId = "my-correlation-id"; + int correlationIdCounter = 0; + + IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + .Queue(requestQueue) + .RpcClient() + .ReplyToQueue(replyTo.Name()) + // replace the correlation id creation with a custom function + .CorrelationIdSupplier(() => $"{correlationId}_{Interlocked.Increment(ref correlationIdCounter)}") + // The server will reply with the correlation id in the groupId + // This is only for testing. You should not use the groupId for this. + .CorrelationIdExtractor(message => message.GroupId()) + // The client will use Subject to store the correlation id + // this is only for testing. You should not use Subject for this. + .RequestPostProcessor((request, requestCorrelationId) + => request.ReplyTo(AddressBuilderHelper.AddressBuilder().Queue(replyTo.Name()).Address()) + .Subject(requestCorrelationId.ToString() ?? throw new InvalidOperationException())) + .BuildAsync(); + + IMessage message = new AmqpMessage("ping"); + + int i = 1; + while (i < 30) + { + IMessage response = await rpcClient.PublishAsync(message); + Assert.Equal("pong", response.Body()); + // the server replies with the correlation id in the GroupId field + Assert.Equal($"{correlationId}_{i}", response.GroupId()); + i++; + } + + await rpcClient.CloseAsync(); + await rpcServer.CloseAsync(); + } } } From c69414fb6be395ef5b64a9bce08f1646856dc9d0 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 15 Oct 2024 11:41:29 +0200 Subject: [PATCH 5/8] restore windows Signed-off-by: Gabriele Santomaggio --- .github/workflows/wf_build-and-test.yaml | 78 ++++++++++++------------ 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/.github/workflows/wf_build-and-test.yaml b/.github/workflows/wf_build-and-test.yaml index a08c6f53..297a1043 100644 --- a/.github/workflows/wf_build-and-test.yaml +++ b/.github/workflows/wf_build-and-test.yaml @@ -5,45 +5,45 @@ on: jobs: - # build-win32: - # runs-on: windows-latest - # # https://github.com/NuGet/Home/issues/11548 - # env: - # NUGET_CERT_REVOCATION_MODE: offline - # steps: - # - uses: actions/checkout@v4 - # - uses: actions/cache@v4 - # with: - # # Note: the cache path is relative to the workspace directory - # # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action - # path: ~/installers - # key: ${{ runner.os }}-v0-${{ hashFiles('.ci/windows/versions.json') }} - # - uses: actions/cache@v4 - # with: - # path: | - # ~/.nuget/packages - # ~/AppData/Local/NuGet/v3-cache - # key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj') }} - # restore-keys: | - # ${{ runner.os }}-v0-nuget- - # - name: Build (Debug) - # run: dotnet build ${{ github.workspace }}\Build.csproj - # - name: Verify - # run: dotnet format ${{ github.workspace }}\rabbitmq-amqp-dotnet-client.sln --no-restore --verify-no-changes - # - name: Install and Start RabbitMQ - # id: install-start-rabbitmq - # run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1 - # - name: Test - # timeout-minutes: 25 - # run: dotnet test ${{ github.workspace }}\Build.csproj --no-restore --no-build --logger 'console;verbosity=detailed' - # - name: Check for errors in RabbitMQ logs - # run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1 - # - name: Maybe upload RabbitMQ logs - # if: failure() - # uses: actions/upload-artifact@v4 - # with: - # name: rabbitmq-logs-integration-win32 - # path: ~/AppData/Roaming/RabbitMQ/log/ + build-win32: + runs-on: windows-latest + # https://github.com/NuGet/Home/issues/11548 + env: + NUGET_CERT_REVOCATION_MODE: offline + steps: + - uses: actions/checkout@v4 + - uses: actions/cache@v4 + with: + # Note: the cache path is relative to the workspace directory + # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action + path: ~/installers + key: ${{ runner.os }}-v0-${{ hashFiles('.ci/windows/versions.json') }} + - uses: actions/cache@v4 + with: + path: | + ~/.nuget/packages + ~/AppData/Local/NuGet/v3-cache + key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj') }} + restore-keys: | + ${{ runner.os }}-v0-nuget- + - name: Build (Debug) + run: dotnet build ${{ github.workspace }}\Build.csproj + - name: Verify + run: dotnet format ${{ github.workspace }}\rabbitmq-amqp-dotnet-client.sln --no-restore --verify-no-changes + - name: Install and Start RabbitMQ + id: install-start-rabbitmq + run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1 + - name: Test + timeout-minutes: 25 + run: dotnet test ${{ github.workspace }}\Build.csproj --no-restore --no-build --logger 'console;verbosity=detailed' + - name: Check for errors in RabbitMQ logs + run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1 + - name: Maybe upload RabbitMQ logs + if: failure() + uses: actions/upload-artifact@v4 + with: + name: rabbitmq-logs-integration-win32 + path: ~/AppData/Roaming/RabbitMQ/log/ build-ubuntu: runs-on: ubuntu-latest steps: From 22a1c222bc793566fc9aa155738024cc8b706db8 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 15 Oct 2024 12:25:11 +0200 Subject: [PATCH 6/8] Add logger Signed-off-by: Gabriele Santomaggio --- Tests/ConnectionRecoveryTests.cs | 8 +------- Tests/IntegrationTest.cs | 4 +++- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/Tests/ConnectionRecoveryTests.cs b/Tests/ConnectionRecoveryTests.cs index 5b30a8cd..5ae05d35 100644 --- a/Tests/ConnectionRecoveryTests.cs +++ b/Tests/ConnectionRecoveryTests.cs @@ -44,9 +44,8 @@ public void Reset() public int CurrentAttempt => 1; } -public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper) +public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) { - private readonly ITestOutputHelper _testOutputHelper = testOutputHelper; /// /// The normal close the status should be correct and error null @@ -623,9 +622,4 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync("e-r await connection.CloseAsync(); } - - private static TaskCompletionSource CreateTaskCompletionSource() - { - return new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - } } diff --git a/Tests/IntegrationTest.cs b/Tests/IntegrationTest.cs index a598bb14..146b6c5a 100644 --- a/Tests/IntegrationTest.cs +++ b/Tests/IntegrationTest.cs @@ -34,13 +34,13 @@ public IntegrationTest(ITestOutputHelper testOutputHelper, { _testOutputHelper = testOutputHelper; _setupConnectionAndManagement = setupConnectionAndManagement; - _queueName = $"{_testDisplayName}-queue-{Now}"; _exchangeName = $"{_testDisplayName}-exchange-{Now}"; _testDisplayName = InitTestDisplayName(); _containerId = $"{_testDisplayName}:{Now}"; + testOutputHelper.WriteLine($"Running test: {_testDisplayName}"); _connectionSettingBuilder = InitConnectionSettingsBuilder(); } @@ -63,6 +63,7 @@ public virtual async Task InitializeAsync() public virtual async Task DisposeAsync() { + _testOutputHelper.WriteLine($"Disposing test: {_testDisplayName}"); if (_management is not null && _management.State == State.Open) { try @@ -72,6 +73,7 @@ public virtual async Task DisposeAsync() } catch { + } try From 9bc4aa899db40d5a9e4f70d9fd3bb3d51be876bb Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 15 Oct 2024 12:26:52 +0200 Subject: [PATCH 7/8] Add logger Signed-off-by: Gabriele Santomaggio --- Tests/IntegrationTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/IntegrationTest.cs b/Tests/IntegrationTest.cs index 146b6c5a..4d151ff5 100644 --- a/Tests/IntegrationTest.cs +++ b/Tests/IntegrationTest.cs @@ -73,7 +73,7 @@ public virtual async Task DisposeAsync() } catch { - + } try From 0b3624450f97ee744c91bc7085f3f155dc4d5b8c Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 15 Oct 2024 14:16:12 +0200 Subject: [PATCH 8/8] extend recovery connection test from integration test Signed-off-by: Gabriele Santomaggio --- Tests/ConnectionRecoveryTests.cs | 88 ++++++++++++++++++-------------- Tests/SystemUtils.cs | 4 +- 2 files changed, 52 insertions(+), 40 deletions(-) diff --git a/Tests/ConnectionRecoveryTests.cs b/Tests/ConnectionRecoveryTests.cs index 5ae05d35..984353eb 100644 --- a/Tests/ConnectionRecoveryTests.cs +++ b/Tests/ConnectionRecoveryTests.cs @@ -46,7 +46,6 @@ public void Reset() public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) { - /// /// The normal close the status should be correct and error null /// The test records the status change when the connection is closed normally. @@ -58,12 +57,13 @@ public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper) : Integ [InlineData(false)] public async Task NormalCloseTheStatusShouldBeCorrectAndErrorNull(bool activeRecovery) { - string containerId = Guid.NewGuid().ToString(); + string localContainerId = $"{_containerId}_normal-close-connection-name"; + IConnection connection = await AmqpConnection.CreateAsync( - ConnectionSettingBuilder.Create().ContainerId(containerId).RecoveryConfiguration( + ConnectionSettingBuilder.Create().ContainerId(localContainerId).RecoveryConfiguration( RecoveryConfiguration.Create().Activated(activeRecovery).Topology(false)).Build()); - TaskCompletionSource connectionClosedStateTcs = CreateTaskCompletionSource(); + TaskCompletionSource connectionClosedStateTcs = CreateTaskCompletionSource(); var listFromStatus = new List(); var listToStatus = new List(); var listError = new List(); @@ -104,14 +104,16 @@ public async Task NormalCloseTheStatusShouldBeCorrectAndErrorNull(bool activeRec [Fact] public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull() { - const string containerId = "unexpected-close-connection-name"; + + string localContainerId = $"{_containerId}_unexpected-close-connection-name"; + IConnection connection = await AmqpConnection.CreateAsync( - ConnectionSettingBuilder.Create().ContainerId(containerId).RecoveryConfiguration( + ConnectionSettingBuilder.Create().ContainerId(localContainerId).RecoveryConfiguration( RecoveryConfiguration.Create().Activated(true).Topology(false) .BackOffDelayPolicy(new FakeFastBackOffDelay())).Build()); - TaskCompletionSource listErrorCountGreaterThanOrEqualToTwoTcs = CreateTaskCompletionSource(); - TaskCompletionSource listErrorCountGreaterThanOrEqualToFourTcs = CreateTaskCompletionSource(); + TaskCompletionSource listErrorCountGreaterThanOrEqualToTwoTcs = CreateTaskCompletionSource(); + TaskCompletionSource listErrorCountGreaterThanOrEqualToFourTcs = CreateTaskCompletionSource(); var listFromStatus = new List(); var listToStatus = new List(); @@ -128,6 +130,7 @@ public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull() // Note: must use try since it'll be called again listErrorCountGreaterThanOrEqualToTwoTcs.TrySetResult(true); } + if (listError.Count >= 4) { listErrorCountGreaterThanOrEqualToFourTcs.SetResult(true); @@ -141,7 +144,7 @@ public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull() }; Assert.Equal(State.Open, connection.State); - await SystemUtils.WaitUntilConnectionIsKilled(containerId); + await SystemUtils.WaitUntilConnectionIsKilled(localContainerId); await listErrorCountGreaterThanOrEqualToTwoTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.Equal(State.Open, listFromStatus[0]); @@ -174,15 +177,16 @@ public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull() [Fact] public async Task OverrideTheBackOffWithBackOffDisabled() { - string containerId = Guid.NewGuid().ToString(); + + string localContainerId = $"{_containerId}_override-backoff-disabled-connection-name"; IConnection connection = await AmqpConnection.CreateAsync( - ConnectionSettingBuilder.Create().ContainerId(containerId).RecoveryConfiguration( + ConnectionSettingBuilder.Create().ContainerId(localContainerId).RecoveryConfiguration( RecoveryConfiguration.Create().Activated(true).Topology(false).BackOffDelayPolicy( new FakeBackOffDelayPolicyDisabled())).Build()); var listFromStatus = new List(); - TaskCompletionSource listFromStatusCountGreaterOrEqualToTwo = CreateTaskCompletionSource(); - TaskCompletionSource listErrorCountGreaterOrEqualToTwo = CreateTaskCompletionSource(); + TaskCompletionSource listFromStatusCountGreaterOrEqualToTwo = CreateTaskCompletionSource(); + TaskCompletionSource listErrorCountGreaterOrEqualToTwo = CreateTaskCompletionSource(); var listToStatus = new List(); var listError = new List(); @@ -194,10 +198,12 @@ public async Task OverrideTheBackOffWithBackOffDisabled() { listError.Add(error); } + if (listFromStatus.Count >= 2) { listFromStatusCountGreaterOrEqualToTwo.TrySetResult(true); } + if (listError.Count >= 2) { listErrorCountGreaterOrEqualToTwo.SetResult(true); @@ -205,7 +211,7 @@ public async Task OverrideTheBackOffWithBackOffDisabled() }; Assert.Equal(State.Open, connection.State); - await SystemUtils.WaitUntilConnectionIsKilled(containerId); + await SystemUtils.WaitUntilConnectionIsKilled(localContainerId); await listFromStatusCountGreaterOrEqualToTwo.Task.WaitAsync(TimeSpan.FromSeconds(5)); @@ -233,16 +239,16 @@ public async Task OverrideTheBackOffWithBackOffDisabled() [Fact] public async Task RecoveryTopologyShouldRecoverTheTempQueues() { + string localContainerId = $"{_containerId}_temp-queue-should-recover-connection-name"; string queueName = $"temp-queue-should-recover-{true}"; - const string containerId = "temp-queue-should-recover-connection-name"; var connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(true)) - .ContainerId(containerId) + .ContainerId(localContainerId) .Build()); - TaskCompletionSource twoRecoveryEventsSeenTcs = CreateTaskCompletionSource(); + TaskCompletionSource twoRecoveryEventsSeenTcs = CreateTaskCompletionSource(); int recoveryEvents = 0; connection.ChangeState += (sender, from, to, error) => { @@ -256,7 +262,7 @@ public async Task RecoveryTopologyShouldRecoverTheTempQueues() await management.Queue().Name(queueName).AutoDelete(true).Exclusive(true).DeclareAsync(); Assert.Equal(1, topologyListener.QueueCount()); - await SystemUtils.WaitUntilConnectionIsKilled(containerId); + await SystemUtils.WaitUntilConnectionIsKilled(localContainerId); await twoRecoveryEventsSeenTcs.Task.WaitAsync(TimeSpan.FromSeconds(10)); await SystemUtils.WaitUntilFuncAsync(() => recoveryEvents == 2); @@ -280,15 +286,15 @@ public async Task RecoveryTopologyShouldRecoverTheTempQueues() public async Task RecoveryTopologyShouldNotRecoverTheTempQueues() { string queueName = $"temp-queue-should-recover-{false}"; - const string containerId = "temp-queue-should-not-recover-connection-name"; + string localContainerId = $"{_containerId}_temp-queue-should-not-recover-connection-name"; var connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(false)) - .ContainerId(containerId) + .ContainerId(localContainerId) .Build()); - TaskCompletionSource oneRecoveryEventSeenTcs = CreateTaskCompletionSource(); + TaskCompletionSource oneRecoveryEventSeenTcs = CreateTaskCompletionSource(); int recoveryEvents = 0; connection.ChangeState += (sender, from, to, error) => { @@ -302,7 +308,7 @@ public async Task RecoveryTopologyShouldNotRecoverTheTempQueues() await management.Queue().Name(queueName).AutoDelete(true).Exclusive(true).DeclareAsync(); Assert.Equal(1, topologyListener.QueueCount()); - await SystemUtils.WaitUntilConnectionIsKilled(containerId); + await SystemUtils.WaitUntilConnectionIsKilled(localContainerId); await oneRecoveryEventSeenTcs.Task.WaitAsync(TimeSpan.FromSeconds(10)); await SystemUtils.WaitUntilQueueDeletedAsync(queueName); @@ -317,15 +323,15 @@ public async Task RecoveryTopologyShouldNotRecoverTheTempQueues() public async Task RecoveryTopologyShouldRecoverExchanges(bool topologyEnabled) { const string exchangeName = "exchange-should-recover"; - const string containerId = nameof(RecoveryTopologyShouldRecoverExchanges); + string localContainerId = $"{_containerId}_exchange-should-recover-connection-name"; IConnection connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(topologyEnabled)) - .ContainerId(containerId) + .ContainerId(localContainerId) .Build()); - TaskCompletionSource twoRecoveryEventsSeenTcs = CreateTaskCompletionSource(); + TaskCompletionSource twoRecoveryEventsSeenTcs = CreateTaskCompletionSource(); int recoveryEvents = 0; connection.ChangeState += (sender, from, to, error) => { @@ -345,7 +351,7 @@ public async Task RecoveryTopologyShouldRecoverExchanges(bool topologyEnabled) // the exchange is recovered. await SystemUtils.DeleteExchangeAsync("exchange-should-recover"); - await SystemUtils.WaitUntilConnectionIsKilled(containerId); + await SystemUtils.WaitUntilConnectionIsKilled(localContainerId); await twoRecoveryEventsSeenTcs.Task.WaitAsync(TimeSpan.FromSeconds(10)); @@ -373,15 +379,15 @@ public async Task RecoveryTopologyShouldRecoverExchanges(bool topologyEnabled) [InlineData(false)] public async Task RecoveryTopologyShouldRecoverBindings(bool topologyEnabled) { - const string containerId = "binding-should-recover-connection-name"; + string localContainerId = $"{_containerId}_binding-should-recover-connection-name"; var connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(topologyEnabled)) - .ContainerId(containerId) + .ContainerId(localContainerId) .Build()); - TaskCompletionSource twoRecoveryEventsSeenTcs = CreateTaskCompletionSource(); + TaskCompletionSource twoRecoveryEventsSeenTcs = CreateTaskCompletionSource(); int recoveryEvents = 0; connection.ChangeState += (sender, from, to, error) => { @@ -409,7 +415,7 @@ public async Task RecoveryTopologyShouldRecoverBindings(bool topologyEnabled) await SystemUtils.DeleteExchangeAsync("exchange-should-recover-binding"); // The queue will be deleted due of the auto-delete flag - await SystemUtils.WaitUntilConnectionIsKilled(containerId); + await SystemUtils.WaitUntilConnectionIsKilled(localContainerId); await twoRecoveryEventsSeenTcs.Task.WaitAsync(TimeSpan.FromSeconds(10)); if (topologyEnabled) @@ -452,13 +458,14 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync("exchan [Fact] public async Task RemoveAQueueShouldRemoveTheBindings() { - const string containerId = nameof(RemoveAQueueShouldRemoveTheBindings); + string localContainerId = $"{_containerId}_remove-queue-should-remove-binding-connection-name"; + IConnection connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(true)) - .ContainerId(containerId) + .ContainerId(localContainerId) .Build()); IManagement management = connection.Management(); @@ -512,13 +519,13 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync("e-remo [Fact] public async Task RemoveAnExchangeShouldRemoveTheBindings() { - const string containerId = "remove-exchange-should-remove-binding-connection-name"; + string localContainerId = $"{_containerId}_remove-exchange-should-remove-binding-connection-name"; var connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(true)) - .ContainerId(containerId) + .ContainerId(localContainerId) .Build()); IManagement management = connection.Management(); @@ -577,13 +584,13 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync( [Fact] public async Task RemoveAnExchangeBoundToAnotherExchangeShouldRemoveTheBindings() { - const string containerId = nameof(RemoveAnExchangeBoundToAnotherExchangeShouldRemoveTheBindings); + string localContainerId = $"{_containerId}_remove-exchange-bound-to-another-exchange-should-remove-binding-connection-name"; var connection = await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create() .RecoveryConfiguration(RecoveryConfiguration.Create() .BackOffDelayPolicy(new FakeFastBackOffDelay()) .Topology(true)) - .ContainerId(containerId) + .ContainerId(localContainerId) .Build()); IManagement management = connection.Management(); @@ -594,7 +601,8 @@ public async Task RemoveAnExchangeBoundToAnotherExchangeShouldRemoveTheBindings( await exSpec.DeclareAsync(); - var exSpecDestination = management.Exchange().Name("e-remove-exchange-bound-to-another-exchange-should-remove-binding-destination") + var exSpecDestination = management.Exchange() + .Name("e-remove-exchange-bound-to-another-exchange-should-remove-binding-destination") .Type(ExchangeType.DIRECT); await exSpecDestination.DeclareAsync(); @@ -605,7 +613,8 @@ await management.Binding().SourceExchange(exSpec) .DestinationExchange(exSpecDestination).Key($"key_{i}").BindAsync(); } - await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync("e-remove-exchange-bound-to-another-exchange-should-remove-binding", + await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync( + "e-remove-exchange-bound-to-another-exchange-should-remove-binding", "e-remove-exchange-bound-to-another-exchange-should-remove-binding-destination"); Assert.Equal(10, topologyListener.BindingCount()); @@ -615,7 +624,8 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync("e-remov await exSpec.DeleteAsync(); - await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync("e-remove-exchange-bound-to-another-exchange-should-remove-binding", + await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync( + "e-remove-exchange-bound-to-another-exchange-should-remove-binding", "e-remove-exchange-bound-to-another-exchange-should-remove-binding-destination"); Assert.Equal(0, topologyListener.ExchangeCount()); diff --git a/Tests/SystemUtils.cs b/Tests/SystemUtils.cs index 41927b79..54d94a0f 100644 --- a/Tests/SystemUtils.cs +++ b/Tests/SystemUtils.cs @@ -95,7 +95,9 @@ public static Task WaitUntilConnectionIsClosed(string containerId) public static async Task WaitUntilConnectionIsKilled(string containerId) { await WaitUntilConnectionIsOpen(containerId); - await WaitUntilAsync(async () => await s_httpApiClient.KillConnectionAsync(containerId) == 1); + await WaitUntilAsync(async () => + + await s_httpApiClient.KillConnectionAsync(containerId) == 1); } public static async Task WaitUntilConnectionIsKilledAndOpen(string containerId)