@@ -54,20 +54,34 @@ internal partial class Channel : IChannel, IRecoverable
5454
5555 private sealed class PublisherConfirmationInfo : IDisposable
5656 {
57- private TaskCompletionSource < bool > ? _publisherConfirmationTcs ;
57+ private readonly SemaphoreSlim _semaphore ;
58+ private readonly TaskCompletionSource < bool > ? _publisherConfirmationTcs ;
5859 private readonly RateLimitLease ? _lease ;
60+ private bool _semaphoreReleased ;
5961
60- internal PublisherConfirmationInfo ( ulong publishSequenceNumber ,
62+ internal PublisherConfirmationInfo (
63+ SemaphoreSlim semaphore ,
64+ ulong publishSequenceNumber ,
6165 TaskCompletionSource < bool > ? publisherConfirmationTcs ,
6266 RateLimitLease ? lease )
6367 {
68+ _semaphore = semaphore ;
6469 PublishSequenceNumber = publishSequenceNumber ;
6570 _publisherConfirmationTcs = publisherConfirmationTcs ;
6671 _lease = lease ;
6772 }
6873
6974 internal ulong PublishSequenceNumber { get ; }
7075
76+ internal void ReleaseSemaphore ( )
77+ {
78+ if ( ! _semaphoreReleased )
79+ {
80+ _semaphoreReleased = true ;
81+ _semaphore . Release ( ) ;
82+ }
83+ }
84+
7185 internal async Task MaybeWaitForConfirmationAsync ( CancellationToken cancellationToken )
7286 {
7387 if ( _publisherConfirmationTcs is not null )
@@ -92,6 +106,7 @@ internal bool MaybeHandleException(Exception ex)
92106
93107 public void Dispose ( )
94108 {
109+ ReleaseSemaphore ( ) ;
95110 _lease ? . Dispose ( ) ;
96111 }
97112 }
@@ -291,47 +306,50 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
291306 [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
292307 private async Task < PublisherConfirmationInfo ? > MaybeStartPublisherConfirmationTrackingAsync ( CancellationToken cancellationToken )
293308 {
294- if ( _publisherConfirmationsEnabled )
309+ if ( ! _publisherConfirmationsEnabled )
295310 {
296- RateLimitLease ? lease = null ;
297- if ( _publisherConfirmationTrackingEnabled )
298- {
299- if ( _outstandingPublisherConfirmationsRateLimiter is not null )
300- {
301- lease = await _outstandingPublisherConfirmationsRateLimiter . AcquireAsync (
302- cancellationToken : cancellationToken )
303- . ConfigureAwait ( false ) ;
311+ return null ;
312+ }
304313
305- if ( ! lease . IsAcquired )
306- {
307- throw new InvalidOperationException ( "Could not acquire a lease from the rate limiter." ) ;
308- }
309- }
314+ RateLimitLease ? lease = null ;
315+ if ( _publisherConfirmationTrackingEnabled && _outstandingPublisherConfirmationsRateLimiter is not null )
316+ {
317+ lease = await _outstandingPublisherConfirmationsRateLimiter . AcquireAsync (
318+ cancellationToken : cancellationToken )
319+ . ConfigureAwait ( false ) ;
320+ if ( ! lease . IsAcquired )
321+ {
322+ throw new InvalidOperationException ( "Could not acquire a lease from the rate limiter." ) ;
310323 }
324+ }
311325
312- await _confirmSemaphore . WaitAsync ( cancellationToken )
313- . ConfigureAwait ( false ) ;
326+ try
327+ {
328+ await _confirmSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
329+ }
330+ catch ( OperationCanceledException )
331+ {
332+ lease ? . Dispose ( ) ;
333+ throw ;
334+ }
314335
315- ulong publishSequenceNumber = _nextPublishSeqNo ;
336+ ulong publishSequenceNumber = _nextPublishSeqNo ;
316337
317- TaskCompletionSource < bool > ? publisherConfirmationTcs = null ;
318- if ( _publisherConfirmationTrackingEnabled )
338+ TaskCompletionSource < bool > ? publisherConfirmationTcs = null ;
339+ if ( _publisherConfirmationTrackingEnabled )
340+ {
341+ publisherConfirmationTcs = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
342+ if ( ! _confirmsTaskCompletionSources . TryAdd ( publishSequenceNumber , publisherConfirmationTcs ) )
319343 {
320- publisherConfirmationTcs = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
321- if ( ! _confirmsTaskCompletionSources . TryAdd ( publishSequenceNumber , publisherConfirmationTcs ) )
322- {
323- throw new InvalidOperationException ( $ "Failed to track the publisher confirmation for sequence number '{ publishSequenceNumber } ' because it already exists.") ;
324- }
344+ _confirmSemaphore . Release ( ) ;
345+ lease ? . Dispose ( ) ;
346+ throw new InvalidOperationException ( $ "Failed to track the publisher confirmation for sequence number '{ publishSequenceNumber } ' because it already exists.") ;
325347 }
348+ }
326349
327- _nextPublishSeqNo ++ ;
350+ _nextPublishSeqNo ++ ;
328351
329- return new PublisherConfirmationInfo ( publishSequenceNumber , publisherConfirmationTcs , lease ) ;
330- }
331- else
332- {
333- return null ;
334- }
352+ return new PublisherConfirmationInfo ( _confirmSemaphore , publishSequenceNumber , publisherConfirmationTcs , lease ) ;
335353 }
336354
337355 [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
@@ -360,44 +378,26 @@ private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConf
360378 private async Task MaybeEndPublisherConfirmationTrackingAsync ( PublisherConfirmationInfo ? publisherConfirmationInfo ,
361379 CancellationToken cancellationToken )
362380 {
363- if ( _publisherConfirmationsEnabled )
381+ if ( publisherConfirmationInfo is null )
364382 {
365- try
366- {
367- _confirmSemaphore . Release ( ) ;
368- }
369- catch ( SemaphoreFullException ex )
370- {
371- /*
372- * rabbitmq/rabbitmq-dotnet-client-1793
373- * If MaybeStartPublisherConfirmationTracking throws an exception *prior* to acquiring
374- * _confirmSemaphore, the above Release() call will throw SemaphoreFullException.
375- * In "normal" cases, publisherConfirmationInfo will thus be null, but if not, throw
376- * a "bug found" exception here.
377- */
378- if ( publisherConfirmationInfo is not null )
379- {
380- throw new InvalidOperationException ( InternalConstants . BugFound , ex ) ;
381- }
382- }
383+ return ;
384+ }
383385
384- if ( publisherConfirmationInfo is not null )
385- {
386- try
387- {
388- await publisherConfirmationInfo . MaybeWaitForConfirmationAsync ( cancellationToken )
389- . ConfigureAwait ( false ) ;
390- }
391- catch ( OperationCanceledException )
392- {
393- _confirmsTaskCompletionSources . TryRemove ( publisherConfirmationInfo . PublishSequenceNumber , out _ ) ;
394- throw ;
395- }
396- finally
397- {
398- publisherConfirmationInfo . Dispose ( ) ;
399- }
400- }
386+ publisherConfirmationInfo . ReleaseSemaphore ( ) ;
387+
388+ try
389+ {
390+ await publisherConfirmationInfo . MaybeWaitForConfirmationAsync ( cancellationToken )
391+ . ConfigureAwait ( false ) ;
392+ }
393+ catch ( OperationCanceledException )
394+ {
395+ _confirmsTaskCompletionSources . TryRemove ( publisherConfirmationInfo . PublishSequenceNumber , out _ ) ;
396+ throw ;
397+ }
398+ finally
399+ {
400+ publisherConfirmationInfo . Dispose ( ) ;
401401 }
402402 }
403403
0 commit comments