Skip to content

Commit e06163e

Browse files
Move semaphore and RateLimiter ownership to PublisherConfirmLeaseFactory
1 parent 7b219fd commit e06163e

File tree

3 files changed

+130
-102
lines changed

3 files changed

+130
-102
lines changed

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,14 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
4949
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
5050
{
5151
PublisherConfirmationInfo? publisherConfirmationInfo = null;
52+
PublisherConfirmLease? publisherConfirmLease = null;
53+
if (_publisherConfirmationsEnabled)
54+
{
55+
publisherConfirmLease = await _confirmLeaseFactory.AcquireWithLeaseAsync(cancellationToken).ConfigureAwait(false);
56+
}
5257
try
5358
{
54-
publisherConfirmationInfo =
55-
await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken)
56-
.ConfigureAwait(false);
59+
publisherConfirmationInfo = MaybeStartPublisherConfirmationTracking();
5760

5861
await MaybeEnforceFlowControlAsync(cancellationToken)
5962
.ConfigureAwait(false);
@@ -93,6 +96,7 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
9396
}
9497
finally
9598
{
99+
publisherConfirmLease?.Dispose();
96100
await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, cancellationToken)
97101
.ConfigureAwait(false);
98102
}
@@ -104,11 +108,14 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
104108
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
105109
{
106110
PublisherConfirmationInfo? publisherConfirmationInfo = null;
111+
PublisherConfirmLease? publisherConfirmLease = null;
112+
if (_publisherConfirmationsEnabled)
113+
{
114+
publisherConfirmLease = await _confirmLeaseFactory.AcquireWithLeaseAsync(cancellationToken).ConfigureAwait(false);
115+
}
107116
try
108117
{
109-
publisherConfirmationInfo =
110-
await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken)
111-
.ConfigureAwait(false);
118+
publisherConfirmationInfo = MaybeStartPublisherConfirmationTracking();
112119

113120
await MaybeEnforceFlowControlAsync(cancellationToken)
114121
.ConfigureAwait(false);
@@ -148,6 +155,7 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
148155
}
149156
finally
150157
{
158+
publisherConfirmLease?.Dispose();
151159
await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, cancellationToken)
152160
.ConfigureAwait(false);
153161
}

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 114 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -48,23 +48,18 @@ internal partial class Channel : IChannel, IRecoverable
4848
private bool _publisherConfirmationsEnabled = false;
4949
private bool _publisherConfirmationTrackingEnabled = false;
5050
private ulong _nextPublishSeqNo = 0;
51-
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
51+
private PublisherConfirmLeaseFactory _confirmLeaseFactory = new(false, null);
5252
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();
53-
private RateLimiter? _outstandingPublisherConfirmationsRateLimiter;
5453

