Skip to content

Commit c682530

Browse files
committed
Pass cancellation through the dispatcher
1 parent 5a24514 commit c682530

File tree

2 files changed

+66
-24
lines changed

2 files changed

+66
-24
lines changed

projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,20 @@ protected override async Task ProcessChannelAsync()
3434
{
3535
await work.Consumer.HandleBasicDeliverAsync(
3636
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
37-
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
37+
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory, work.CancellationToken)
3838
.ConfigureAwait(false);
3939
}
4040
break;
4141
case WorkType.Cancel:
42-
await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!)
42+
await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!, work.CancellationToken)
4343
.ConfigureAwait(false);
4444
break;
4545
case WorkType.CancelOk:
46-
await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!)
46+
await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!, work.CancellationToken)
4747
.ConfigureAwait(false);
4848
break;
4949
case WorkType.ConsumeOk:
50-
await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!)
50+
await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!, work.CancellationToken)
5151
.ConfigureAwait(false);
5252
break;
5353
case WorkType.Shutdown:

projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

Lines changed: 62 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
4646
private readonly ushort _concurrency;
4747
private long _isQuiescing;
4848
private bool _disposed;
49+
private readonly CancellationTokenSource _shutdownCts = new CancellationTokenSource();
4950

5051
internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
5152
{
@@ -92,7 +93,7 @@ public async ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, s
9293
try
9394
{
9495
AddConsumer(consumer, consumerTag);
95-
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);
96+
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag, _shutdownCts);
9697
await _writer.WriteAsync(work, cancellationToken)
9798
.ConfigureAwait(false);
9899
}
@@ -113,7 +114,7 @@ public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliver
113114
if (false == _disposed && false == IsQuiescing)
114115
{
115116
IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
116-
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
117+
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, _shutdownCts);
117118
await _writer.WriteAsync(work, cancellationToken)
118119
.ConfigureAwait(false);
119120
}
@@ -126,7 +127,7 @@ public async ValueTask HandleBasicCancelOkAsync(string consumerTag, Cancellation
126127
if (false == _disposed && false == IsQuiescing)
127128
{
128129
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
129-
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);
130+
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag, _shutdownCts);
130131
await _writer.WriteAsync(work, cancellationToken)
131132
.ConfigureAwait(false);
132133
}
@@ -139,7 +140,7 @@ public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationTo
139140
if (false == _disposed && false == IsQuiescing)
140141
{
141142
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
142-
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);
143+
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag, _shutdownCts);
143144
await _writer.WriteAsync(work, cancellationToken)
144145
.ConfigureAwait(false);
145146
}
@@ -148,6 +149,14 @@ await _writer.WriteAsync(work, cancellationToken)
148149
public void Quiesce()
149150
{
150151
Interlocked.Exchange(ref _isQuiescing, 1);
152+
try
153+
{
154+
_shutdownCts.Cancel();
155+
}
156+
catch
157+
{
158+
// ignore
159+
}
151160
}
152161

153162
public async Task WaitForShutdownAsync()
@@ -214,7 +223,7 @@ protected bool IsQuiescing
214223

215224
protected sealed override void ShutdownConsumer(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
216225
{
217-
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason));
226+
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason, _shutdownCts));
218227
}
219228

220229
protected override Task InternalShutdownAsync()
@@ -237,25 +246,32 @@ protected override Task InternalShutdownAsync()
237246
public readonly RentedMemory Body;
238247
public readonly ShutdownEventArgs? Reason;
239248
public readonly WorkType WorkType;
249+
public readonly CancellationToken CancellationToken;
250+
private readonly CancellationTokenSource? _cancellationTokenSource;
240251

241-
private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag)
252+
private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
242253
: this()
243254
{
244255
WorkType = type;
245256
Consumer = consumer;
246257
ConsumerTag = consumerTag;
258+
CancellationToken = cancellationToken;
259+
_cancellationTokenSource = null;
247260
}
248261

249-
private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
262+
private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason, CancellationTokenSource? cancellationTokenSource)
250263
: this()
251264
{
252265
WorkType = WorkType.Shutdown;
253266
Consumer = consumer;
254267
Reason = reason;
268+
CancellationToken = cancellationTokenSource?.Token ?? CancellationToken.None;
269+
this._cancellationTokenSource = cancellationTokenSource;
255270
}
256271

257272
private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
258-
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body)
273+
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body,
274+
CancellationToken cancellationToken)
259275
{
260276
WorkType = WorkType.Deliver;
261277
Consumer = consumer;
@@ -266,37 +282,62 @@ private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliv
266282
RoutingKey = routingKey;
267283
BasicProperties = basicProperties;
268284
Body = body;
269-
Reason = default;
285+
Reason = null;
286+
CancellationToken = cancellationToken;
287+
_cancellationTokenSource = null;
270288
}
271289

272-
public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag)
290+
public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource)
273291
{
274-
return new WorkStruct(WorkType.Cancel, consumer, consumerTag);
292+
return new WorkStruct(WorkType.Cancel, consumer, consumerTag, cancellationTokenSource.Token);
275293
}
276294

277-
public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag)
295+
public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource)
278296
{
279-
return new WorkStruct(WorkType.CancelOk, consumer, consumerTag);
297+
return new WorkStruct(WorkType.CancelOk, consumer, consumerTag, cancellationTokenSource.Token);
280298
}
281299

282-
public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag)
300+
public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource)
283301
{
284-
return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag);
302+
return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag, cancellationTokenSource.Token);
285303
}
286304

287-
public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
305+
public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason, CancellationTokenSource cancellationTokenSource)
288306
{
289-
return new WorkStruct(consumer, reason);
307+
// Create a linked CTS so the shutdown args token reflects both dispatcher cancellation and any upstream token.
308+
CancellationTokenSource? linked = null;
309+
try
310+
{
311+
if (reason.CancellationToken.CanBeCanceled)
312+
{
313+
linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, reason.CancellationToken);
314+
}
315+
}
316+
catch
317+
{
318+
linked = null;
319+
}
320+
321+
CancellationToken token = linked?.Token ?? cancellationTokenSource.Token;
322+
ShutdownEventArgs argsWithToken = reason.Exception != null ?
323+
new ShutdownEventArgs(reason.Initiator, reason.ReplyCode, reason.ReplyText, reason.Exception, token) :
324+
new ShutdownEventArgs(reason.Initiator, reason.ReplyCode, reason.ReplyText, reason.ClassId, reason.MethodId, reason.Cause, token);
325+
326+
return new WorkStruct(consumer, argsWithToken, linked);
290327
}
291328

292329
public static WorkStruct CreateDeliver(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
293-
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body)
330+
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body, CancellationTokenSource cancellationTokenSource)
294331
{
295332
return new WorkStruct(consumer, consumerTag, deliveryTag, redelivered,
296-
exchange, routingKey, basicProperties, body);
333+
exchange, routingKey, basicProperties, body, cancellationTokenSource.Token);
297334
}
298335

299-
public void Dispose() => Body.Dispose();
336+
public void Dispose()
337+
{
338+
Body.Dispose();
339+
_cancellationTokenSource?.Dispose();
340+
}
300341
}
301342

302343
protected enum WorkType : byte
@@ -317,6 +358,7 @@ protected virtual void Dispose(bool disposing)
317358
if (disposing)
318359
{
319360
Quiesce();
361+
_shutdownCts.Dispose();
320362
}
321363
}
322364
catch

0 commit comments

Comments
 (0)