From a583dd156f86ec1e6decc900e023e4cb77bbbdeb Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 5 Sep 2024 14:00:40 -0700 Subject: [PATCH] Fix `NextPublishSeqNo` when retrieved concurrently Discovered while updating `rabbitmq/rabbitmq-tutorials` to version `7.0.0-rc.8` of this library. * Add `trackConfirmations` argument to `ConfirmSelectAsync` to allow disabling internal confirm tracking. * Increase CI timeouts since GHA Windows runners are slow (actions/runner-images#7320) --- .github/workflows/build-test.yaml | 4 +- Build.csproj | 1 + RabbitMQDotNetClient.sln | 9 +- .../RabbitMQ.Client/PublicAPI.Shipped.txt | 2 +- .../RabbitMQ.Client/client/api/IChannel.cs | 8 +- .../client/impl/AutorecoveringChannel.cs | 8 +- .../client/impl/ChannelBase.cs | 227 +++++++++++++----- .../PublisherConfirms/PublisherConfirms.cs | 212 ++++++++++++++++ .../PublisherConfirms.csproj | 19 ++ .../Test/Integration/TestPublisherConfirms.cs | 9 +- 10 files changed, 418 insertions(+), 81 deletions(-) create mode 100644 projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs create mode 100644 projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index 2919461349..07d13d90ef 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -67,7 +67,7 @@ jobs: id: install-start-rabbitmq run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1 - name: Integration Tests - timeout-minutes: 15 + timeout-minutes: 25 run: | $tx = Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; ` Start-Sleep -Seconds 1; ` @@ -115,7 +115,7 @@ jobs: id: install-start-rabbitmq run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1 - name: Sequential Integration Tests - timeout-minutes: 15 + timeout-minutes: 25 run: dotnet test ` --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' ` --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` diff --git a/Build.csproj b/Build.csproj index a919ea9644..44029a2287 100644 --- a/Build.csproj +++ b/Build.csproj @@ -14,6 +14,7 @@ + diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index 5354b8d079..1f01baa7ef 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -42,7 +42,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "project EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Test\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -102,6 +104,10 @@ Global {64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.Build.0 = Debug|Any CPU {64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.ActiveCfg = Release|Any CPU {64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.Build.0 = Release|Any CPU + {13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU + {13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU + {13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -117,6 +123,7 @@ Global {C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69} + {13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1} diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index b8e4f69b98..656279bb01 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -813,7 +813,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~RabbitMQ.Client.IChannel.BasicRejectAsync(ulong deliveryTag, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask ~RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.ShutdownEventArgs reason, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.CloseAsync(ushort replyCode, string replyText, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~RabbitMQ.Client.IChannel.ConfirmSelectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.ConsumerCountAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.ExchangeBindAsync(string destination, string source, string routingKey, System.Collections.Generic.IDictionary arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, System.Collections.Generic.IDictionary arguments = null, bool passive = false, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task @@ -897,3 +896,4 @@ RabbitMQ.Client.ICredentialsProvider.GetCredentialsAsync(System.Threading.Cancel RabbitMQ.Client.ICredentialsProvider.Name.get -> string! RabbitMQ.Client.PlainMechanism.HandleChallengeAsync(byte[]? challenge, RabbitMQ.Client.ConnectionConfig! config, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! readonly RabbitMQ.Client.ConnectionConfig.CredentialsProvider -> RabbitMQ.Client.ICredentialsProvider! +RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index 39415f6abc..94a21c0a90 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -265,9 +265,13 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort, Task CloseAsync(ShutdownEventArgs reason, bool abort, CancellationToken cancellationToken = default); - /// Asynchronously enable publisher confirmations. + /// + /// Asynchronously enable publisher confirmations. + /// + /// Set to false if tracking via and yourself. /// CancellationToken for this operation. - Task ConfirmSelectAsync(CancellationToken cancellationToken = default); + Task ConfirmSelectAsync(bool trackConfirmations = true, + CancellationToken cancellationToken = default); /// Asynchronously declare an exchange. /// The name of the exchange. diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 3640d461e9..a9ab40f0c6 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -50,6 +50,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private ushort _prefetchCountConsumer; private ushort _prefetchCountGlobal; private bool _usesPublisherConfirms; + private bool _tracksPublisherConfirmations; private bool _usesTransactions; internal IConsumerDispatcher ConsumerDispatcher => InnerChannel.ConsumerDispatcher; @@ -177,7 +178,7 @@ await newChannel.BasicQosAsync(0, _prefetchCountGlobal, true, cancellationToken) if (_usesPublisherConfirms) { - await newChannel.ConfirmSelectAsync(cancellationToken) + await newChannel.ConfirmSelectAsync(_tracksPublisherConfirmations, cancellationToken) .ConfigureAwait(false); } @@ -334,11 +335,12 @@ public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global, return _innerChannel.BasicQosAsync(prefetchSize, prefetchCount, global, cancellationToken); } - public async Task ConfirmSelectAsync(CancellationToken cancellationToken) + public async Task ConfirmSelectAsync(bool trackConfirmations = true, CancellationToken cancellationToken = default) { - await InnerChannel.ConfirmSelectAsync(cancellationToken) + await InnerChannel.ConfirmSelectAsync(trackConfirmations, cancellationToken) .ConfigureAwait(false); _usesPublisherConfirms = true; + _tracksPublisherConfirmations = trackConfirmations; } public async Task ExchangeBindAsync(string destination, string source, string routingKey, diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 261fe2b3c9..d860fcd741 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -60,8 +60,11 @@ internal abstract class ChannelBase : IChannel, IRecoverable private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true); + private ulong _nextPublishSeqNo; private SemaphoreSlim? _confirmSemaphore; - private readonly LinkedList _pendingDeliveryTags = new LinkedList(); + private bool _trackConfirmations; + private LinkedList? _pendingDeliveryTags; + private List>? _confirmsTaskCompletionSources; private bool _onlyAcksReceived = true; @@ -176,7 +179,28 @@ public IAsyncBasicConsumer? DefaultConsumer [MemberNotNullWhen(false, nameof(CloseReason))] public bool IsOpen => CloseReason is null; - public ulong NextPublishSeqNo { get; private set; } + public ulong NextPublishSeqNo + { + get + { + if (ConfirmsAreEnabled) + { + _confirmSemaphore.Wait(); + try + { + return _nextPublishSeqNo; + } + finally + { + _confirmSemaphore.Release(); + } + } + else + { + return _nextPublishSeqNo; + } + } + } public string? CurrentQueue { get; private set; } @@ -588,57 +612,6 @@ protected void HandleBasicNack(IncomingCommand cmd) HandleAckNack(nack._deliveryTag, nack._multiple, true); } - protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack) - { - // No need to do this if publisher confirms have never been enabled. - if (ConfirmsAreEnabled) - { - // let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted - _confirmSemaphore.Wait(); - try - { - // No need to do anything if there are no delivery tags in the list - if (_pendingDeliveryTags.Count > 0) - { - if (multiple) - { - while (_pendingDeliveryTags.First!.Value < deliveryTag) - { - _pendingDeliveryTags.RemoveFirst(); - } - - if (_pendingDeliveryTags.First.Value == deliveryTag) - { - _pendingDeliveryTags.RemoveFirst(); - } - } - else - { - _pendingDeliveryTags.Remove(deliveryTag); - } - } - - _onlyAcksReceived = _onlyAcksReceived && !isNack; - - if (_pendingDeliveryTags.Count == 0 && _confirmsTaskCompletionSources!.Count > 0) - { - // Done, mark tasks - foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources) - { - confirmsTaskCompletionSource.TrySetResult(_onlyAcksReceived); - } - - _confirmsTaskCompletionSources.Clear(); - _onlyAcksReceived = true; - } - } - finally - { - _confirmSemaphore.Release(); - } - } - } - protected async Task HandleBasicCancelAsync(IncomingCommand cmd, CancellationToken cancellationToken) { string consumerTag = new Client.Framing.Impl.BasicCancel(cmd.MethodSpan)._consumerTag; @@ -962,7 +935,16 @@ await _confirmSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); try { - _pendingDeliveryTags.AddLast(NextPublishSeqNo++); + if (_trackConfirmations) + { + if (_pendingDeliveryTags is null) + { + throw new InvalidOperationException(InternalConstants.BugFound); + } + _pendingDeliveryTags.AddLast(_nextPublishSeqNo); + } + + _nextPublishSeqNo++; } finally { @@ -1005,8 +987,11 @@ await _confirmSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); try { - NextPublishSeqNo--; - _pendingDeliveryTags.RemoveLast(); + _nextPublishSeqNo--; + if (_trackConfirmations && _pendingDeliveryTags is not null) + { + _pendingDeliveryTags.RemoveLast(); + } } finally { @@ -1029,7 +1014,16 @@ await _confirmSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); try { - _pendingDeliveryTags.AddLast(NextPublishSeqNo++); + if (_trackConfirmations) + { + if (_pendingDeliveryTags is null) + { + throw new InvalidOperationException(InternalConstants.BugFound); + } + _pendingDeliveryTags.AddLast(_nextPublishSeqNo); + } + + _nextPublishSeqNo++; } finally { @@ -1072,8 +1066,11 @@ await _confirmSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); try { - NextPublishSeqNo--; - _pendingDeliveryTags.RemoveLast(); + _nextPublishSeqNo--; + if (_trackConfirmations && _pendingDeliveryTags is not null) + { + _pendingDeliveryTags.RemoveLast(); + } } finally { @@ -1157,8 +1154,9 @@ await ModelSendAsync(method, k.CancellationToken) } } - public async Task ConfirmSelectAsync(CancellationToken cancellationToken) + public async Task ConfirmSelectAsync(bool trackConfirmations = true, CancellationToken cancellationToken = default) { + _trackConfirmations = trackConfirmations; bool enqueued = false; var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); @@ -1166,10 +1164,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - if (NextPublishSeqNo == 0UL) + if (_nextPublishSeqNo == 0UL) { - _confirmsTaskCompletionSources = new List>(); - NextPublishSeqNo = 1; + if (_trackConfirmations) + { + _pendingDeliveryTags = new LinkedList(); + _confirmsTaskCompletionSources = new List>(); + } + _nextPublishSeqNo = 1; } enqueued = Enqueue(k); @@ -1681,8 +1683,6 @@ await ModelSendAsync(method, k.CancellationToken) } } - private List>? _confirmsTaskCompletionSources; - public async Task WaitForConfirmsAsync(CancellationToken cancellationToken = default) { if (false == ConfirmsAreEnabled) @@ -1690,6 +1690,16 @@ public async Task WaitForConfirmsAsync(CancellationToken cancellationToken throw new InvalidOperationException("Confirms not selected"); } + if (false == _trackConfirmations) + { + throw new InvalidOperationException("Confirmation tracking is not enabled"); + } + + if (_pendingDeliveryTags is null) + { + throw new InvalidOperationException(InternalConstants.BugFound); + } + TaskCompletionSource tcs; await _confirmSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); @@ -1731,6 +1741,21 @@ await _confirmSemaphore.WaitAsync(cancellationToken) public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default) { + if (false == ConfirmsAreEnabled) + { + throw new InvalidOperationException("Confirms not selected"); + } + + if (false == _trackConfirmations) + { + throw new InvalidOperationException("Confirmation tracking is not enabled"); + } + + if (_pendingDeliveryTags is null) + { + throw new InvalidOperationException(InternalConstants.BugFound); + } + try { bool onlyAcksReceived = await WaitForConfirmsAsync(token) @@ -1761,6 +1786,21 @@ await CloseAsync(ea, false, token) private async Task WaitForConfirmsWithTokenAsync(TaskCompletionSource tcs, CancellationToken cancellationToken) { + if (false == ConfirmsAreEnabled) + { + throw new InvalidOperationException("Confirms not selected"); + } + + if (false == _trackConfirmations) + { + throw new InvalidOperationException("Confirmation tracking is not enabled"); + } + + if (_pendingDeliveryTags is null) + { + throw new InvalidOperationException(InternalConstants.BugFound); + } + CancellationTokenRegistration tokenRegistration = #if NET6_0_OR_GREATER cancellationToken.UnsafeRegister( @@ -1785,6 +1825,63 @@ await tokenRegistration.DisposeAsync() } } + // NOTE: this method is internal for its use in this test: + // TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse + internal void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack) + { + // Only do this if confirms are enabled *and* the library is tracking confirmations + if (ConfirmsAreEnabled && _trackConfirmations) + { + if (_pendingDeliveryTags is null) + { + throw new InvalidOperationException(InternalConstants.BugFound); + } + // let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted + _confirmSemaphore.Wait(); + try + { + // No need to do anything if there are no delivery tags in the list + if (_pendingDeliveryTags.Count > 0) + { + if (multiple) + { + while (_pendingDeliveryTags.First!.Value < deliveryTag) + { + _pendingDeliveryTags.RemoveFirst(); + } + + if (_pendingDeliveryTags.First.Value == deliveryTag) + { + _pendingDeliveryTags.RemoveFirst(); + } + } + else + { + _pendingDeliveryTags.Remove(deliveryTag); + } + } + + _onlyAcksReceived = _onlyAcksReceived && !isNack; + + if (_pendingDeliveryTags.Count == 0 && _confirmsTaskCompletionSources!.Count > 0) + { + // Done, mark tasks + foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources) + { + confirmsTaskCompletionSource.TrySetResult(_onlyAcksReceived); + } + + _confirmsTaskCompletionSources.Clear(); + _onlyAcksReceived = true; + } + } + finally + { + _confirmSemaphore.Release(); + } + } + } + private static BasicProperties? PopulateActivityAndPropagateTraceId(TProperties basicProperties, Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs new file mode 100644 index 0000000000..3708452d0f --- /dev/null +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs @@ -0,0 +1,212 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client; + +const int MESSAGE_COUNT = 50_000; +bool debug = false; + +await PublishMessagesIndividuallyAsync(); +await PublishMessagesInBatchAsync(); +await HandlePublishConfirmsAsynchronously(); + +static Task CreateConnectionAsync() +{ + var factory = new ConnectionFactory { HostName = "localhost" }; + return factory.CreateConnectionAsync(); +} + +static async Task PublishMessagesIndividuallyAsync() +{ + Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once"); + + using IConnection connection = await CreateConnectionAsync(); + using IChannel channel = await connection.CreateChannelAsync(); + + // declare a server-named queue + QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); + string queueName = queueDeclareResult.QueueName; + await channel.ConfirmSelectAsync(); + + var sw = new Stopwatch(); + sw.Start(); + + for (int i = 0; i < MESSAGE_COUNT; i++) + { + byte[] body = Encoding.UTF8.GetBytes(i.ToString()); + await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body); + } + + await channel.WaitForConfirmsOrDieAsync(); + + sw.Stop(); + + Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms"); +} + +static async Task PublishMessagesInBatchAsync() +{ + Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches"); + + using IConnection connection = await CreateConnectionAsync(); + using IChannel channel = await connection.CreateChannelAsync(); + + // declare a server-named queue + QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); + string queueName = queueDeclareResult.QueueName; + await channel.ConfirmSelectAsync(); + + int batchSize = 100; + int outstandingMessageCount = 0; + + var sw = new Stopwatch(); + sw.Start(); + + var publishTasks = new List(); + for (int i = 0; i < MESSAGE_COUNT; i++) + { + byte[] body = Encoding.UTF8.GetBytes(i.ToString()); + publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask()); + outstandingMessageCount++; + + if (outstandingMessageCount == batchSize) + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await Task.WhenAll(publishTasks).WaitAsync(cts.Token); + publishTasks.Clear(); + + await channel.WaitForConfirmsOrDieAsync(cts.Token); + outstandingMessageCount = 0; + } + } + + if (outstandingMessageCount > 0) + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await channel.WaitForConfirmsOrDieAsync(cts.Token); + } + + sw.Stop(); + Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms"); +} + +async Task HandlePublishConfirmsAsynchronously() +{ + Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously"); + + using IConnection connection = await CreateConnectionAsync(); + using IChannel channel = await connection.CreateChannelAsync(); + + // declare a server-named queue + QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); + string queueName = queueDeclareResult.QueueName; + + // NOTE: setting trackConfirmations to false because this program + // is tracking them itself. + await channel.ConfirmSelectAsync(trackConfirmations: false); + + bool publishingCompleted = false; + var allMessagesConfirmedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var outstandingConfirms = new LinkedList(); + var semaphore = new SemaphoreSlim(1, 1); + void CleanOutstandingConfirms(ulong deliveryTag, bool multiple) + { + if (debug) + { + Console.WriteLine("{0} [DEBUG] confirming message: {1} (multiple: {2})", + DateTime.Now, deliveryTag, multiple); + } + + semaphore.Wait(); + try + { + if (multiple) + { + do + { + LinkedListNode? node = outstandingConfirms.First; + if (node is null) + { + break; + } + if (node.Value <= deliveryTag) + { + outstandingConfirms.RemoveFirst(); + } + else + { + break; + } + } while (true); + } + else + { + outstandingConfirms.Remove(deliveryTag); + } + } + finally + { + semaphore.Release(); + } + + if (publishingCompleted && outstandingConfirms.Count == 0) + { + allMessagesConfirmedTcs.SetResult(true); + } + } + + channel.BasicAcks += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple); + channel.BasicNacks += (sender, ea) => + { + Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple})"); + CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple); + }; + + var sw = new Stopwatch(); + sw.Start(); + + var publishTasks = new List(); + for (int i = 0; i < MESSAGE_COUNT; i++) + { + string msg = i.ToString(); + byte[] body = Encoding.UTF8.GetBytes(msg); + ulong nextPublishSeqNo = channel.NextPublishSeqNo; + if ((ulong)(i + 1) != nextPublishSeqNo) + { + Console.WriteLine($"{DateTime.Now} [WARNING] i {i + 1} does not equal next sequence number: {nextPublishSeqNo}"); + } + await semaphore.WaitAsync(); + try + { + outstandingConfirms.AddLast(nextPublishSeqNo); + } + finally + { + semaphore.Release(); + } + publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask()); + } + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await Task.WhenAll(publishTasks).WaitAsync(cts.Token); + publishingCompleted = true; + + try + { + await allMessagesConfirmedTcs.Task.WaitAsync(cts.Token); + } + catch (OperationCanceledException) + { + Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed within 10 seconds", DateTime.Now); + } + catch (TimeoutException) + { + Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed within 10 seconds", DateTime.Now); + } + + sw.Stop(); + Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages and handled confirm asynchronously {sw.ElapsedMilliseconds:N0} ms"); +} diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj new file mode 100644 index 0000000000..fd865eccf4 --- /dev/null +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj @@ -0,0 +1,19 @@ + + + + net6.0 + $(NoWarn);CA2007 + true + + + + Exe + enable + 9.0 + + + + + + + diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs index 64714b7693..42c612a63a 100644 --- a/projects/Test/Integration/TestPublisherConfirms.cs +++ b/projects/Test/Integration/TestPublisherConfirms.cs @@ -30,7 +30,6 @@ //--------------------------------------------------------------------------- using System; -using System.Reflection; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; @@ -105,12 +104,8 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout { return TestWaitForConfirmsAsync(2000, async (ch) => { - IChannel actualChannel = ((AutorecoveringChannel)ch).InnerChannel; - actualChannel - .GetType() - .GetMethod("HandleAckNack", BindingFlags.Instance | BindingFlags.NonPublic) - .Invoke(actualChannel, new object[] { 10UL, false, true }); - + RecoveryAwareChannel actualChannel = ((AutorecoveringChannel)ch).InnerChannel; + actualChannel.HandleAckNack(10UL, false, true); using (var cts = new CancellationTokenSource(ShortSpan)) { Assert.False(await ch.WaitForConfirmsAsync(cts.Token));