diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 54dcf2df0..3d4eff039 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -485,9 +485,11 @@ protected static void AssertPreconditionFailed(ShutdownEventArgs args) AssertShutdownError(args, Constants.PreconditionFailed); } - protected static Task AssertRanToCompletion(IEnumerable tasks) + protected async Task AssertRanToCompletion(IEnumerable tasks) { - return DoAssertRanToCompletion(tasks); + Task whenAllTask = Task.WhenAll(tasks); + await whenAllTask.WaitAsync(LongWaitSpan); + Assert.True(whenAllTask.IsCompletedSuccessfully()); } internal static void AssertRecordedQueues(AutorecoveringConnection c, int n) @@ -598,13 +600,6 @@ private static int GetConnectionIdx() return Interlocked.Increment(ref _connectionIdx); } - private static async Task DoAssertRanToCompletion(IEnumerable tasks) - { - Task whenAllTask = Task.WhenAll(tasks); - await whenAllTask; - Assert.True(whenAllTask.IsCompletedSuccessfully()); - } - protected static string GetUniqueString(ushort length) { byte[] bytes = GetRandomBody(length); diff --git a/projects/Test/Integration/TestConcurrentAccessBase.cs.cs b/projects/Test/Integration/TestConcurrentAccessBase.cs.cs new file mode 100644 index 000000000..ea83d6b5e --- /dev/null +++ b/projects/Test/Integration/TestConcurrentAccessBase.cs.cs @@ -0,0 +1,69 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace Test.Integration +{ + public class TestConcurrentAccessBase : IntegrationFixture + { + protected const ushort _messageCount = 200; + + public TestConcurrentAccessBase(ITestOutputHelper output, + ushort consumerDispatchConcurrency = 1, + bool openChannel = true) : base(output, consumerDispatchConcurrency, openChannel) + { + } + + protected async Task TestConcurrentOperationsAsync(Func action, int iterations) + { + var tasks = new List(); + for (int i = 0; i < _processorCount; i++) + { + for (int j = 0; j < iterations; j++) + { + await Task.Delay(S_Random.Next(1, 10)); + tasks.Add(action()); + } + } + await AssertRanToCompletion(tasks); + + // incorrect frame interleaving in these tests will result + // in an unrecoverable connection-level exception, thus + // closing the connection + Assert.True(_conn.IsOpen); + } + } +} diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs new file mode 100644 index 000000000..726bf5979 --- /dev/null +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs @@ -0,0 +1,135 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Xunit; +using Xunit.Abstractions; + +namespace Test.Integration +{ + public class TestConcurrentAccessWithSharedChannel : TestConcurrentAccessBase + { + private const int Iterations = 10; + + public TestConcurrentAccessWithSharedChannel(ITestOutputHelper output) + : base(output) + { + } + + [Fact] + public async Task ConcurrentPublishSingleChannel() + { + int publishAckCount = 0; + + _channel.BasicAcks += (object sender, BasicAckEventArgs e) => + { + Interlocked.Increment(ref publishAckCount); + }; + + _channel.BasicNacks += (object sender, BasicNackEventArgs e) => + { + _output.WriteLine($"channel #{_channel.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}"); + }; + + await _channel.ConfirmSelectAsync(trackConfirmations: false); + + await TestConcurrentOperationsAsync(async () => + { + long receivedCount = 0; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var msgTracker = new ConcurrentDictionary(); + + QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: string.Empty, passive: false, + durable: false, exclusive: true, autoDelete: true, arguments: null); + + var consumer = new AsyncEventingBasicConsumer(_channel); + + consumer.Received += async (object sender, BasicDeliverEventArgs ea) => + { + try + { + System.Diagnostics.Debug.Assert(object.ReferenceEquals(sender, consumer)); + ushort idx = ushort.Parse(_encoding.GetString(ea.Body.ToArray())); + Assert.False(msgTracker[idx]); + msgTracker[idx] = true; + + var cons = (AsyncEventingBasicConsumer)sender; + IChannel ch = cons.Channel; + await ch.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false); + + if (Interlocked.Increment(ref receivedCount) == _messageCount) + { + if (msgTracker.Values.Any(v => v == false)) + { + tcs.SetResult(false); + } + else + { + tcs.SetResult(true); + } + } + } + catch (Exception ex) + { + tcs.SetException(ex); + } + }; + + await _channel.BasicConsumeAsync(queue: q.QueueName, autoAck: false, consumer); + + var publishTasks = new List(); + for (ushort i = 0; i < _messageCount; i++) + { + msgTracker[i] = false; + byte[] body = _encoding.GetBytes(i.ToString()); + publishTasks.Add(_channel.BasicPublishAsync("", q.QueueName, mandatory: true, body: body)); + } + + foreach (ValueTask pt in publishTasks) + { + await pt; + } + + Assert.True(await tcs.Task); + }, Iterations); + + _output.WriteLine("@@@@@@@@ PUBLISH ACK COUNT: {0}", publishAckCount); + } + } +} diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs index ff831517f..0d6492360 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs @@ -30,18 +30,19 @@ //--------------------------------------------------------------------------- using System; -using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Events; using Xunit; using Xunit.Abstractions; namespace Test.Integration { - public class TestConcurrentAccessWithSharedConnection : IntegrationFixture + public class TestConcurrentAccessWithSharedConnection : TestConcurrentAccessBase { public TestConcurrentAccessWithSharedConnection(ITestOutputHelper output) - : base(output) + : base(output, openChannel: false) { } @@ -49,6 +50,7 @@ public override async Task InitializeAsync() { _connFactory = CreateConnectionFactory(); _conn = await _connFactory.CreateConnectionAsync(); + _conn.ConnectionShutdown += HandleConnectionShutdown; // NB: not creating _channel because this test suite doesn't use it. Assert.Null(_channel); } @@ -56,41 +58,103 @@ public override async Task InitializeAsync() [Fact] public async Task TestConcurrentChannelOpenCloseLoop() { - await TestConcurrentChannelOperationsAsync(async (conn) => + await TestConcurrentOperationsAsync(async () => { - using (IChannel ch = await conn.CreateChannelAsync()) + using (IChannel ch = await _conn.CreateChannelAsync()) { await ch.CloseAsync(); } }, 50); } - private async Task TestConcurrentChannelOperationsAsync(Func action, int iterations) + [Fact] + public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync() + { + return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty(), 30); + } + + [Fact] + public Task TestConcurrentChannelOpenAndPublishingSize64Async() { - var tasks = new List(); - for (int i = 0; i < _processorCount; i++) + return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(64); + } + + [Fact] + public Task TestConcurrentChannelOpenAndPublishingSize256Async() + { + return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(256); + } + + [Fact] + public Task TestConcurrentChannelOpenAndPublishingSize1024Async() + { + return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(1024); + } + + private Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, int iterations = 30) + { + byte[] body = GetRandomBody(length); + return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, iterations); + } + + private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, int iterations) + { + return TestConcurrentOperationsAsync(async () => { - tasks.Add(Task.Run(() => + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var tokenSource = new CancellationTokenSource(LongWaitSpan); + CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { - var subTasks = new List(); - for (int j = 0; j < iterations; j++) + tcs.TrySetResult(false); + }); + + try + { + using (IChannel ch = await _conn.CreateChannelAsync()) { - subTasks.Add(action(_conn)); + ch.ChannelShutdown += (o, ea) => + { + HandleChannelShutdown(ch, ea, (args) => + { + if (args.Initiator != ShutdownInitiator.Application) + { + tcs.TrySetException(args.Exception); + } + }); + }; + + await ch.ConfirmSelectAsync(trackConfirmations: false); + + ch.BasicAcks += (object sender, BasicAckEventArgs e) => + { + if (e.DeliveryTag >= _messageCount) + { + tcs.SetResult(true); + } + }; + + ch.BasicNacks += (object sender, BasicNackEventArgs e) => + { + tcs.SetResult(false); + _output.WriteLine($"channel #{ch.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}"); + }; + + QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null); + for (ushort j = 0; j < _messageCount; j++) + { + await ch.BasicPublishAsync("", q.QueueName, mandatory: true, body: body); + } + + Assert.True(await tcs.Task); + await ch.CloseAsync(); } - return Task.WhenAll(subTasks); - })); - } - - Task whenTask = Task.WhenAll(tasks); - await whenTask.WaitAsync(LongWaitSpan); - Assert.True(whenTask.IsCompleted); - Assert.False(whenTask.IsCanceled); - Assert.False(whenTask.IsFaulted); - - // incorrect frame interleaving in these tests will result - // in an unrecoverable connection-level exception, thus - // closing the connection - Assert.True(_conn.IsOpen); + } + finally + { + tokenSource.Dispose(); + ctsr.Dispose(); + } + }, iterations); } } } diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs deleted file mode 100644 index 37e3450da..000000000 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs +++ /dev/null @@ -1,171 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; -using Xunit; -using Xunit.Abstractions; - -namespace Test.Integration -{ - public class TestConcurrentAccessWithSharedConnectionAsync : IntegrationFixture - { - private const ushort _messageCount = 200; - - public TestConcurrentAccessWithSharedConnectionAsync(ITestOutputHelper output) - : base(output, openChannel: false) - { - } - - public override async Task InitializeAsync() - { - await base.InitializeAsync(); - _conn.ConnectionShutdown += HandleConnectionShutdown; - } - - [Fact] - public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync() - { - return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty(), 30); - } - - [Fact] - public Task TestConcurrentChannelOpenAndPublishingSize64Async() - { - return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(64); - } - - [Fact] - public Task TestConcurrentChannelOpenAndPublishingSize256Async() - { - return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(256); - } - - [Fact] - public Task TestConcurrentChannelOpenAndPublishingSize1024Async() - { - return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(1024); - } - - private Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, int iterations = 30) - { - byte[] body = GetRandomBody(length); - return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, iterations); - } - - private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, int iterations) - { - return TestConcurrentChannelOperationsAsync(async (conn) => - { - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var tokenSource = new CancellationTokenSource(LongWaitSpan); - CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => - { - tcs.TrySetResult(false); - }); - - try - { - using (IChannel ch = await _conn.CreateChannelAsync()) - { - ch.ChannelShutdown += (o, ea) => - { - HandleChannelShutdown(ch, ea, (args) => - { - if (args.Initiator != ShutdownInitiator.Application) - { - tcs.TrySetException(args.Exception); - } - }); - }; - - await ch.ConfirmSelectAsync(); - - ch.BasicAcks += (object sender, BasicAckEventArgs e) => - { - if (e.DeliveryTag >= _messageCount) - { - tcs.SetResult(true); - } - }; - - ch.BasicNacks += (object sender, BasicNackEventArgs e) => - { - tcs.SetResult(false); - _output.WriteLine($"channel #{ch.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}"); - }; - - QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null); - for (ushort j = 0; j < _messageCount; j++) - { - await ch.BasicPublishAsync("", q.QueueName, mandatory: true, body: body); - } - - Assert.True(await tcs.Task); - await ch.CloseAsync(); - } - } - finally - { - tokenSource.Dispose(); - ctsr.Dispose(); - } - }, iterations); - } - - private Task TestConcurrentChannelOperationsAsync(Func actions, int iterations) - { - return TestConcurrentChannelOperationsAsync(actions, iterations, LongWaitSpan); - } - - private async Task TestConcurrentChannelOperationsAsync(Func action, int iterations, TimeSpan timeout) - { - var tasks = new List(); - for (int i = 0; i < _processorCount; i++) - { - for (int j = 0; j < iterations; j++) - { - tasks.Add(action(_conn)); - } - } - await AssertRanToCompletion(tasks); - - // incorrect frame interleaving in these tests will result - // in an unrecoverable connection-level exception, thus - // closing the connection - Assert.True(_conn.IsOpen); - } - } -}