55-
private sealed class PublisherConfirmationInfo : IDisposable
54+
private sealed class PublisherConfirmationInfo
5655
{
57-
private readonly TaskCompletionSource<bool>? _publisherConfirmationTcs;
58-
private readonly RateLimitLease? _lease;
56+
private TaskCompletionSource<bool>? _publisherConfirmationTcs;
5957

60-
internal PublisherConfirmationInfo(
61-
ulong publishSequenceNumber,
62-
TaskCompletionSource<bool>? publisherConfirmationTcs,
63-
RateLimitLease? lease)
58+
internal PublisherConfirmationInfo(ulong publishSequenceNumber,
59+
TaskCompletionSource<bool>? publisherConfirmationTcs)
6460
{
6561
PublishSequenceNumber = publishSequenceNumber;
6662
_publisherConfirmationTcs = publisherConfirmationTcs;
67-
_lease = lease;
6863
}
6964

7065
internal ulong PublishSequenceNumber { get; }
@@ -90,9 +85,82 @@ internal bool MaybeHandleException(Exception ex)
9085

9186
return exceptionWasHandled;
9287
}
88+
}
89+
90+
private sealed class PublisherConfirmLeaseFactory: IDisposable
91+
{
92+
private readonly SemaphoreSlim _confirmSemaphore;
93+
private readonly bool _trackingEnabled;
94+
private readonly RateLimiter? _rateLimiter;
95+
96+
internal PublisherConfirmLeaseFactory(bool trackingEnabled, RateLimiter? rateLimiter)
97+
{
98+
_confirmSemaphore = new SemaphoreSlim(1, 1);
99+
;
100+
_trackingEnabled = trackingEnabled;
101+
_rateLimiter = rateLimiter;
102+
}
103+
104+
internal async Task<PublisherConfirmLease?> AcquireWithLeaseAsync(CancellationToken ct)
105+
{
106+
RateLimitLease? lease = null;
107+
if (_trackingEnabled && _rateLimiter is not null)
108+
{
109+
lease = await _rateLimiter.AcquireAsync(cancellationToken: ct).ConfigureAwait(false);
110+
if (!lease.IsAcquired)
111+
{
112+
throw new InvalidOperationException("Could not acquire a lease from the rate limiter.");
113+
}
114+
}
115+
116+
try
117+
{
118+
await _confirmSemaphore.WaitAsync(ct).ConfigureAwait(false);
119+
}
120+
catch (OperationCanceledException)
121+
{
122+
lease?.Dispose();
123+
throw;
124+
}
125+
126+
return new PublisherConfirmLease(_confirmSemaphore, lease);
127+
}
128+
129+
internal async Task<PublisherConfirmLease?> AcquireAsync(CancellationToken ct)
130+
{
131+
await _confirmSemaphore.WaitAsync(ct).ConfigureAwait(false);
132+
return new PublisherConfirmLease(_confirmSemaphore, null);
133+
}
134+
135+
public void Dispose()
136+
{
137+
_confirmSemaphore.Dispose();
138+
_rateLimiter?.Dispose();
139+
}
140+
}
141+
142+
private sealed class PublisherConfirmLease : IDisposable
143+
{
144+
private readonly SemaphoreSlim _confirmSemaphore;
145+
private readonly RateLimitLease? _lease;
146+
private bool _disposed;
147+
148+
internal PublisherConfirmLease(SemaphoreSlim factory,
149+
RateLimitLease? rateLimitLease)
150+
{
151+
_confirmSemaphore = factory;
152+
_lease = rateLimitLease;
153+
}
93154

94155
public void Dispose()
95156
{
157+
if (_disposed)
158+
{
159+
return;
160+
}
161+
162+
_disposed = true;
163+
_confirmSemaphore.Release();
96164
_lease?.Dispose();
97165
}
98166
}
@@ -101,15 +169,10 @@ public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToke
101169
{
102170
if (_publisherConfirmationsEnabled)
103171
{
104-
await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
105-
try
172+
using (await _confirmLeaseFactory.AcquireAsync(cancellationToken).ConfigureAwait(false))
106173
{
107174
return _nextPublishSeqNo;
108175
}
109-
finally
110-
{
111-
_confirmSemaphore.Release();
112-
}
113176
}
114177
else
115178
{
@@ -123,7 +186,11 @@ private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
123186
{
124187
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
125188
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
126-
_outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
189+
if (publisherConfirmationsEnabled)
190+
{
191+
_confirmLeaseFactory = new PublisherConfirmLeaseFactory(_publisherConfirmationTrackingEnabled,
192+
outstandingPublisherConfirmationsRateLimiter);
193+
}
127194
}
128195

129196
private async Task MaybeConfirmSelectAsync(CancellationToken cancellationToken)
@@ -276,71 +343,38 @@ private async Task MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(Shu
276343

277344
if (_publisherConfirmationsEnabled)
278345
{
279-
await _confirmSemaphore.WaitAsync(reason.CancellationToken)
280-
.ConfigureAwait(false);
281-
try
346+
using (await _confirmLeaseFactory.AcquireAsync(reason.CancellationToken).ConfigureAwait(false))
282347
{
283348
MaybeSetExceptionOnConfirmsTcs(reason);
284349
}
285-
finally
286-
{
287-
_confirmSemaphore.Release();
288-
}
289350
}
290351
}
291352

292353
[MethodImpl(MethodImplOptions.AggressiveInlining)]
293-
private async Task<PublisherConfirmationInfo?> MaybeStartPublisherConfirmationTrackingAsync(CancellationToken cancellationToken)
354+
private PublisherConfirmationInfo? MaybeStartPublisherConfirmationTracking()
294355
{
295-
if (!_publisherConfirmationsEnabled)
296-
{
297-
return null;
298-
}
299-
300-
RateLimitLease? lease = null;
301-
if (_publisherConfirmationTrackingEnabled && _outstandingPublisherConfirmationsRateLimiter is not null)
302-
{
303-
lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(
304-
cancellationToken: cancellationToken)
305-
.ConfigureAwait(false);
306-
if (!lease.IsAcquired)
307-
{
308-
throw new InvalidOperationException("Could not acquire a lease from the rate limiter.");
309-
}
310-
}
311-
312-
bool confirmSemaphoreAcquired;
313-
try
314-
{
315-
await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
316-
confirmSemaphoreAcquired = true;
317-
}
318-
catch (OperationCanceledException)
356+
if (_publisherConfirmationsEnabled)
319357
{
320-
lease?.Dispose();
321-
throw;
322-
}
323-
324-
ulong publishSequenceNumber = _nextPublishSeqNo;
358+
ulong publishSequenceNumber = _nextPublishSeqNo;
325359

326-
TaskCompletionSource<bool>? publisherConfirmationTcs = null;
327-
if (_publisherConfirmationTrackingEnabled)
328-
{
329-
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
330-
if (!_confirmsTaskCompletionSources.TryAdd(publishSequenceNumber, publisherConfirmationTcs))
360+
TaskCompletionSource<bool>? publisherConfirmationTcs = null;
361+
if (_publisherConfirmationTrackingEnabled)
331362
{
332-
if (confirmSemaphoreAcquired)
363+
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
364+
if (!_confirmsTaskCompletionSources.TryAdd(publishSequenceNumber, publisherConfirmationTcs))
333365
{
334-
_confirmSemaphore.Release();
366+
throw new InvalidOperationException($"Failed to track the publisher confirmation for sequence number '{publishSequenceNumber}' because it already exists.");
335367
}
336-
lease?.Dispose();
337-
throw new InvalidOperationException($"Failed to track the publisher confirmation for sequence number '{publishSequenceNumber}' because it already exists.");
338368
}
339-
}
340369

341-
_nextPublishSeqNo++;
370+
_nextPublishSeqNo++;
342371

343-
return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease);
372+
return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs);
373+
}
374+
else
375+
{
376+
return null;
377+
}
344378
}
345379

