diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index 06d92d357..c07d27299 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -51,9 +51,8 @@ public async ValueTask BasicPublishAsync(string exchange, string ro PublisherConfirmationInfo? publisherConfirmationInfo = null; try { - publisherConfirmationInfo = - await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken) - .ConfigureAwait(false); + publisherConfirmationInfo = await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken) + .ConfigureAwait(false); await MaybeEnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); @@ -106,9 +105,8 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac PublisherConfirmationInfo? publisherConfirmationInfo = null; try { - publisherConfirmationInfo = - await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken) - .ConfigureAwait(false); + publisherConfirmationInfo = await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken) + .ConfigureAwait(false); await MaybeEnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index 485df71fa..aa838d27e 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -49,12 +49,12 @@ internal partial class Channel : IChannel, IRecoverable private bool _publisherConfirmationTrackingEnabled = false; private ulong _nextPublishSeqNo = 0; private readonly SemaphoreSlim _confirmSemaphore = new(1, 1); - private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new(); private RateLimiter? _outstandingPublisherConfirmationsRateLimiter; + private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new(); private sealed class PublisherConfirmationInfo : IDisposable { - private TaskCompletionSource? _publisherConfirmationTcs; + private readonly TaskCompletionSource? _publisherConfirmationTcs; private readonly RateLimitLease? _lease; internal PublisherConfirmationInfo(ulong publishSequenceNumber, @@ -275,8 +275,7 @@ private async Task MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(Shu if (_publisherConfirmationsEnabled) { - await _confirmSemaphore.WaitAsync(reason.CancellationToken) - .ConfigureAwait(false); + await _confirmSemaphore.WaitAsync(reason.CancellationToken).ConfigureAwait(false); try { MaybeSetExceptionOnConfirmsTcs(reason); @@ -291,47 +290,55 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken) [MethodImpl(MethodImplOptions.AggressiveInlining)] private async Task MaybeStartPublisherConfirmationTrackingAsync(CancellationToken cancellationToken) { - if (_publisherConfirmationsEnabled) + if (!_publisherConfirmationsEnabled) { - RateLimitLease? lease = null; - if (_publisherConfirmationTrackingEnabled) - { - if (_outstandingPublisherConfirmationsRateLimiter is not null) - { - lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync( - cancellationToken: cancellationToken) - .ConfigureAwait(false); + return null; + } - if (!lease.IsAcquired) - { - throw new InvalidOperationException("Could not acquire a lease from the rate limiter."); - } - } + RateLimitLease? lease = null; + if (_publisherConfirmationTrackingEnabled && _outstandingPublisherConfirmationsRateLimiter is not null) + { + lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync( + cancellationToken: cancellationToken) + .ConfigureAwait(false); + if (!lease.IsAcquired) + { + throw new InvalidOperationException("Could not acquire a lease from the rate limiter."); } + } - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); + bool confirmSemaphoreAcquired; + try + { + await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + confirmSemaphoreAcquired = true; + } + catch (OperationCanceledException) + { + lease?.Dispose(); + throw; + } - ulong publishSequenceNumber = _nextPublishSeqNo; + ulong publishSequenceNumber = _nextPublishSeqNo; - TaskCompletionSource? publisherConfirmationTcs = null; - if (_publisherConfirmationTrackingEnabled) + TaskCompletionSource? publisherConfirmationTcs = null; + if (_publisherConfirmationTrackingEnabled) + { + publisherConfirmationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + if (!_confirmsTaskCompletionSources.TryAdd(publishSequenceNumber, publisherConfirmationTcs)) { - publisherConfirmationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - if (!_confirmsTaskCompletionSources.TryAdd(publishSequenceNumber, publisherConfirmationTcs)) + if (confirmSemaphoreAcquired) { - throw new InvalidOperationException($"Failed to track the publisher confirmation for sequence number '{publishSequenceNumber}' because it already exists."); + _confirmSemaphore.Release(); } + lease?.Dispose(); + throw new InvalidOperationException($"Failed to track the publisher confirmation for sequence number '{publishSequenceNumber}' because it already exists."); } + } - _nextPublishSeqNo++; + _nextPublishSeqNo++; - return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease); - } - else - { - return null; - } + return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -360,44 +367,26 @@ private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConf private async Task MaybeEndPublisherConfirmationTrackingAsync(PublisherConfirmationInfo? publisherConfirmationInfo, CancellationToken cancellationToken) { - if (_publisherConfirmationsEnabled) + if (publisherConfirmationInfo is null) { - try - { - _confirmSemaphore.Release(); - } - catch (SemaphoreFullException ex) - { - /* - * rabbitmq/rabbitmq-dotnet-client-1793 - * If MaybeStartPublisherConfirmationTracking throws an exception *prior* to acquiring - * _confirmSemaphore, the above Release() call will throw SemaphoreFullException. - * In "normal" cases, publisherConfirmationInfo will thus be null, but if not, throw - * a "bug found" exception here. - */ - if (publisherConfirmationInfo is not null) - { - throw new InvalidOperationException(InternalConstants.BugFound, ex); - } - } + return; + } - if (publisherConfirmationInfo is not null) - { - try - { - await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) - .ConfigureAwait(false); - } - catch (OperationCanceledException) - { - _confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _); - throw; - } - finally - { - publisherConfirmationInfo.Dispose(); - } - } + _confirmSemaphore.Release(); + + try + { + await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) + .ConfigureAwait(false); + } + catch (OperationCanceledException) + { + _confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _); + throw; + } + finally + { + publisherConfirmationInfo.Dispose(); } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index c8eb8e78c..34ea83edb 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -582,8 +582,6 @@ protected virtual void Dispose(bool disposing) _serverOriginatedChannelCloseTcs?.Task.Wait(InternalConstants.DefaultChannelDisposeTimeout); ConsumerDispatcher.Dispose(); - - _outstandingPublisherConfirmationsRateLimiter?.Dispose(); } finally { @@ -591,6 +589,7 @@ protected virtual void Dispose(bool disposing) { _rpcSemaphore.Dispose(); _confirmSemaphore.Dispose(); + _outstandingPublisherConfirmationsRateLimiter?.Dispose(); MaybeSetExceptionOnConfirmsTcs(); } catch @@ -642,12 +641,6 @@ await _serverOriginatedChannelCloseTcs.Task.WaitAsync(InternalConstants.DefaultC } ConsumerDispatcher.Dispose(); - - if (_outstandingPublisherConfirmationsRateLimiter is not null) - { - await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() - .ConfigureAwait(false); - } } finally { @@ -655,6 +648,11 @@ await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() { _rpcSemaphore.Dispose(); _confirmSemaphore.Dispose(); + if (_outstandingPublisherConfirmationsRateLimiter is not null) + { + await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() + .ConfigureAwait(false); + } } catch { diff --git a/projects/Test/Integration/GH/TestGitHubIssues.cs b/projects/Test/Integration/GH/TestGitHubIssues.cs index 7eb9bfd2a..9718f76a5 100644 --- a/projects/Test/Integration/GH/TestGitHubIssues.cs +++ b/projects/Test/Integration/GH/TestGitHubIssues.cs @@ -1,4 +1,4 @@ -// This source code is dual-licensed under the Apache License, version +// This source code is dual-licensed under the Apache License, version // 2.0, and the Mozilla Public License, version 2.0. // // The APL v2.0: @@ -370,11 +370,6 @@ await Assert.ThrowsAnyAsync(async () => }); break; } - catch (SemaphoreFullException ex0) - { - _output.WriteLine("{0} ex: {1}", _testDisplayName, ex0); - retryCount++; - } catch (PublishException) { retryCount++;