Skip to content

Commit b773196

Browse files
committed
Propagate token into consumer
1 parent fa1dc4b commit b773196

File tree

5 files changed

+43
-30
lines changed

5 files changed

+43
-30
lines changed

projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
namespace RabbitMQ.Client
@@ -45,33 +46,36 @@ public string[] ConsumerTags
4546
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
4647
/// for use in acknowledging received messages, for instance.
4748
/// </summary>
48-
public IChannel Channel { get; private set; }
49+
public IChannel Channel { get; }
4950

5051
/// <summary>
5152
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
5253
/// e.g. the queue has been deleted (either by this channel or by any other channel).
5354
/// See <see cref="HandleBasicCancelOkAsync"/> for notification of consumer cancellation due to basicCancel
5455
/// </summary>
5556
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
56-
public virtual Task HandleBasicCancelAsync(string consumerTag)
57+
/// <param name="cancellationToken">The cancellation token.</param>
58+
public virtual Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default)
5759
{
58-
return OnCancel(consumerTag);
60+
return OnCancelAsync(new []{ consumerTag }, cancellationToken);
5961
}
6062

6163
/// <summary>
6264
/// Called upon successful deregistration of the consumer from the broker.
6365
/// </summary>
6466
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
65-
public virtual Task HandleBasicCancelOkAsync(string consumerTag)
67+
/// <param name="cancellationToken">The cancellation token.</param>
68+
public virtual Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default)
6669
{
67-
return OnCancel(consumerTag);
70+
return OnCancelAsync(new []{ consumerTag }, cancellationToken);
6871
}
6972

7073
/// <summary>
7174
/// Called upon successful registration of the consumer with the broker.
7275
/// </summary>
7376
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
74-
public virtual Task HandleBasicConsumeOkAsync(string consumerTag)
77+
/// <param name="cancellationToken">The cancellation token.</param>
78+
public virtual Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default)
7579
{
7680
_consumerTags.Add(consumerTag);
7781
IsRunning = true;
@@ -94,7 +98,8 @@ public virtual Task HandleBasicDeliverAsync(string consumerTag,
9498
string exchange,
9599
string routingKey,
96100
IReadOnlyBasicProperties properties,
97-
ReadOnlyMemory<byte> body)
101+
ReadOnlyMemory<byte> body,
102+
CancellationToken cancellationToken = default)
98103
{
99104
// Nothing to do here.
100105
return Task.CompletedTask;
@@ -108,16 +113,17 @@ public virtual Task HandleBasicDeliverAsync(string consumerTag,
108113
public virtual Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason)
109114
{
110115
ShutdownReason = reason;
111-
return OnCancel(_consumerTags.ToArray());
116+
return OnCancelAsync(ConsumerTags, reason.CancellationToken);
112117
}
113118

114119
/// <summary>
115120
/// Default implementation - overridable in subclasses.</summary>
116121
/// <param name="consumerTags">The set of consumer tags that where cancelled</param>
122+
/// <param name="cancellationToken">The cancellation token.</param>
117123
/// <remarks>
118124
/// This default implementation simply sets the <see cref="IsRunning"/> property to false, and takes no further action.
119125
/// </remarks>
120-
public virtual Task OnCancel(params string[] consumerTags)
126+
protected virtual Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default)
121127
{
122128
IsRunning = false;
123129
foreach (string consumerTag in consumerTags)

projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34

45
namespace RabbitMQ.Client
@@ -20,19 +21,22 @@ public interface IAsyncBasicConsumer
2021
/// See <see cref="HandleBasicCancelOkAsync"/> for notification of consumer cancellation due to basicCancel
2122
/// </summary>
2223
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
23-
Task HandleBasicCancelAsync(string consumerTag);
24+
/// <param name="cancellationToken">The cancellation token.</param>
25+
Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default);
2426

2527
/// <summary>
2628
/// Called upon successful deregistration of the consumer from the broker.
2729
/// </summary>
2830
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
29-
Task HandleBasicCancelOkAsync(string consumerTag);
31+
/// <param name="cancellationToken">The cancellation token.</param>
32+
Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default);
3033

3134
/// <summary>
3235
/// Called upon successful registration of the consumer with the broker.
3336
/// </summary>
3437
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
35-
Task HandleBasicConsumeOkAsync(string consumerTag);
38+
/// <param name="cancellationToken">The cancellation token.</param>
39+
Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default);
3640

3741
/// <summary>
3842
/// Called each time a message arrives for this consumer.
@@ -44,7 +48,7 @@ public interface IAsyncBasicConsumer
4448
/// </para>
4549
/// <para>
4650
/// NOTE: Using the <c>body</c> outside of
47-
/// <c><seealso cref="IAsyncBasicConsumer.HandleBasicDeliverAsync(string, ulong, bool, string, string, IReadOnlyBasicProperties, ReadOnlyMemory{byte})"/></c>
51+
/// <c><seealso cref="IAsyncBasicConsumer.HandleBasicDeliverAsync(string, ulong, bool, string, string, IReadOnlyBasicProperties, ReadOnlyMemory{byte}, CancellationToken)"/></c>
4852
/// requires that it be copied!
4953
/// </para>
5054
/// </remarks>
@@ -55,7 +59,8 @@ Task HandleBasicDeliverAsync(string consumerTag,
5559
string exchange,
5660
string routingKey,
5761
IReadOnlyBasicProperties properties,
58-
ReadOnlyMemory<byte> body);
62+
ReadOnlyMemory<byte> body,
63+
CancellationToken cancellationToken = default);
5964

