Skip to content

Commit 8463aba

Browse files
committed
* Reproduce issue by artifically never acquiring rate limiter leases.
* Fix issue by catching `SemaphoreFullException` and checking that the exception was thrown in a valid failure case.
1 parent 78bb803 commit 8463aba

File tree

3 files changed

+60
-31
lines changed

3 files changed

+60
-31
lines changed

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,24 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn
344344
{
345345
if (_publisherConfirmationsEnabled)
346346
{
347-
_confirmSemaphore.Release();
347+
try
348+
{
349+
_confirmSemaphore.Release();
350+
}
351+
catch (SemaphoreFullException ex)
352+
{
353+
/*
354+
* rabbitmq/rabbitmq-dotnet-client-1793
355+
* If MaybeStartPublisherConfirmationTracking throws an exception *prior* to acquiring
356+
* _confirmSemaphore, the above Release() call will throw SemaphoreFullException.
357+
* In "normal" cases, publisherConfirmationInfo will thus be null, but if not, throw
358+
* a "bug found" exception here.
359+
*/
360+
if (publisherConfirmationInfo is not null)
361+
{
362+
throw new InvalidOperationException(InternalConstants.BugFound, ex);
363+
}
364+
}
348365

349366
if (publisherConfirmationInfo is not null)
350367
{

projects/Test/Integration/GH/ThrowsExceptionRateLimiter.cs renamed to projects/Test/Integration/GH/NeverAcquiredRateLimiter.cs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,39 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Collections.Generic;
3334
using System.Threading;
3435
using System.Threading.RateLimiting;
3536
using System.Threading.Tasks;
3637

3738
namespace Integration.GH
3839
{
39-
public class ThrowsExceptionRateLimiter : RateLimiter
40+
public class NeverAcquiredRateLimiter : RateLimiter
4041
{
4142
public override TimeSpan? IdleDuration => throw new NotImplementedException();
4243
public override RateLimiterStatistics GetStatistics() => throw new NotImplementedException();
44+
4345
protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
44-
=> throw new NotImplementedException();
46+
{
47+
return new ValueTask<RateLimitLease>(new NotAcquiredRateLimitLease());
48+
}
49+
4550
protected override RateLimitLease AttemptAcquireCore(int permitCount)
46-
=> throw new NotImplementedException();
51+
{
52+
return new NotAcquiredRateLimitLease();
53+
}
54+
}
55+
56+
public class NotAcquiredRateLimitLease : RateLimitLease
57+
{
58+
public override bool IsAcquired => false;
59+
60+
public override IEnumerable<string> MetadataNames => [];
61+
62+
public override bool TryGetMetadata(string metadataName, out object metadata)
63+
{
64+
metadata = string.Empty;
65+
return true;
66+
}
4767
}
4868
}

projects/Test/Integration/GH/TestGitHubIssues.cs

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -346,8 +346,7 @@ public async Task MaybeSomethingUpWithRateLimiter_GH1793()
346346
var channelOpts = new CreateChannelOptions(
347347
publisherConfirmationsEnabled: true,
348348
publisherConfirmationTrackingEnabled: true,
349-
// outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(messageCount)
350-
outstandingPublisherConfirmationsRateLimiter: new ThrowsExceptionRateLimiter()
349+
outstandingPublisherConfirmationsRateLimiter: new NeverAcquiredRateLimiter()
351350
);
352351

353352
_channel = await _conn.CreateChannelAsync(channelOpts);
@@ -357,39 +356,32 @@ public async Task MaybeSomethingUpWithRateLimiter_GH1793()
357356
DeliveryMode = DeliveryModes.Persistent
358357
};
359358

360-
async Task PublishMessagesAsync()
359+
for (int i = 0; i < messageCount; i++)
361360
{
362-
for (int i = 0; i < messageCount; i++)
361+
int retryCount = 0;
362+
const int maxRetries = 3;
363+
while (retryCount <= maxRetries)
363364
{
364-
int retryCount = 0;
365-
const int maxRetries = 3;
366-
while (retryCount <= maxRetries)
365+
try
367366
{
368-
try
367+
byte[] bytes = Encoding.UTF8.GetBytes("message");
368+
await Assert.ThrowsAnyAsync<InvalidOperationException>(async () =>
369369
{
370-
byte[] bytes = Encoding.UTF8.GetBytes("message");
371370
await _channel.BasicPublishAsync(string.Empty, string.Empty, true, properties, bytes);
372-
break;
373-
}
374-
catch (SemaphoreFullException ex0)
375-
{
376-
_output.WriteLine("{0} ex: {1}", _testDisplayName, ex0);
377-
retryCount++;
378-
}
379-
catch (PublishException)
380-
{
381-
retryCount++;
382-
}
371+
});
372+
break;
373+
}
374+
catch (SemaphoreFullException ex0)
375+
{
376+
_output.WriteLine("{0} ex: {1}", _testDisplayName, ex0);
377+
retryCount++;
378+
}
379+
catch (PublishException)
380+
{
381+
retryCount++;
383382
}
384383
}
385384
}
386-
387-
var publishTasks = new List<Task>();
388-
for (int i = 0; i < messageCount; i++)
389-
{
390-
publishTasks.Add(Task.Run(PublishMessagesAsync));
391-
}
392-
await Task.WhenAll(publishTasks);
393385
}
394386
}
395387
}

0 commit comments

Comments
 (0)