35
35
using System . Diagnostics ;
36
36
using System . Runtime . CompilerServices ;
37
37
using System . Threading ;
38
+ using System . Threading . RateLimiting ;
38
39
using System . Threading . Tasks ;
39
40
using RabbitMQ . Client . Events ;
40
41
using RabbitMQ . Client . Exceptions ;
@@ -47,32 +48,26 @@ internal partial class Channel : IChannel, IRecoverable
47
48
{
48
49
private bool _publisherConfirmationsEnabled = false ;
49
50
private bool _publisherConfirmationTrackingEnabled = false ;
50
- private ushort ? _maxOutstandingPublisherConfirmations = null ;
51
- private SemaphoreSlim ? _maxOutstandingConfirmationsSemaphore ;
52
51
private ulong _nextPublishSeqNo = 0 ;
53
52
private readonly SemaphoreSlim _confirmSemaphore = new ( 1 , 1 ) ;
54
53
private readonly ConcurrentDictionary < ulong , TaskCompletionSource < bool > > _confirmsTaskCompletionSources = new ( ) ;
54
+ private RateLimiter ? _outstandingPublisherConfirmationsRateLimiter ;
55
55
56
- private class PublisherConfirmationInfo
56
+ private sealed class PublisherConfirmationInfo : IDisposable
57
57
{
58
- private ulong _publishSequenceNumber ;
59
58
private TaskCompletionSource < bool > ? _publisherConfirmationTcs ;
59
+ private readonly RateLimitLease ? _lease ;
60
60
61
- internal PublisherConfirmationInfo ( )
61
+ internal PublisherConfirmationInfo ( ulong publishSequenceNumber ,
62
+ TaskCompletionSource < bool > ? publisherConfirmationTcs ,
63
+ RateLimitLease ? lease )
62
64
{
63
- _publishSequenceNumber = 0 ;
64
- _publisherConfirmationTcs = null ;
65
- }
66
-
67
- internal PublisherConfirmationInfo ( ulong publishSequenceNumber , TaskCompletionSource < bool > ? publisherConfirmationTcs )
68
- {
69
- _publishSequenceNumber = publishSequenceNumber ;
65
+ PublishSequenceNumber = publishSequenceNumber ;
70
66
_publisherConfirmationTcs = publisherConfirmationTcs ;
67
+ _lease = lease ;
71
68
}
72
69
73
- internal ulong PublishSequenceNumber => _publishSequenceNumber ;
74
-
75
- internal TaskCompletionSource < bool > ? PublisherConfirmationTcs => _publisherConfirmationTcs ;
70
+ internal ulong PublishSequenceNumber { get ; }
76
71
77
72
internal async Task MaybeWaitForConfirmationAsync ( CancellationToken cancellationToken )
78
73
{
@@ -95,6 +90,11 @@ internal bool MaybeHandleException(Exception ex)
95
90
96
91
return exceptionWasHandled ;
97
92
}
93
+
94
+ public void Dispose ( )
95
+ {
96
+ _lease ? . Dispose ( ) ;
97
+ }
98
98
}
99
99
100
100
public async ValueTask < ulong > GetNextPublishSequenceNumberAsync ( CancellationToken cancellationToken = default )
@@ -119,18 +119,11 @@ public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToke
119
119
120
120
private void ConfigurePublisherConfirmations ( bool publisherConfirmationsEnabled ,
121
121
bool publisherConfirmationTrackingEnabled ,
122
- ushort ? maxOutstandingPublisherConfirmations )
122
+ RateLimiter ? outstandingPublisherConfirmationsRateLimiter )
123
123
{
124
124
_publisherConfirmationsEnabled = publisherConfirmationsEnabled ;
125
125
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled ;
126
- _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations ;
127
-
128
- if ( _publisherConfirmationTrackingEnabled && _maxOutstandingPublisherConfirmations is not null )
129
- {
130
- _maxOutstandingConfirmationsSemaphore = new SemaphoreSlim (
131
- ( int ) _maxOutstandingPublisherConfirmations ,
132
- ( int ) _maxOutstandingPublisherConfirmations ) ;
133
- }
126
+ _outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter ;
134
127
}
135
128
136
129
private async Task MaybeConfirmSelect ( CancellationToken cancellationToken )
@@ -282,11 +275,15 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
282
275
{
283
276
if ( _publisherConfirmationsEnabled )
284
277
{
285
- if ( _publisherConfirmationTrackingEnabled &&
286
- _maxOutstandingConfirmationsSemaphore is not null )
278
+ RateLimitLease ? lease = null ;
279
+ if ( _publisherConfirmationTrackingEnabled )
287
280
{
288
- await _maxOutstandingConfirmationsSemaphore . WaitAsync ( cancellationToken )
289
- . ConfigureAwait ( false ) ;
281
+ if ( _outstandingPublisherConfirmationsRateLimiter is not null )
282
+ {
283
+ lease = await _outstandingPublisherConfirmationsRateLimiter . AcquireAsync (
284
+ cancellationToken : cancellationToken )
285
+ . ConfigureAwait ( false ) ;
286
+ }
290
287
}
291
288
292
289
await _confirmSemaphore . WaitAsync ( cancellationToken )
@@ -303,7 +300,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
303
300
304
301
_nextPublishSeqNo ++ ;
305
302
306
- return new PublisherConfirmationInfo ( publishSequenceNumber , publisherConfirmationTcs ) ;
303
+ return new PublisherConfirmationInfo ( publishSequenceNumber , publisherConfirmationTcs , lease ) ;
307
304
}
308
305
else
309
306
{
@@ -339,18 +336,19 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn
339
336
{
340
337
if ( _publisherConfirmationsEnabled )
341
338
{
342
- if ( _publisherConfirmationTrackingEnabled &&
343
- _maxOutstandingConfirmationsSemaphore is not null )
344
- {
345
- _maxOutstandingConfirmationsSemaphore . Release ( ) ;
346
- }
347
-
348
339
_confirmSemaphore . Release ( ) ;
349
340
350
341
if ( publisherConfirmationInfo is not null )
351
342
{
352
- await publisherConfirmationInfo . MaybeWaitForConfirmationAsync ( cancellationToken )
353
- . ConfigureAwait ( false ) ;
343
+ try
344
+ {
345
+ await publisherConfirmationInfo . MaybeWaitForConfirmationAsync ( cancellationToken )
346
+ . ConfigureAwait ( false ) ;
347
+ }
348
+ finally
349
+ {
350
+ publisherConfirmationInfo . Dispose ( ) ;
351
+ }
354
352
}
355
353
}
356
354
}
0 commit comments