diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml
index 291946134..07d13d90e 100644
--- a/.github/workflows/build-test.yaml
+++ b/.github/workflows/build-test.yaml
@@ -67,7 +67,7 @@ jobs:
id: install-start-rabbitmq
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
- name: Integration Tests
- timeout-minutes: 15
+ timeout-minutes: 25
run: |
$tx = Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; `
Start-Sleep -Seconds 1; `
@@ -115,7 +115,7 @@ jobs:
id: install-start-rabbitmq
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
- name: Sequential Integration Tests
- timeout-minutes: 15
+ timeout-minutes: 25
run: dotnet test `
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
diff --git a/Build.csproj b/Build.csproj
index a919ea964..44029a228 100644
--- a/Build.csproj
+++ b/Build.csproj
@@ -14,6 +14,7 @@
+
diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln
index 5354b8d07..1f01baa7e 100644
--- a/RabbitMQDotNetClient.sln
+++ b/RabbitMQDotNetClient.sln
@@ -42,7 +42,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "project
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Test\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -102,6 +104,10 @@ Global
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.Build.0 = Release|Any CPU
+ {13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -117,6 +123,7 @@ Global
{C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
+ {13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}
diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt
index b8e4f69b9..656279bb0 100644
--- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt
+++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt
@@ -813,7 +813,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~RabbitMQ.Client.IChannel.BasicRejectAsync(ulong deliveryTag, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
~RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.ShutdownEventArgs reason, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.CloseAsync(ushort replyCode, string replyText, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
-~RabbitMQ.Client.IChannel.ConfirmSelectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.ConsumerCountAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.ExchangeBindAsync(string destination, string source, string routingKey, System.Collections.Generic.IDictionary arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, System.Collections.Generic.IDictionary arguments = null, bool passive = false, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
@@ -897,3 +896,4 @@ RabbitMQ.Client.ICredentialsProvider.GetCredentialsAsync(System.Threading.Cancel
RabbitMQ.Client.ICredentialsProvider.Name.get -> string!
RabbitMQ.Client.PlainMechanism.HandleChallengeAsync(byte[]? challenge, RabbitMQ.Client.ConnectionConfig! config, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
readonly RabbitMQ.Client.ConnectionConfig.CredentialsProvider -> RabbitMQ.Client.ICredentialsProvider!
+RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs
index 39415f6ab..94a21c0a9 100644
--- a/projects/RabbitMQ.Client/client/api/IChannel.cs
+++ b/projects/RabbitMQ.Client/client/api/IChannel.cs
@@ -265,9 +265,13 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort,
Task CloseAsync(ShutdownEventArgs reason, bool abort,
CancellationToken cancellationToken = default);
- /// Asynchronously enable publisher confirmations.
+ ///
+ /// Asynchronously enable publisher confirmations.
+ ///
+ /// Set to false if tracking via and yourself.
/// CancellationToken for this operation.
- Task ConfirmSelectAsync(CancellationToken cancellationToken = default);
+ Task ConfirmSelectAsync(bool trackConfirmations = true,
+ CancellationToken cancellationToken = default);
/// Asynchronously declare an exchange.
/// The name of the exchange.
diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
index 3640d461e..a9ab40f0c 100644
--- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
+++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
@@ -50,6 +50,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private ushort _prefetchCountConsumer;
private ushort _prefetchCountGlobal;
private bool _usesPublisherConfirms;
+ private bool _tracksPublisherConfirmations;
private bool _usesTransactions;
internal IConsumerDispatcher ConsumerDispatcher => InnerChannel.ConsumerDispatcher;
@@ -177,7 +178,7 @@ await newChannel.BasicQosAsync(0, _prefetchCountGlobal, true, cancellationToken)
if (_usesPublisherConfirms)
{
- await newChannel.ConfirmSelectAsync(cancellationToken)
+ await newChannel.ConfirmSelectAsync(_tracksPublisherConfirmations, cancellationToken)
.ConfigureAwait(false);
}
@@ -334,11 +335,12 @@ public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global,
return _innerChannel.BasicQosAsync(prefetchSize, prefetchCount, global, cancellationToken);
}
- public async Task ConfirmSelectAsync(CancellationToken cancellationToken)
+ public async Task ConfirmSelectAsync(bool trackConfirmations = true, CancellationToken cancellationToken = default)
{
- await InnerChannel.ConfirmSelectAsync(cancellationToken)
+ await InnerChannel.ConfirmSelectAsync(trackConfirmations, cancellationToken)
.ConfigureAwait(false);
_usesPublisherConfirms = true;
+ _tracksPublisherConfirmations = trackConfirmations;
}
public async Task ExchangeBindAsync(string destination, string source, string routingKey,
diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
index 261fe2b3c..d860fcd74 100644
--- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
+++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
@@ -60,8 +60,11 @@ internal abstract class ChannelBase : IChannel, IRecoverable
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);
+ private ulong _nextPublishSeqNo;
private SemaphoreSlim? _confirmSemaphore;
- private readonly LinkedList _pendingDeliveryTags = new LinkedList();
+ private bool _trackConfirmations;
+ private LinkedList? _pendingDeliveryTags;
+ private List>? _confirmsTaskCompletionSources;
private bool _onlyAcksReceived = true;
@@ -176,7 +179,28 @@ public IAsyncBasicConsumer? DefaultConsumer
[MemberNotNullWhen(false, nameof(CloseReason))]
public bool IsOpen => CloseReason is null;
- public ulong NextPublishSeqNo { get; private set; }
+ public ulong NextPublishSeqNo
+ {
+ get
+ {
+ if (ConfirmsAreEnabled)
+ {
+ _confirmSemaphore.Wait();
+ try
+ {
+ return _nextPublishSeqNo;
+ }
+ finally
+ {
+ _confirmSemaphore.Release();
+ }
+ }
+ else
+ {
+ return _nextPublishSeqNo;
+ }
+ }
+ }
public string? CurrentQueue { get; private set; }
@@ -588,57 +612,6 @@ protected void HandleBasicNack(IncomingCommand cmd)
HandleAckNack(nack._deliveryTag, nack._multiple, true);
}
- protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
- {
- // No need to do this if publisher confirms have never been enabled.
- if (ConfirmsAreEnabled)
- {
- // let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
- _confirmSemaphore.Wait();
- try
- {
- // No need to do anything if there are no delivery tags in the list
- if (_pendingDeliveryTags.Count > 0)
- {
- if (multiple)
- {
- while (_pendingDeliveryTags.First!.Value < deliveryTag)
- {
- _pendingDeliveryTags.RemoveFirst();
- }
-
- if (_pendingDeliveryTags.First.Value == deliveryTag)
- {
- _pendingDeliveryTags.RemoveFirst();
- }
- }
- else
- {
- _pendingDeliveryTags.Remove(deliveryTag);
- }
- }
-
- _onlyAcksReceived = _onlyAcksReceived && !isNack;
-
- if (_pendingDeliveryTags.Count == 0 && _confirmsTaskCompletionSources!.Count > 0)
- {
- // Done, mark tasks
- foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
- {
- confirmsTaskCompletionSource.TrySetResult(_onlyAcksReceived);
- }
-
- _confirmsTaskCompletionSources.Clear();
- _onlyAcksReceived = true;
- }
- }
- finally
- {
- _confirmSemaphore.Release();
- }
- }
- }
-
protected async Task HandleBasicCancelAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
string consumerTag = new Client.Framing.Impl.BasicCancel(cmd.MethodSpan)._consumerTag;
@@ -962,7 +935,16 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
- _pendingDeliveryTags.AddLast(NextPublishSeqNo++);
+ if (_trackConfirmations)
+ {
+ if (_pendingDeliveryTags is null)
+ {
+ throw new InvalidOperationException(InternalConstants.BugFound);
+ }
+ _pendingDeliveryTags.AddLast(_nextPublishSeqNo);
+ }
+
+ _nextPublishSeqNo++;
}
finally
{
@@ -1005,8 +987,11 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
- NextPublishSeqNo--;
- _pendingDeliveryTags.RemoveLast();
+ _nextPublishSeqNo--;
+ if (_trackConfirmations && _pendingDeliveryTags is not null)
+ {
+ _pendingDeliveryTags.RemoveLast();
+ }
}
finally
{
@@ -1029,7 +1014,16 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
- _pendingDeliveryTags.AddLast(NextPublishSeqNo++);
+ if (_trackConfirmations)
+ {
+ if (_pendingDeliveryTags is null)
+ {
+ throw new InvalidOperationException(InternalConstants.BugFound);
+ }
+ _pendingDeliveryTags.AddLast(_nextPublishSeqNo);
+ }
+
+ _nextPublishSeqNo++;
}
finally
{
@@ -1072,8 +1066,11 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
- NextPublishSeqNo--;
- _pendingDeliveryTags.RemoveLast();
+ _nextPublishSeqNo--;
+ if (_trackConfirmations && _pendingDeliveryTags is not null)
+ {
+ _pendingDeliveryTags.RemoveLast();
+ }
}
finally
{
@@ -1157,8 +1154,9 @@ await ModelSendAsync(method, k.CancellationToken)
}
}
- public async Task ConfirmSelectAsync(CancellationToken cancellationToken)
+ public async Task ConfirmSelectAsync(bool trackConfirmations = true, CancellationToken cancellationToken = default)
{
+ _trackConfirmations = trackConfirmations;
bool enqueued = false;
var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
@@ -1166,10 +1164,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
.ConfigureAwait(false);
try
{
- if (NextPublishSeqNo == 0UL)
+ if (_nextPublishSeqNo == 0UL)
{
- _confirmsTaskCompletionSources = new List>();
- NextPublishSeqNo = 1;
+ if (_trackConfirmations)
+ {
+ _pendingDeliveryTags = new LinkedList();
+ _confirmsTaskCompletionSources = new List>();
+ }
+ _nextPublishSeqNo = 1;
}
enqueued = Enqueue(k);
@@ -1681,8 +1683,6 @@ await ModelSendAsync(method, k.CancellationToken)
}
}
- private List>? _confirmsTaskCompletionSources;
-
public async Task WaitForConfirmsAsync(CancellationToken cancellationToken = default)
{
if (false == ConfirmsAreEnabled)
@@ -1690,6 +1690,16 @@ public async Task WaitForConfirmsAsync(CancellationToken cancellationToken
throw new InvalidOperationException("Confirms not selected");
}
+ if (false == _trackConfirmations)
+ {
+ throw new InvalidOperationException("Confirmation tracking is not enabled");
+ }
+
+ if (_pendingDeliveryTags is null)
+ {
+ throw new InvalidOperationException(InternalConstants.BugFound);
+ }
+
TaskCompletionSource tcs;
await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
@@ -1731,6 +1741,21 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
{
+ if (false == ConfirmsAreEnabled)
+ {
+ throw new InvalidOperationException("Confirms not selected");
+ }
+
+ if (false == _trackConfirmations)
+ {
+ throw new InvalidOperationException("Confirmation tracking is not enabled");
+ }
+
+ if (_pendingDeliveryTags is null)
+ {
+ throw new InvalidOperationException(InternalConstants.BugFound);
+ }
+
try
{
bool onlyAcksReceived = await WaitForConfirmsAsync(token)
@@ -1761,6 +1786,21 @@ await CloseAsync(ea, false, token)
private async Task WaitForConfirmsWithTokenAsync(TaskCompletionSource tcs,
CancellationToken cancellationToken)
{
+ if (false == ConfirmsAreEnabled)
+ {
+ throw new InvalidOperationException("Confirms not selected");
+ }
+
+ if (false == _trackConfirmations)
+ {
+ throw new InvalidOperationException("Confirmation tracking is not enabled");
+ }
+
+ if (_pendingDeliveryTags is null)
+ {
+ throw new InvalidOperationException(InternalConstants.BugFound);
+ }
+
CancellationTokenRegistration tokenRegistration =
#if NET6_0_OR_GREATER
cancellationToken.UnsafeRegister(
@@ -1785,6 +1825,63 @@ await tokenRegistration.DisposeAsync()
}
}
+ // NOTE: this method is internal for its use in this test:
+ // TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse
+ internal void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
+ {
+ // Only do this if confirms are enabled *and* the library is tracking confirmations
+ if (ConfirmsAreEnabled && _trackConfirmations)
+ {
+ if (_pendingDeliveryTags is null)
+ {
+ throw new InvalidOperationException(InternalConstants.BugFound);
+ }
+ // let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
+ _confirmSemaphore.Wait();
+ try
+ {
+ // No need to do anything if there are no delivery tags in the list
+ if (_pendingDeliveryTags.Count > 0)
+ {
+ if (multiple)
+ {
+ while (_pendingDeliveryTags.First!.Value < deliveryTag)
+ {
+ _pendingDeliveryTags.RemoveFirst();
+ }
+
+ if (_pendingDeliveryTags.First.Value == deliveryTag)
+ {
+ _pendingDeliveryTags.RemoveFirst();
+ }
+ }
+ else
+ {
+ _pendingDeliveryTags.Remove(deliveryTag);
+ }
+ }
+
+ _onlyAcksReceived = _onlyAcksReceived && !isNack;
+
+ if (_pendingDeliveryTags.Count == 0 && _confirmsTaskCompletionSources!.Count > 0)
+ {
+ // Done, mark tasks
+ foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
+ {
+ confirmsTaskCompletionSource.TrySetResult(_onlyAcksReceived);
+ }
+
+ _confirmsTaskCompletionSources.Clear();
+ _onlyAcksReceived = true;
+ }
+ }
+ finally
+ {
+ _confirmSemaphore.Release();
+ }
+ }
+ }
+
private static BasicProperties? PopulateActivityAndPropagateTraceId(TProperties basicProperties,
Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
new file mode 100644
index 000000000..3708452d0
--- /dev/null
+++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
@@ -0,0 +1,212 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using RabbitMQ.Client;
+
+const int MESSAGE_COUNT = 50_000;
+bool debug = false;
+
+await PublishMessagesIndividuallyAsync();
+await PublishMessagesInBatchAsync();
+await HandlePublishConfirmsAsynchronously();
+
+static Task CreateConnectionAsync()
+{
+ var factory = new ConnectionFactory { HostName = "localhost" };
+ return factory.CreateConnectionAsync();
+}
+
+static async Task PublishMessagesIndividuallyAsync()
+{
+ Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once");
+
+ using IConnection connection = await CreateConnectionAsync();
+ using IChannel channel = await connection.CreateChannelAsync();
+
+ // declare a server-named queue
+ QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
+ string queueName = queueDeclareResult.QueueName;
+ await channel.ConfirmSelectAsync();
+
+ var sw = new Stopwatch();
+ sw.Start();
+
+ for (int i = 0; i < MESSAGE_COUNT; i++)
+ {
+ byte[] body = Encoding.UTF8.GetBytes(i.ToString());
+ await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body);
+ }
+
+ await channel.WaitForConfirmsOrDieAsync();
+
+ sw.Stop();
+
+ Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms");
+}
+
+static async Task PublishMessagesInBatchAsync()
+{
+ Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");
+
+ using IConnection connection = await CreateConnectionAsync();
+ using IChannel channel = await connection.CreateChannelAsync();
+
+ // declare a server-named queue
+ QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
+ string queueName = queueDeclareResult.QueueName;
+ await channel.ConfirmSelectAsync();
+
+ int batchSize = 100;
+ int outstandingMessageCount = 0;
+
+ var sw = new Stopwatch();
+ sw.Start();
+
+ var publishTasks = new List();
+ for (int i = 0; i < MESSAGE_COUNT; i++)
+ {
+ byte[] body = Encoding.UTF8.GetBytes(i.ToString());
+ publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask());
+ outstandingMessageCount++;
+
+ if (outstandingMessageCount == batchSize)
+ {
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+ await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
+ publishTasks.Clear();
+
+ await channel.WaitForConfirmsOrDieAsync(cts.Token);
+ outstandingMessageCount = 0;
+ }
+ }
+
+ if (outstandingMessageCount > 0)
+ {
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+ await channel.WaitForConfirmsOrDieAsync(cts.Token);
+ }
+
+ sw.Stop();
+ Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms");
+}
+
+async Task HandlePublishConfirmsAsynchronously()
+{
+ Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously");
+
+ using IConnection connection = await CreateConnectionAsync();
+ using IChannel channel = await connection.CreateChannelAsync();
+
+ // declare a server-named queue
+ QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
+ string queueName = queueDeclareResult.QueueName;
+
+ // NOTE: setting trackConfirmations to false because this program
+ // is tracking them itself.
+ await channel.ConfirmSelectAsync(trackConfirmations: false);
+
+ bool publishingCompleted = false;
+ var allMessagesConfirmedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var outstandingConfirms = new LinkedList();
+ var semaphore = new SemaphoreSlim(1, 1);
+ void CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
+ {
+ if (debug)
+ {
+ Console.WriteLine("{0} [DEBUG] confirming message: {1} (multiple: {2})",
+ DateTime.Now, deliveryTag, multiple);
+ }
+
+ semaphore.Wait();
+ try
+ {
+ if (multiple)
+ {
+ do
+ {
+ LinkedListNode? node = outstandingConfirms.First;
+ if (node is null)
+ {
+ break;
+ }
+ if (node.Value <= deliveryTag)
+ {
+ outstandingConfirms.RemoveFirst();
+ }
+ else
+ {
+ break;
+ }
+ } while (true);
+ }
+ else
+ {
+ outstandingConfirms.Remove(deliveryTag);
+ }
+ }
+ finally
+ {
+ semaphore.Release();
+ }
+
+ if (publishingCompleted && outstandingConfirms.Count == 0)
+ {
+ allMessagesConfirmedTcs.SetResult(true);
+ }
+ }
+
+ channel.BasicAcks += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
+ channel.BasicNacks += (sender, ea) =>
+ {
+ Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple})");
+ CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
+ };
+
+ var sw = new Stopwatch();
+ sw.Start();
+
+ var publishTasks = new List();
+ for (int i = 0; i < MESSAGE_COUNT; i++)
+ {
+ string msg = i.ToString();
+ byte[] body = Encoding.UTF8.GetBytes(msg);
+ ulong nextPublishSeqNo = channel.NextPublishSeqNo;
+ if ((ulong)(i + 1) != nextPublishSeqNo)
+ {
+ Console.WriteLine($"{DateTime.Now} [WARNING] i {i + 1} does not equal next sequence number: {nextPublishSeqNo}");
+ }
+ await semaphore.WaitAsync();
+ try
+ {
+ outstandingConfirms.AddLast(nextPublishSeqNo);
+ }
+ finally
+ {
+ semaphore.Release();
+ }
+ publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask());
+ }
+
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
+ publishingCompleted = true;
+
+ try
+ {
+ await allMessagesConfirmedTcs.Task.WaitAsync(cts.Token);
+ }
+ catch (OperationCanceledException)
+ {
+ Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed within 10 seconds", DateTime.Now);
+ }
+ catch (TimeoutException)
+ {
+ Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed within 10 seconds", DateTime.Now);
+ }
+
+ sw.Stop();
+ Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages and handled confirm asynchronously {sw.ElapsedMilliseconds:N0} ms");
+}
diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj
new file mode 100644
index 000000000..fd865eccf
--- /dev/null
+++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj
@@ -0,0 +1,19 @@
+
+
+
+ net6.0
+ $(NoWarn);CA2007
+ true
+
+
+
+ Exe
+ enable
+ 9.0
+
+
+
+
+
+
+
diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs
index 64714b769..42c612a63 100644
--- a/projects/Test/Integration/TestPublisherConfirms.cs
+++ b/projects/Test/Integration/TestPublisherConfirms.cs
@@ -30,7 +30,6 @@
//---------------------------------------------------------------------------
using System;
-using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
@@ -105,12 +104,8 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout
{
return TestWaitForConfirmsAsync(2000, async (ch) =>
{
- IChannel actualChannel = ((AutorecoveringChannel)ch).InnerChannel;
- actualChannel
- .GetType()
- .GetMethod("HandleAckNack", BindingFlags.Instance | BindingFlags.NonPublic)
- .Invoke(actualChannel, new object[] { 10UL, false, true });
-
+ RecoveryAwareChannel actualChannel = ((AutorecoveringChannel)ch).InnerChannel;
+ actualChannel.HandleAckNack(10UL, false, true);
using (var cts = new CancellationTokenSource(ShortSpan))
{
Assert.False(await ch.WaitForConfirmsAsync(cts.Token));