Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -485,9 +485,11 @@ protected static void AssertPreconditionFailed(ShutdownEventArgs args)
AssertShutdownError(args, Constants.PreconditionFailed);
}

protected static Task AssertRanToCompletion(IEnumerable<Task> tasks)
protected async Task AssertRanToCompletion(IEnumerable<Task> tasks)
{
return DoAssertRanToCompletion(tasks);
Task whenAllTask = Task.WhenAll(tasks);
await whenAllTask.WaitAsync(LongWaitSpan);
Assert.True(whenAllTask.IsCompletedSuccessfully());
}

internal static void AssertRecordedQueues(AutorecoveringConnection c, int n)
Expand Down Expand Up @@ -598,13 +600,6 @@ private static int GetConnectionIdx()
return Interlocked.Increment(ref _connectionIdx);
}

private static async Task DoAssertRanToCompletion(IEnumerable<Task> tasks)
{
Task whenAllTask = Task.WhenAll(tasks);
await whenAllTask;
Assert.True(whenAllTask.IsCompletedSuccessfully());
}

protected static string GetUniqueString(ushort length)
{
byte[] bytes = GetRandomBody(length);
Expand Down
69 changes: 69 additions & 0 deletions projects/Test/Integration/TestConcurrentAccessBase.cs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration
{
public class TestConcurrentAccessBase : IntegrationFixture
{
protected const ushort _messageCount = 200;

public TestConcurrentAccessBase(ITestOutputHelper output,
ushort consumerDispatchConcurrency = 1,
bool openChannel = true) : base(output, consumerDispatchConcurrency, openChannel)
{
}

protected async Task TestConcurrentOperationsAsync(Func<Task> action, int iterations)
{
var tasks = new List<Task>();
for (int i = 0; i < _processorCount; i++)
{
for (int j = 0; j < iterations; j++)
{
await Task.Delay(S_Random.Next(1, 10));
tasks.Add(action());
}
}
await AssertRanToCompletion(tasks);

// incorrect frame interleaving in these tests will result
// in an unrecoverable connection-level exception, thus
// closing the connection
Assert.True(_conn.IsOpen);
}
}
}
135 changes: 135 additions & 0 deletions projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration
{
public class TestConcurrentAccessWithSharedChannel : TestConcurrentAccessBase
{
private const int Iterations = 10;

public TestConcurrentAccessWithSharedChannel(ITestOutputHelper output)
: base(output)
{
}

[Fact]
public async Task ConcurrentPublishSingleChannel()
{
int publishAckCount = 0;

_channel.BasicAcks += (object sender, BasicAckEventArgs e) =>
{
Interlocked.Increment(ref publishAckCount);
};

_channel.BasicNacks += (object sender, BasicNackEventArgs e) =>
{
_output.WriteLine($"channel #{_channel.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}");
};

await _channel.ConfirmSelectAsync(trackConfirmations: false);

await TestConcurrentOperationsAsync(async () =>
{
long receivedCount = 0;
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var msgTracker = new ConcurrentDictionary<ushort, bool>();

QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: string.Empty, passive: false,
durable: false, exclusive: true, autoDelete: true, arguments: null);

var consumer = new AsyncEventingBasicConsumer(_channel);

consumer.Received += async (object sender, BasicDeliverEventArgs ea) =>
{
try
{
System.Diagnostics.Debug.Assert(object.ReferenceEquals(sender, consumer));
ushort idx = ushort.Parse(_encoding.GetString(ea.Body.ToArray()));
Assert.False(msgTracker[idx]);
msgTracker[idx] = true;

var cons = (AsyncEventingBasicConsumer)sender;
IChannel ch = cons.Channel;
await ch.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);

if (Interlocked.Increment(ref receivedCount) == _messageCount)
{
if (msgTracker.Values.Any(v => v == false))
{
tcs.SetResult(false);
}
else
{
tcs.SetResult(true);
}
}
}
catch (Exception ex)
{
tcs.SetException(ex);
}
};

await _channel.BasicConsumeAsync(queue: q.QueueName, autoAck: false, consumer);

var publishTasks = new List<ValueTask>();
for (ushort i = 0; i < _messageCount; i++)
{
msgTracker[i] = false;
byte[] body = _encoding.GetBytes(i.ToString());
publishTasks.Add(_channel.BasicPublishAsync("", q.QueueName, mandatory: true, body: body));
}

foreach (ValueTask pt in publishTasks)
{
await pt;
}

Assert.True(await tcs.Task);
}, Iterations);

_output.WriteLine("@@@@@@@@ PUBLISH ACK COUNT: {0}", publishAckCount);
}
}
}
116 changes: 90 additions & 26 deletions projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,67 +30,131 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration
{
public class TestConcurrentAccessWithSharedConnection : IntegrationFixture
public class TestConcurrentAccessWithSharedConnection : TestConcurrentAccessBase
{
public TestConcurrentAccessWithSharedConnection(ITestOutputHelper output)
: base(output)
: base(output, openChannel: false)
{
}

public override async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
_conn = await _connFactory.CreateConnectionAsync();
_conn.ConnectionShutdown += HandleConnectionShutdown;
// NB: not creating _channel because this test suite doesn't use it.
Assert.Null(_channel);
}

[Fact]
public async Task TestConcurrentChannelOpenCloseLoop()
{
await TestConcurrentChannelOperationsAsync(async (conn) =>
await TestConcurrentOperationsAsync(async () =>
{
using (IChannel ch = await conn.CreateChannelAsync())
using (IChannel ch = await _conn.CreateChannelAsync())
{
await ch.CloseAsync();
}
}, 50);
}

private async Task TestConcurrentChannelOperationsAsync(Func<IConnection, Task> action, int iterations)
[Fact]
public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync()
{
return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty<byte>(), 30);
}

[Fact]
public Task TestConcurrentChannelOpenAndPublishingSize64Async()
{
var tasks = new List<Task>();
for (int i = 0; i < _processorCount; i++)
return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(64);
}

[Fact]
public Task TestConcurrentChannelOpenAndPublishingSize256Async()
{
return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(256);
}

[Fact]
public Task TestConcurrentChannelOpenAndPublishingSize1024Async()
{
return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(1024);
}

private Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, int iterations = 30)
{
byte[] body = GetRandomBody(length);
return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, iterations);
}