346380
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -369,28 +403,22 @@ private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConf
369403
private async Task MaybeEndPublisherConfirmationTrackingAsync(PublisherConfirmationInfo? publisherConfirmationInfo,
370404
CancellationToken cancellationToken)
371405
{
372-
if (publisherConfirmationInfo is null)
373-
{
374-
return;
375-
}
376-
377-
_confirmSemaphore.Release();
378-
379-
try
380-
{
381-
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
382-
.ConfigureAwait(false);
383-
}
384-
catch (OperationCanceledException)
385-
{
386-
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
387-
throw;
388-
}
389-
finally
406+
if (_publisherConfirmationsEnabled)
390407
{
391-
publisherConfirmationInfo.Dispose();
408+
if (publisherConfirmationInfo is not null)
409+
{
410+
try
411+
{
412+
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
413+
.ConfigureAwait(false);
414+
}
415+
catch (OperationCanceledException)
416+
{
417+
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
418+
throw;
419+
}
420+
}
392421
}
393-
394422
}
395423

396424
[MethodImpl(MethodImplOptions.AggressiveInlining)]

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -582,15 +582,13 @@ protected virtual void Dispose(bool disposing)
582582
_serverOriginatedChannelCloseTcs?.Task.Wait(InternalConstants.DefaultChannelDisposeTimeout);
583583

584584
ConsumerDispatcher.Dispose();
585-
586-
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
587585
}
588586
finally
589587
{
590588
try
591589
{
592590
_rpcSemaphore.Dispose();
593-
_confirmSemaphore.Dispose();
591+
_confirmLeaseFactory.Dispose();
594592
MaybeSetExceptionOnConfirmsTcs();
595593
}
596594
catch
@@ -642,19 +640,13 @@ await _serverOriginatedChannelCloseTcs.Task.WaitAsync(InternalConstants.DefaultC
642640
}
643641

644642
ConsumerDispatcher.Dispose();
645-
646-
if (_outstandingPublisherConfirmationsRateLimiter is not null)
647-
{
648-
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
649-
.ConfigureAwait(false);
650-
}
651643
}
652644
finally
653645
{
654646
try
655647
{
656648
_rpcSemaphore.Dispose();
657-
_confirmSemaphore.Dispose();
649+
_confirmLeaseFactory.Dispose();
658650
}
659651
catch
660652
{

0 commit comments

Comments
 (0)