Skip to content

Commit 0f4dfbb

Browse files
committed
* Ignore OperationCanceledException in consumer dispatchers if cancellation is requested
1 parent 1e42d00 commit 0f4dfbb

File tree

2 files changed

+68
-48
lines changed

2 files changed

+68
-48
lines changed

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,49 @@ internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency)
1616

1717
protected override async Task ProcessChannelAsync(CancellationToken token)
1818
{
19-
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
19+
try
2020
{
21-
while (_reader.TryRead(out WorkStruct work))
21+
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
2222
{
23-
using (work)
23+
while (_reader.TryRead(out WorkStruct work))
2424
{
25-
try
25+
using (work)
2626
{
27-
Task task = work.WorkType switch
27+
try
2828
{
29-
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
30-
work.ConsumerTag, work.DeliveryTag, work.Redelivered,
31-
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory),
29+
Task task = work.WorkType switch
30+
{
31+
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
32+
work.ConsumerTag, work.DeliveryTag, work.Redelivered,
33+
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory),
3234

33-
WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag),
35+
WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag),
3436

35-
WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag),
37+
WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag),
3638

37-
WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag),
39+
WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag),
3840

39-
WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason),
41+
WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason),
4042

41-
_ => Task.CompletedTask
42-
};
43-
await task.ConfigureAwait(false);
44-
}
45-
catch (Exception e)
46-
{
47-
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
43+
_ => Task.CompletedTask
44+
};
45+
await task.ConfigureAwait(false);
46+
}
47+
catch (Exception e)
48+
{
49+
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
50+
}
4851
}
4952
}
5053
}
5154
}
55+
catch (OperationCanceledException)
56+
{
57+
if (false == token.IsCancellationRequested)
58+
{
59+
throw;
60+
}
61+
}
5262
}
5363
}
5464
}

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,45 +16,55 @@ internal ConsumerDispatcher(ChannelBase channel, int concurrency)
1616

1717
protected override async Task ProcessChannelAsync(CancellationToken token)
1818
{
19-
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
19+
try
2020
{
21-
while (_reader.TryRead(out var work))
21+
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
2222
{
23-
using (work)
23+
while (_reader.TryRead(out WorkStruct work))
2424
{
25-
try
25+
using (work)
2626
{
27-
IBasicConsumer consumer = work.Consumer;
28-
string? consumerTag = work.ConsumerTag;
29-
switch (work.WorkType)
27+
try
3028
{
31-
case WorkType.Deliver:
32-
await consumer.HandleBasicDeliverAsync(
33-
consumerTag, work.DeliveryTag, work.Redelivered,
34-
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory)
35-
.ConfigureAwait(false);
36-
break;
37-
case WorkType.Cancel:
38-
consumer.HandleBasicCancel(consumerTag);
39-
break;
40-
case WorkType.CancelOk:
41-
consumer.HandleBasicCancelOk(consumerTag);
42-
break;
43-
case WorkType.ConsumeOk:
44-
consumer.HandleBasicConsumeOk(consumerTag);
45-
break;
46-
case WorkType.Shutdown:
47-
consumer.HandleChannelShutdown(_channel, work.Reason);
48-
break;
29+
IBasicConsumer consumer = work.Consumer;
30+
string? consumerTag = work.ConsumerTag;
31+
switch (work.WorkType)
32+
{
33+
case WorkType.Deliver:
34+
await consumer.HandleBasicDeliverAsync(
35+
consumerTag, work.DeliveryTag, work.Redelivered,
36+
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory)
37+
.ConfigureAwait(false);
38+
break;
39+
case WorkType.Cancel:
40+
consumer.HandleBasicCancel(consumerTag);
41+
break;
42+
case WorkType.CancelOk:
43+
consumer.HandleBasicCancelOk(consumerTag);
44+
break;
45+
case WorkType.ConsumeOk:
46+
consumer.HandleBasicConsumeOk(consumerTag);
47+
break;
48+
case WorkType.Shutdown:
49+
consumer.HandleChannelShutdown(_channel, work.Reason);
50+
break;
51+
}
52+
}
53+
catch (Exception e)
54+
{
55+
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
4956
}
50-
}
51-
catch (Exception e)
52-
{
53-
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
5457
}
5558
}
5659
}
5760
}
61+
catch (OperationCanceledException)
62+
{
63+
if (false == token.IsCancellationRequested)
64+
{
65+
throw;
66+
}
67+
}
5868
}
5969
}
6070
}

0 commit comments

Comments
 (0)