Skip to content

Commit f2cf79a

Browse files
Add cancellation token overload to channel extensions (#1641)
Co-authored-by: Daniel Marbach <[email protected]>
1 parent f6847f9 commit f2cf79a

File tree

5 files changed

+56
-52
lines changed

5 files changed

+56
-52
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -808,9 +808,6 @@ static RabbitMQ.Client.EndpointResolverExtensions.SelectOneAsync<T>(this RabbitM
808808
static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception e, string context) -> RabbitMQ.Client.Events.CallbackExceptionEventArgs
809809
static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception e, string context, object consumer) -> RabbitMQ.Client.Events.CallbackExceptionEventArgs
810810
static RabbitMQ.Client.ExchangeType.All() -> System.Collections.Generic.ICollection<string>
811-
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
812-
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
813-
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, T basicProperties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.ValueTask
814811
static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress
815812
static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool
816813
static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string
@@ -897,25 +894,28 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
897894
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedQueue, System.Exception, RabbitMQ.Client.IConnection, System.Threading.Tasks.Task>
898895
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.set -> void
899896
~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
900-
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.IAsyncBasicConsumer consumer, string queue, bool autoAck = false, string consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary<string, object> arguments = null) -> System.Threading.Tasks.Task<string>
901-
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IAsyncBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
902-
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IAsyncBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
903-
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary<string, object> arguments, RabbitMQ.Client.IAsyncBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
904897
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
905-
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
906-
~static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task
907-
~static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel channel, string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
908-
~static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false) -> System.Threading.Tasks.Task<uint>
909-
~static RabbitMQ.Client.IChannelExtensions.QueueUnbindAsync(this RabbitMQ.Client.IChannel channel, string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null) -> System.Threading.Tasks.Task
910-
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection) -> System.Threading.Tasks.Task
911-
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
912898
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task
913-
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
914-
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
915-
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
916-
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
917899
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
918900
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func<RabbitMQ.Client.IReadOnlyBasicProperties, System.Diagnostics.ActivityContext>
919901
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void
920902
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.get -> System.Action<System.Diagnostics.Activity, System.Collections.Generic.IDictionary<string, object>>
921903
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.set -> void
904+
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.IAsyncBasicConsumer! consumer, string! queue, bool autoAck = false, string! consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary<string!, object?>? arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<string!>!
905+
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<string!>!
906+
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<string!>!
907+
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, System.Collections.Generic.IDictionary<string!, object?>? arguments, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<string!>!
908+
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
909+
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
910+
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
911+
static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel! channel, ushort replyCode, string! replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
912+
static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary<string!, object?>? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
913+
static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary<string!, object?>? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk!>!
914+
static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool ifUnused = false, bool ifEmpty = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>!
915+
static RabbitMQ.Client.IChannelExtensions.QueueUnbindAsync(this RabbitMQ.Client.IChannel! channel, string! queue, string! exchange, string! routingKey, System.Collections.Generic.IDictionary<string!, object?>? arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
916+
static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection) -> System.Threading.Tasks.Task!
917+
static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task!
918+
static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, ushort reasonCode, string! reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task!
919+
static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
920+
static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection! connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task!
921+
static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection! connection, ushort reasonCode, string! reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!

projects/RabbitMQ.Client/client/api/IChannelExtensions.cs

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,36 +48,40 @@ public static Task<string> BasicConsumeAsync(this IChannel channel,
4848
string consumerTag = "",
4949
bool noLocal = false,
5050
bool exclusive = false,
51-
IDictionary<string, object?>? arguments = null)
51+
IDictionary<string, object?>? arguments = null,
52+
CancellationToken cancellationToken = default)
5253
{
53-
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer);
54+
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer, cancellationToken);
5455
}
5556

5657
/// <summary>Asynchronously start a Basic content-class consumer.</summary>
5758
public static Task<string> BasicConsumeAsync(this IChannel channel, string queue,
5859
bool autoAck,
59-
IAsyncBasicConsumer consumer)
60+
IAsyncBasicConsumer consumer,
61+
CancellationToken cancellationToken = default)
6062
{
61-
return channel.BasicConsumeAsync(queue, autoAck, string.Empty, false, false, null, consumer);
63+
return channel.BasicConsumeAsync(queue, autoAck, string.Empty, false, false, null, consumer, cancellationToken);
6264
}
6365

