Skip to content

Commit 27de6fd

Browse files
author
Joakim Andersson
committed
fix: properly cancel token in AsyncEventingBasicConsumer on channel close
1 parent a8dcbc9 commit 27de6fd

File tree

4 files changed

+12
-2
lines changed

4 files changed

+12
-2
lines changed

projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ protected override async Task ProcessChannelAsync()
3333
work.DeliveryTag, work.BasicProperties!, work.Body.Size))
3434
{
3535
await work.Consumer.HandleBasicDeliverAsync(
36-
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
37-
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
36+
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
37+
work.Exchange!, work.RoutingKey!, work.BasicProperties!,
38+
work.Body.Memory,
39+
work.Consumer.Channel?.ShutdownCts.Token ?? default)
3840
.ConfigureAwait(false);
3941
}
4042
break;

projects/RabbitMQ.Client/IChannel.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public interface IChannel : IAsyncDisposable, IDisposable
5757
/// or the cause of its closure otherwise.
5858
/// </summary>
5959
ShutdownEventArgs? CloseReason { get; }
60+
61+
CancellationTokenSource ShutdownCts { get; }
6062

6163
/// <summary>Signalled when an unexpected message is delivered.</summary>
6264
///

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public IEnumerable<string> ConsumerTags
136136
public int ChannelNumber => InnerChannel.ChannelNumber;
137137

138138
public ShutdownEventArgs? CloseReason => InnerChannel.CloseReason;
139+
public CancellationTokenSource ShutdownCts => InnerChannel.ShutdownCts;
139140

140141
public IAsyncBasicConsumer? DefaultConsumer
141142
{

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ internal partial class Channel : IChannel, IRecoverable
6666
private bool _disposed;
6767
private int _isDisposing;
6868

69+
private CancellationTokenSource _shutdownCts = new CancellationTokenSource();
70+
public CancellationTokenSource ShutdownCts => _shutdownCts;
71+
6972
public Channel(ISession session, CreateChannelOptions createChannelOptions)
7073
{
7174
ContinuationTimeout = createChannelOptions.ContinuationTimeout;
@@ -208,6 +211,8 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort,
208211
public async Task CloseAsync(ShutdownEventArgs args, bool abort,
209212
CancellationToken cancellationToken)
210213
{
214+
_shutdownCts.Cancel();
215+
211216
bool enqueued = false;
212217
var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
213218

0 commit comments

Comments
 (0)