private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, int iterations)
{
return TestConcurrentOperationsAsync(async () =>
{
tasks.Add(Task.Run(() =>
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var tokenSource = new CancellationTokenSource(LongWaitSpan);
CancellationTokenRegistration ctsr = tokenSource.Token.Register(() =>
{
var subTasks = new List<Task>();
for (int j = 0; j < iterations; j++)
tcs.TrySetResult(false);
});

try
{
using (IChannel ch = await _conn.CreateChannelAsync())
{
subTasks.Add(action(_conn));
ch.ChannelShutdown += (o, ea) =>
{
HandleChannelShutdown(ch, ea, (args) =>
{
if (args.Initiator != ShutdownInitiator.Application)
{
tcs.TrySetException(args.Exception);
}
});
};

await ch.ConfirmSelectAsync(trackConfirmations: false);

ch.BasicAcks += (object sender, BasicAckEventArgs e) =>
{
if (e.DeliveryTag >= _messageCount)
{
tcs.SetResult(true);
}
};

ch.BasicNacks += (object sender, BasicNackEventArgs e) =>
{
tcs.SetResult(false);
_output.WriteLine($"channel #{ch.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}");
};

QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null);
for (ushort j = 0; j < _messageCount; j++)
{
await ch.BasicPublishAsync("", q.QueueName, mandatory: true, body: body);
}

Assert.True(await tcs.Task);
await ch.CloseAsync();
}
return Task.WhenAll(subTasks);
}));
}

Task whenTask = Task.WhenAll(tasks);
await whenTask.WaitAsync(LongWaitSpan);
Assert.True(whenTask.IsCompleted);
Assert.False(whenTask.IsCanceled);
Assert.False(whenTask.IsFaulted);

// incorrect frame interleaving in these tests will result
// in an unrecoverable connection-level exception, thus
// closing the connection
Assert.True(_conn.IsOpen);
}
finally
{
tokenSource.Dispose();
ctsr.Dispose();
}
}, iterations);
}
}
}
Loading