6065
/// <summary>
6166
/// Called when the channel shuts down.

projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using RabbitMQ.Client.Impl;
45

@@ -51,37 +52,37 @@ public event AsyncEventHandler<ConsumerEventArgs> Unregistered
5152
private AsyncEventingWrapper<ConsumerEventArgs> _unregisteredWrapper;
5253

5354
///<summary>Fires when the server confirms successful consumer cancellation.</summary>
54-
public override async Task OnCancel(params string[] consumerTags)
55+
protected override async Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default)
5556
{
56-
await base.OnCancel(consumerTags)
57+
await base.OnCancelAsync(consumerTags, cancellationToken)
5758
.ConfigureAwait(false);
5859
if (!_unregisteredWrapper.IsEmpty)
5960
{
60-
await _unregisteredWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags))
61+
await _unregisteredWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags, cancellationToken), cancellationToken)
6162
.ConfigureAwait(false);
6263
}
6364
}
6465

6566
///<summary>Fires when the server confirms successful consumer registration.</summary>
66-
public override async Task HandleBasicConsumeOkAsync(string consumerTag)
67+
public override async Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default)
6768
{
68-
await base.HandleBasicConsumeOkAsync(consumerTag)
69+
await base.HandleBasicConsumeOkAsync(consumerTag, cancellationToken)
6970
.ConfigureAwait(false);
7071
if (!_registeredWrapper.IsEmpty)
7172
{
72-
await _registeredWrapper.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag }))
73+
await _registeredWrapper.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag }, cancellationToken), cancellationToken)
7374
.ConfigureAwait(false);
7475
}
7576
}
7677

7778
///<summary>Fires the Received event.</summary>
7879
public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
79-
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
80+
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body, CancellationToken cancellationToken = default)
8081
{
81-
var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
82+
var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body, cancellationToken);
8283

8384
// No need to call base, it's empty.
84-
return _receivedWrapper.InvokeAsync(this, deliverEventArgs);
85+
return _receivedWrapper.InvokeAsync(this, deliverEventArgs, cancellationToken);
8586
}
8687

8788
///<summary>Fires the Shutdown event.</summary>
@@ -91,7 +92,7 @@ await base.HandleChannelShutdownAsync(channel, reason)
9192
.ConfigureAwait(false);
9293
if (!_shutdownWrapper.IsEmpty)
9394
{
94-
await _shutdownWrapper.InvokeAsync(this, reason)
95+
await _shutdownWrapper.InvokeAsync(this, reason, reason.CancellationToken)
9596
.ConfigureAwait(false);
9697
}
9798
}

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using RabbitMQ.Client.Logging;
45

@@ -8,26 +9,26 @@ internal sealed class FallbackConsumer : IAsyncBasicConsumer
89
{
910
public IChannel? Channel { get; } = null;
1011

11-
Task IAsyncBasicConsumer.HandleBasicCancelAsync(string consumerTag)
12+
Task IAsyncBasicConsumer.HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken)
1213
{
1314
ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleBasicCancelAsync)} for tag {consumerTag}");
1415
return Task.CompletedTask;
1516
}
1617

17-
Task IAsyncBasicConsumer.HandleBasicCancelOkAsync(string consumerTag)
18+
Task IAsyncBasicConsumer.HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken)
1819
{
1920
ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleBasicCancelOkAsync)} for tag {consumerTag}");
2021
return Task.CompletedTask;
2122
}
2223

23-
Task IAsyncBasicConsumer.HandleBasicConsumeOkAsync(string consumerTag)
24+
Task IAsyncBasicConsumer.HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken)
2425
{
2526
ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleBasicConsumeOkAsync)} for tag {consumerTag}");
2627
return Task.CompletedTask;
2728
}
2829

2930
Task IAsyncBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
30-
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
31+
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
3132
{
3233
ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleBasicDeliverAsync)} for tag {consumerTag}");
3334
return Task.CompletedTask;

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -811,10 +811,10 @@ public override Task HandleChannelShutdownAsync(object channel, ShutdownEventArg
811811
return base.HandleChannelShutdownAsync(channel, reason);
812812
}
813813

814-
public override Task OnCancel(params string[] consumerTags)
814+
public override Task OnCancelAsync(params string[] consumerTags)
815815
{
816816
_output.WriteLine("[ERROR] {0} OnCancel {1}", _logPrefix, consumerTags[0]);
817-
return base.OnCancel(consumerTags);
817+
return base.OnCancelAsync(consumerTags);
818818
}
819819
}
820820
}

0 commit comments

Comments
 (0)