Skip to content

Commit a6b7e38

Browse files
committed
Allow semaphore to be awaited and cancelled
1 parent 1896a62 commit a6b7e38

File tree

3 files changed

+14
-11
lines changed

3 files changed

+14
-11
lines changed

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,11 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
8989
}
9090
case ProtocolCommandId.BasicAck:
9191
{
92-
HandleBasicAck(cmd);
93-
return Task.FromResult(true);
92+
return HandleBasicAck(cmd, cancellationToken);
9493
}
9594
case ProtocolCommandId.BasicNack:
9695
{
97-
HandleBasicNack(cmd);
98-
return Task.FromResult(true);
96+
return HandleBasicNack(cmd, cancellationToken);
9997
}
10098
case ProtocolCommandId.BasicReturn:
10199
{

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart
589589
return ModelSendAsync(method, cancellationToken).AsTask();
590590
}
591591

592-
protected void HandleBasicAck(IncomingCommand cmd)
592+
protected async Task<bool> HandleBasicAck(IncomingCommand cmd, CancellationToken cancellationToken)
593593
{
594594
var ack = new BasicAck(cmd.MethodSpan);
595595
if (!_basicAcksWrapper.IsEmpty)
@@ -598,10 +598,12 @@ protected void HandleBasicAck(IncomingCommand cmd)
598598
_basicAcksWrapper.Invoke(this, args);
599599
}
600600

601-
HandleAckNack(ack._deliveryTag, ack._multiple, false);
601+
await HandleAckNack(ack._deliveryTag, ack._multiple, false, cancellationToken)
602+
.ConfigureAwait(false);
603+
return true;
602604
}
603605

604-
protected void HandleBasicNack(IncomingCommand cmd)
606+
protected async Task<bool> HandleBasicNack(IncomingCommand cmd, CancellationToken cancellationToken)
605607
{
606608
var nack = new BasicNack(cmd.MethodSpan);
607609
if (!_basicNacksWrapper.IsEmpty)
@@ -611,7 +613,9 @@ protected void HandleBasicNack(IncomingCommand cmd)
611613
_basicNacksWrapper.Invoke(this, args);
612614
}
613615

614-
HandleAckNack(nack._deliveryTag, nack._multiple, true);
616+
await HandleAckNack(nack._deliveryTag, nack._multiple, true, cancellationToken)
617+
.ConfigureAwait(false);
618+
return true;
615619
}
616620

617621
protected async Task<bool> HandleBasicCancelAsync(IncomingCommand cmd, CancellationToken cancellationToken)
@@ -1829,7 +1833,7 @@ await tokenRegistration.DisposeAsync()
18291833

18301834
// NOTE: this method is internal for its use in this test:
18311835
// TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse
1832-
internal void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
1836+
internal async Task HandleAckNack(ulong deliveryTag, bool multiple, bool isNack, CancellationToken cancellationToken = default)
18331837
{
18341838
// Only do this if confirms are enabled *and* the library is tracking confirmations
18351839
if (ConfirmsAreEnabled && _trackConfirmations)
@@ -1839,7 +1843,8 @@ internal void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
18391843
throw new InvalidOperationException(InternalConstants.BugFound);
18401844
}
18411845
// let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
1842-
_confirmSemaphore.Wait();
1846+
await _confirmSemaphore.WaitAsync(cancellationToken)
1847+
.ConfigureAwait(false);
18431848
try
18441849
{
18451850
// No need to do anything if there are no delivery tags in the list

projects/Test/Integration/TestPublisherConfirms.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout
105105
return TestWaitForConfirmsAsync(2000, async (ch) =>
106106
{
107107
RecoveryAwareChannel actualChannel = ((AutorecoveringChannel)ch).InnerChannel;
108-
actualChannel.HandleAckNack(10UL, false, true);
108+
await actualChannel.HandleAckNack(10UL, false, true);
109109
using (var cts = new CancellationTokenSource(ShortSpan))
110110
{
111111
Assert.False(await ch.WaitForConfirmsAsync(cts.Token));

0 commit comments

Comments
 (0)