6466
/// <summary>Asynchronously start a Basic content-class consumer.</summary>
6567
public static Task<string> BasicConsumeAsync(this IChannel channel, string queue,
6668
bool autoAck,
6769
string consumerTag,
68-
IAsyncBasicConsumer consumer)
70+
IAsyncBasicConsumer consumer,
71+
CancellationToken cancellationToken = default)
6972
{
70-
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, null, consumer);
73+
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, null, consumer, cancellationToken);
7174
}
7275

7376
/// <summary>Asynchronously start a Basic content-class consumer.</summary>
7477
public static Task<string> BasicConsumeAsync(this IChannel channel, string queue,
7578
bool autoAck,
7679
string consumerTag,
7780
IDictionary<string, object?>? arguments,
78-
IAsyncBasicConsumer consumer)
81+
IAsyncBasicConsumer consumer,
82+
CancellationToken cancellationToken = default)
7983
{
80-
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, arguments, consumer);
84+
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, arguments, consumer, cancellationToken);
8185
}
8286

8387
/// <summary>
@@ -87,55 +91,55 @@ public static Task<string> BasicConsumeAsync(this IChannel channel, string queue
8791
/// The publication occurs with mandatory=false and immediate=false.
8892
/// </remarks>
8993
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, T basicProperties,
90-
ReadOnlyMemory<byte> body)
94+
ReadOnlyMemory<byte> body, CancellationToken cancellationToken = default)
9195
where T : IReadOnlyBasicProperties, IAmqpHeader
9296
{
93-
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
97+
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body, false, cancellationToken);
9498
}
9599

96100
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey,
97-
ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
98-
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
101+
ReadOnlyMemory<byte> body = default, bool mandatory = false, CancellationToken cancellationToken = default) =>
102+
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory, cancellationToken);
99103

100104
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange,
101-
CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
102-
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
105+
CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false, CancellationToken cancellationToken = default) =>
106+
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory, cancellationToken);
103107

104108
/// <summary>
105109
/// Asynchronously declare a queue.
106110
/// </summary>
107111
public static Task<QueueDeclareOk> QueueDeclareAsync(this IChannel channel, string queue = "", bool durable = false, bool exclusive = true,
108-
bool autoDelete = true, IDictionary<string, object?>? arguments = null, bool noWait = false)
112+
bool autoDelete = true, IDictionary<string, object?>? arguments = null, bool noWait = false, CancellationToken cancellationToken = default)
109113
{
110114
return channel.QueueDeclareAsync(queue: queue, passive: false,
111115
durable: durable, exclusive: exclusive, autoDelete: autoDelete,
112-
arguments: arguments, noWait: noWait);
116+
arguments: arguments, noWait: noWait, cancellationToken: cancellationToken);
113117
}
114118

115119
/// <summary>
116120
/// Asynchronously declare an exchange.
117121
/// </summary>
118122
public static Task ExchangeDeclareAsync(this IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false,
119-
IDictionary<string, object?>? arguments = null, bool noWait = false)
123+
IDictionary<string, object?>? arguments = null, bool noWait = false, CancellationToken cancellationToken = default)
120124
{
121125
return channel.ExchangeDeclareAsync(exchange, type, durable, autoDelete,
122-
arguments: arguments, passive: false, noWait: noWait);
126+
arguments: arguments, passive: false, noWait: noWait, cancellationToken: cancellationToken);
123127
}
124128

125129
/// <summary>
126130
/// Asynchronously deletes a queue.
127131
/// </summary>
128-
public static Task<uint> QueueDeleteAsync(this IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false)
132+
public static Task<uint> QueueDeleteAsync(this IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false, CancellationToken cancellationToken = default)
129133
{
130-
return channel.QueueDeleteAsync(queue, ifUnused, ifEmpty);
134+
return channel.QueueDeleteAsync(queue, ifUnused, ifEmpty, false, cancellationToken);
131135
}
132136

133137
/// <summary>
134138
/// Asynchronously unbinds a queue.
135139
/// </summary>
136-
public static Task QueueUnbindAsync(this IChannel channel, string queue, string exchange, string routingKey, IDictionary<string, object?>? arguments = null)
140+
public static Task QueueUnbindAsync(this IChannel channel, string queue, string exchange, string routingKey, IDictionary<string, object?>? arguments = null, CancellationToken cancellationToken = default)
137141
{
138-
return channel.QueueUnbindAsync(queue, exchange, routingKey, arguments);
142+
return channel.QueueUnbindAsync(queue, exchange, routingKey, arguments, cancellationToken);
139143
}
140144

141145
/// <summary>

0 commit comments

Comments
 (0)