diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index a5b9889fb..87a97eb54 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -440,9 +440,9 @@ RabbitMQ.Client.IChannel.CurrentQueue.get -> string RabbitMQ.Client.IChannel.DefaultConsumer.get -> RabbitMQ.Client.IAsyncBasicConsumer RabbitMQ.Client.IChannel.DefaultConsumer.set -> void RabbitMQ.Client.IChannel.FlowControl -> System.EventHandler +RabbitMQ.Client.IChannel.GetNextPublishSequenceNumberAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.IsClosed.get -> bool RabbitMQ.Client.IChannel.IsOpen.get -> bool -RabbitMQ.Client.IChannel.NextPublishSeqNo.get -> ulong RabbitMQ.Client.IChannelExtensions RabbitMQ.Client.IConnection RabbitMQ.Client.IConnection.CallbackException -> System.EventHandler diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index 84dff9517..fa03b6b4e 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -95,11 +95,6 @@ public interface IChannel : IDisposable /// bool IsOpen { get; } - /// - /// When in confirm mode, return the sequence number of the next message to be published. - /// - ulong NextPublishSeqNo { get; } - /// /// The name of the last queue declared on this channel. /// @@ -143,6 +138,11 @@ public interface IChannel : IDisposable /// event EventHandler ChannelShutdown; + /// + /// When in confirm mode, return the sequence number of the next message to be published. + /// + ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default); + /// Asynchronously acknknowledges one or more messages. /// The delivery tag. /// Ack all messages up to the delivery tag if set to true. diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index c31147eee..4642f5376 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -89,13 +89,11 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.BasicAck: { - HandleBasicAck(cmd); - return Task.FromResult(true); + return HandleBasicAck(cmd, cancellationToken); } case ProtocolCommandId.BasicNack: { - HandleBasicNack(cmd); - return Task.FromResult(true); + return HandleBasicNack(cmd, cancellationToken); } case ProtocolCommandId.BasicReturn: { diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index b786a288b..9ce69b013 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -144,8 +144,6 @@ public IAsyncBasicConsumer? DefaultConsumer public bool IsOpen => !_disposed && _innerChannel.IsOpen; - public ulong NextPublishSeqNo => InnerChannel.NextPublishSeqNo; - public string? CurrentQueue => InnerChannel.CurrentQueue; internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers, @@ -274,6 +272,8 @@ public void Dispose() _disposed = true; } + public ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken); + public ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, CancellationToken cancellationToken) => InnerChannel.BasicAckAsync(deliveryTag, multiple, cancellationToken); diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 97925a7b0..225ba9554 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -181,29 +181,6 @@ public IAsyncBasicConsumer? DefaultConsumer [MemberNotNullWhen(false, nameof(CloseReason))] public bool IsOpen => CloseReason is null; - public ulong NextPublishSeqNo - { - get - { - if (ConfirmsAreEnabled) - { - _confirmSemaphore.Wait(); - try - { - return _nextPublishSeqNo; - } - finally - { - _confirmSemaphore.Release(); - } - } - else - { - return _nextPublishSeqNo; - } - } - } - public string? CurrentQueue { get; private set; } public ISession Session { get; private set; } @@ -589,7 +566,7 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart return ModelSendAsync(method, cancellationToken).AsTask(); } - protected void HandleBasicAck(IncomingCommand cmd) + protected async Task HandleBasicAck(IncomingCommand cmd, CancellationToken cancellationToken) { var ack = new BasicAck(cmd.MethodSpan); if (!_basicAcksWrapper.IsEmpty) @@ -598,10 +575,12 @@ protected void HandleBasicAck(IncomingCommand cmd) _basicAcksWrapper.Invoke(this, args); } - HandleAckNack(ack._deliveryTag, ack._multiple, false); + await HandleAckNack(ack._deliveryTag, ack._multiple, false, cancellationToken) + .ConfigureAwait(false); + return true; } - protected void HandleBasicNack(IncomingCommand cmd) + protected async Task HandleBasicNack(IncomingCommand cmd, CancellationToken cancellationToken) { var nack = new BasicNack(cmd.MethodSpan); if (!_basicNacksWrapper.IsEmpty) @@ -611,7 +590,9 @@ protected void HandleBasicNack(IncomingCommand cmd) _basicNacksWrapper.Invoke(this, args); } - HandleAckNack(nack._deliveryTag, nack._multiple, true); + await HandleAckNack(nack._deliveryTag, nack._multiple, true, cancellationToken) + .ConfigureAwait(false); + return true; } protected async Task HandleBasicCancelAsync(IncomingCommand cmd, CancellationToken cancellationToken) @@ -801,6 +782,26 @@ protected void HandleConnectionUnblocked() Session.Connection.HandleConnectionUnblocked(); } + public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) + { + if (ConfirmsAreEnabled) + { + await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + return _nextPublishSeqNo; + } + finally + { + _confirmSemaphore.Release(); + } + } + else + { + return _nextPublishSeqNo; + } + } + public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, CancellationToken cancellationToken); @@ -1829,7 +1830,7 @@ await tokenRegistration.DisposeAsync() // NOTE: this method is internal for its use in this test: // TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse - internal void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack) + internal async Task HandleAckNack(ulong deliveryTag, bool multiple, bool isNack, CancellationToken cancellationToken = default) { // Only do this if confirms are enabled *and* the library is tracking confirmations if (ConfirmsAreEnabled && _trackConfirmations) @@ -1839,7 +1840,8 @@ internal void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack) throw new InvalidOperationException(InternalConstants.BugFound); } // let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted - _confirmSemaphore.Wait(); + await _confirmSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); try { // No need to do anything if there are no delivery tags in the list diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs index 3708452d0..47864833d 100644 --- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs @@ -173,7 +173,7 @@ void CleanOutstandingConfirms(ulong deliveryTag, bool multiple) { string msg = i.ToString(); byte[] body = Encoding.UTF8.GetBytes(msg); - ulong nextPublishSeqNo = channel.NextPublishSeqNo; + ulong nextPublishSeqNo = await channel.GetNextPublishSequenceNumberAsync(); if ((ulong)(i + 1) != nextPublishSeqNo) { Console.WriteLine($"{DateTime.Now} [WARNING] i {i + 1} does not equal next sequence number: {nextPublishSeqNo}"); diff --git a/projects/Test/Integration/TestConfirmSelect.cs b/projects/Test/Integration/TestConfirmSelect.cs index cc2493f4b..d2b9a38a5 100644 --- a/projects/Test/Integration/TestConfirmSelect.cs +++ b/projects/Test/Integration/TestConfirmSelect.cs @@ -53,19 +53,19 @@ ValueTask PublishAsync() } await _channel.ConfirmSelectAsync(); - Assert.Equal(1ul, _channel.NextPublishSeqNo); + Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync()); await PublishAsync(); - Assert.Equal(2ul, _channel.NextPublishSeqNo); + Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync()); await PublishAsync(); - Assert.Equal(3ul, _channel.NextPublishSeqNo); + Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync()); await _channel.ConfirmSelectAsync(); await PublishAsync(); - Assert.Equal(4ul, _channel.NextPublishSeqNo); + Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync()); await PublishAsync(); - Assert.Equal(5ul, _channel.NextPublishSeqNo); + Assert.Equal(5ul, await _channel.GetNextPublishSequenceNumberAsync()); await PublishAsync(); - Assert.Equal(6ul, _channel.NextPublishSeqNo); + Assert.Equal(6ul, await _channel.GetNextPublishSequenceNumberAsync()); } [Theory] @@ -80,7 +80,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) await _channel.ConfirmSelectAsync(); var properties = new BasicProperties(); - // _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo); + // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, mandatory: false, basicProperties: properties, body: body); await _channel.WaitForConfirmsOrDieAsync(); @@ -91,7 +91,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, { CorrelationId = new string('o', correlationIdLength) }; - // _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo); + // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body); await _channel.WaitForConfirmsOrDieAsync(); } @@ -101,7 +101,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, } properties = new BasicProperties(); - // _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo); + // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body); await _channel.WaitForConfirmsOrDieAsync(); // _output.WriteLine("I'm done..."); diff --git a/projects/Test/Integration/TestConfirmSelectAsync.cs b/projects/Test/Integration/TestConfirmSelectAsync.cs index bfbd7fa2a..0681ef05a 100644 --- a/projects/Test/Integration/TestConfirmSelectAsync.cs +++ b/projects/Test/Integration/TestConfirmSelectAsync.cs @@ -49,19 +49,19 @@ public TestConfirmSelectAsync(ITestOutputHelper output) : base(output) public async Task TestConfirmSelectIdempotency() { await _channel.ConfirmSelectAsync(); - Assert.Equal(1ul, _channel.NextPublishSeqNo); + Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync()); await Publish(); - Assert.Equal(2ul, _channel.NextPublishSeqNo); + Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync()); await Publish(); - Assert.Equal(3ul, _channel.NextPublishSeqNo); + Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync()); await _channel.ConfirmSelectAsync(); await Publish(); - Assert.Equal(4ul, _channel.NextPublishSeqNo); + Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync()); await Publish(); - Assert.Equal(5ul, _channel.NextPublishSeqNo); + Assert.Equal(5ul, await _channel.GetNextPublishSequenceNumberAsync()); await Publish(); - Assert.Equal(6ul, _channel.NextPublishSeqNo); + Assert.Equal(6ul, await _channel.GetNextPublishSequenceNumberAsync()); } private ValueTask Publish() diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs index 42c612a63..61a13a446 100644 --- a/projects/Test/Integration/TestPublisherConfirms.cs +++ b/projects/Test/Integration/TestPublisherConfirms.cs @@ -105,7 +105,7 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout return TestWaitForConfirmsAsync(2000, async (ch) => { RecoveryAwareChannel actualChannel = ((AutorecoveringChannel)ch).InnerChannel; - actualChannel.HandleAckNack(10UL, false, true); + await actualChannel.HandleAckNack(10UL, false, true); using (var cts = new CancellationTokenSource(ShortSpan)) { Assert.False(await ch.WaitForConfirmsAsync(cts.Token));