Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
PublisherConfirmationInfo? publisherConfirmationInfo = null;
try
{
publisherConfirmationInfo =
await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken)
.ConfigureAwait(false);
publisherConfirmationInfo = await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken)
.ConfigureAwait(false);

await MaybeEnforceFlowControlAsync(cancellationToken)
.ConfigureAwait(false);
Expand Down Expand Up @@ -106,9 +105,8 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
PublisherConfirmationInfo? publisherConfirmationInfo = null;
try
{
publisherConfirmationInfo =
await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken)
.ConfigureAwait(false);
publisherConfirmationInfo = await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken)
.ConfigureAwait(false);

await MaybeEnforceFlowControlAsync(cancellationToken)
.ConfigureAwait(false);
Expand Down
127 changes: 58 additions & 69 deletions projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ internal partial class Channel : IChannel, IRecoverable
private bool _publisherConfirmationTrackingEnabled = false;
private ulong _nextPublishSeqNo = 0;
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();
private RateLimiter? _outstandingPublisherConfirmationsRateLimiter;
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();

private sealed class PublisherConfirmationInfo : IDisposable
{
private TaskCompletionSource<bool>? _publisherConfirmationTcs;
private readonly TaskCompletionSource<bool>? _publisherConfirmationTcs;
private readonly RateLimitLease? _lease;

internal PublisherConfirmationInfo(ulong publishSequenceNumber,
Expand Down Expand Up @@ -275,8 +275,7 @@ private async Task MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(Shu

if (_publisherConfirmationsEnabled)
{
await _confirmSemaphore.WaitAsync(reason.CancellationToken)
.ConfigureAwait(false);
await _confirmSemaphore.WaitAsync(reason.CancellationToken).ConfigureAwait(false);
try
{
MaybeSetExceptionOnConfirmsTcs(reason);
Expand All @@ -291,47 +290,55 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private async Task<PublisherConfirmationInfo?> MaybeStartPublisherConfirmationTrackingAsync(CancellationToken cancellationToken)
{
if (_publisherConfirmationsEnabled)
if (!_publisherConfirmationsEnabled)
{
RateLimitLease? lease = null;
if (_publisherConfirmationTrackingEnabled)
{
if (_outstandingPublisherConfirmationsRateLimiter is not null)
{
lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(
cancellationToken: cancellationToken)
.ConfigureAwait(false);
return null;
}

if (!lease.IsAcquired)
{
throw new InvalidOperationException("Could not acquire a lease from the rate limiter.");
}
}
RateLimitLease? lease = null;
if (_publisherConfirmationTrackingEnabled && _outstandingPublisherConfirmationsRateLimiter is not null)
{
lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(
cancellationToken: cancellationToken)
.ConfigureAwait(false);
if (!lease.IsAcquired)
{
throw new InvalidOperationException("Could not acquire a lease from the rate limiter.");
}
}

await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
bool confirmSemaphoreAcquired;
try
{
await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
confirmSemaphoreAcquired = true;
}
catch (OperationCanceledException)
{
lease?.Dispose();
throw;
}

ulong publishSequenceNumber = _nextPublishSeqNo;
ulong publishSequenceNumber = _nextPublishSeqNo;

TaskCompletionSource<bool>? publisherConfirmationTcs = null;
if (_publisherConfirmationTrackingEnabled)
TaskCompletionSource<bool>? publisherConfirmationTcs = null;
if (_publisherConfirmationTrackingEnabled)
{
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
if (!_confirmsTaskCompletionSources.TryAdd(publishSequenceNumber, publisherConfirmationTcs))
{
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
if (!_confirmsTaskCompletionSources.TryAdd(publishSequenceNumber, publisherConfirmationTcs))
if (confirmSemaphoreAcquired)
{
throw new InvalidOperationException($"Failed to track the publisher confirmation for sequence number '{publishSequenceNumber}' because it already exists.");
_confirmSemaphore.Release();
}
lease?.Dispose();
throw new InvalidOperationException($"Failed to track the publisher confirmation for sequence number '{publishSequenceNumber}' because it already exists.");
}
}

_nextPublishSeqNo++;
_nextPublishSeqNo++;

return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease);
}
else
{
return null;
}
return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -360,44 +367,26 @@ private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConf
private async Task MaybeEndPublisherConfirmationTrackingAsync(PublisherConfirmationInfo? publisherConfirmationInfo,
CancellationToken cancellationToken)
{
if (_publisherConfirmationsEnabled)
if (publisherConfirmationInfo is null)
{
try
{
_confirmSemaphore.Release();
}
catch (SemaphoreFullException ex)
{
/*
* rabbitmq/rabbitmq-dotnet-client-1793
* If MaybeStartPublisherConfirmationTracking throws an exception *prior* to acquiring
* _confirmSemaphore, the above Release() call will throw SemaphoreFullException.
* In "normal" cases, publisherConfirmationInfo will thus be null, but if not, throw
* a "bug found" exception here.
*/
if (publisherConfirmationInfo is not null)
{
throw new InvalidOperationException(InternalConstants.BugFound, ex);
}
}
return;
}

if (publisherConfirmationInfo is not null)
{
try
{
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
throw;
}
finally
{
publisherConfirmationInfo.Dispose();
}
}
_confirmSemaphore.Release();

try
{
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
throw;
}
finally
{
publisherConfirmationInfo.Dispose();
}
}

Expand Down
14 changes: 6 additions & 8 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -582,15 +582,14 @@ protected virtual void Dispose(bool disposing)
_serverOriginatedChannelCloseTcs?.Task.Wait(InternalConstants.DefaultChannelDisposeTimeout);

ConsumerDispatcher.Dispose();

_outstandingPublisherConfirmationsRateLimiter?.Dispose();
}
finally
{
try
{
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
MaybeSetExceptionOnConfirmsTcs();
}
catch
Expand Down Expand Up @@ -642,19 +641,18 @@ await _serverOriginatedChannelCloseTcs.Task.WaitAsync(InternalConstants.DefaultC
}

ConsumerDispatcher.Dispose();

if (_outstandingPublisherConfirmationsRateLimiter is not null)
{
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
.ConfigureAwait(false);
}
}
finally
{
try
{
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
if (_outstandingPublisherConfirmationsRateLimiter is not null)
{
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
.ConfigureAwait(false);
}
}
catch
{
Expand Down
7 changes: 1 addition & 6 deletions projects/Test/Integration/GH/TestGitHubIssues.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// This source code is dual-licensed under the Apache License, version
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
Expand Down Expand Up @@ -370,11 +370,6 @@ await Assert.ThrowsAnyAsync<InvalidOperationException>(async () =>
});
break;
}
catch (SemaphoreFullException ex0)
{
_output.WriteLine("{0} ex: {1}", _testDisplayName, ex0);
retryCount++;
}
catch (PublishException)
{
retryCount++;
Expand Down
Loading