Skip to content

Commit 1fa0562

Browse files
authored
* Add more use of CancellationToken in Async methods. (#1468)
* Correctly dispose of `CancellationTokenSource` and `CancellationTokenRegistration` instances. * Refactor `Connection.Close` to use async internally. * Fix test by adding `WaitAsync` that also takes a timeout. * Add `ConfigureAwait` where it was missing. * Always create `CancellationTokenSource` for recovery, and dispose it. * Modify `WaitAsync` `Task` extension to see if `Task` has already completed. * Don't swallow exceptions unless `abort` is specified. * Add `TaskCreationOptions` to two spots. * Add `SetSessionClosingAsync` * Use `CancellationToken` to stop receieve loop. * Pass the main loop `CancellationToken` into `HardProtocolExceptionHandlerAsync`. * Pass `CancellationToken` to `IFrameHandler.CloseAsync`. * Remove remaining usage of `ThreadPool`
1 parent 220f5a5 commit 1fa0562

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+949
-710
lines changed

projects/RabbitMQ.Client/FrameworkExtension/Interlocked.cs

Lines changed: 0 additions & 29 deletions
This file was deleted.

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -918,7 +918,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
918918
~RabbitMQ.Client.IChannel.TxCommitAsync() -> System.Threading.Tasks.Task
919919
~RabbitMQ.Client.IChannel.TxRollbackAsync() -> System.Threading.Tasks.Task
920920
~RabbitMQ.Client.IChannel.TxSelectAsync() -> System.Threading.Tasks.Task
921-
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort) -> System.Threading.Tasks.Task
921+
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
922922
~RabbitMQ.Client.IConnection.CreateChannelAsync() -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel>
923923
~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason) -> System.Threading.Tasks.Task
924924
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>

projects/RabbitMQ.Client/client/TaskExtensions.cs

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,67 @@ public static bool IsCompletedSuccessfully(this Task task)
5353
private static readonly TaskContinuationOptions s_tco = TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously;
5454
private static void IgnoreTaskContinuation(Task t, object s) => t.Exception.Handle(e => true);
5555

56-
public static async Task WithCancellation(this Task task, CancellationToken cancellationToken)
56+
// https://devblogs.microsoft.com/pfxteam/how-do-i-cancel-non-cancelable-async-operations/
57+
public static Task WaitAsync(this Task task, TimeSpan timeout, CancellationToken cancellationToken)
5758
{
58-
var tcs = new TaskCompletionSource<bool>();
59+
if (task.IsCompletedSuccessfully())
60+
{
61+
return task;
62+
}
63+
else
64+
{
65+
return DoWaitWithTimeoutAsync(task, timeout, cancellationToken);
66+
}
67+
}
68+
69+
private static async Task DoWaitWithTimeoutAsync(this Task task, TimeSpan timeout, CancellationToken cancellationToken)
70+
{
71+
using var timeoutTokenCts = new CancellationTokenSource(timeout);
72+
CancellationToken timeoutToken = timeoutTokenCts.Token;
73+
74+
var linkedTokenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
75+
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(timeoutToken, cancellationToken);
76+
using CancellationTokenRegistration cancellationTokenRegistration =
77+
linkedCts.Token.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
78+
state: linkedTokenTcs, useSynchronizationContext: false);
79+
80+
if (task != await Task.WhenAny(task, linkedTokenTcs.Task).ConfigureAwait(false))
81+
{
82+
task.Ignore();
83+
if (timeoutToken.IsCancellationRequested)
84+
{
85+
throw new OperationCanceledException($"Operation timed out after {timeout}");
86+
}
87+
else
88+
{
89+
throw new OperationCanceledException(cancellationToken);
90+
}
91+
}
5992

60-
// https://devblogs.microsoft.com/pfxteam/how-do-i-cancel-non-cancelable-async-operations/
61-
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs))
93+
await task.ConfigureAwait(false);
94+
}
95+
96+
// https://devblogs.microsoft.com/pfxteam/how-do-i-cancel-non-cancelable-async-operations/
97+
public static Task WaitAsync(this Task task, CancellationToken cancellationToken)
98+
{
99+
if (task.IsCompletedSuccessfully())
62100
{
63-
if (task != await Task.WhenAny(task, tcs.Task))
101+
return task;
102+
}
103+
else
104+
{
105+
return DoWaitAsync(task, cancellationToken);
106+
}
107+
}
108+
109+
private static async Task DoWaitAsync(this Task task, CancellationToken cancellationToken)
110+
{
111+
var cancellationTokenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
112+
113+
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
114+
state: cancellationTokenTcs, useSynchronizationContext: false))
115+
{
116+
if (task != await Task.WhenAny(task, cancellationTokenTcs.Task).ConfigureAwait(false))
64117
{
65118
task.Ignore();
66119
throw new OperationCanceledException(cancellationToken);
@@ -172,10 +225,13 @@ public static T EnsureCompleted<T>(this ValueTask<T> task)
172225

173226
public static void EnsureCompleted(this ValueTask task)
174227
{
175-
task.GetAwaiter().GetResult();
228+
if (false == task.IsCompletedSuccessfully)
229+
{
230+
task.GetAwaiter().GetResult();
231+
}
176232
}
177233

178-
#if !NET6_0_OR_GREATER
234+
#if NETSTANDARD
179235
// https://github.com/dotnet/runtime/issues/23878
180236
// https://github.com/dotnet/runtime/issues/23878#issuecomment-1398958645
181237
public static void Ignore(this Task task)

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,7 @@ private ConnectionConfig CreateConfig(string clientProvidedName)
618618
internal async Task<IFrameHandler> CreateFrameHandlerAsync(
619619
AmqpTcpEndpoint endpoint, CancellationToken cancellationToken)
620620
{
621+
cancellationToken.ThrowIfCancellationRequested();
621622
IFrameHandler fh = new SocketFrameHandler(endpoint, SocketFactory, RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
622623
await fh.ConnectAsync(cancellationToken)
623624
.ConfigureAwait(false);

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Threading;
3435
using System.Threading.Tasks;
3536
using RabbitMQ.Client.Events;
3637
using RabbitMQ.Client.Exceptions;
@@ -222,9 +223,10 @@ public interface IConnection : INetworkConnection, IDisposable
222223
/// </summary>
223224
/// <param name="reasonCode">The close code (See under "Reply Codes" in the AMQP 0-9-1 specification).</param>
224225
/// <param name="reasonText">A message indicating the reason for closing the connection.</param>
225-
/// <param name="timeout">Operation timeout.</param>
226+
/// <param name="timeout"></param>
226227
/// <param name="abort">Whether or not this close is an abort (ignores certain exceptions).</param>
227-
Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort);
228+
/// <param name="cancellationToken"></param>
229+
Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken = default);
228230

229231
/// <summary>
230232
/// Asynchronously create and return a fresh channel, session, and channel.

projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ public static class IConnectionExtensions
2020
/// </remarks>
2121
public static Task CloseAsync(this IConnection connection)
2222
{
23-
return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", InternalConstants.DefaultConnectionCloseTimeout, false);
23+
return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", InternalConstants.DefaultConnectionCloseTimeout, false,
24+
CancellationToken.None);
2425
}
2526

2627
/// <summary>
@@ -38,7 +39,8 @@ public static Task CloseAsync(this IConnection connection)
3839
/// </remarks>
3940
public static Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText)
4041
{
41-
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionCloseTimeout, false);
42+
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionCloseTimeout, false,
43+
CancellationToken.None);
4244
}
4345

4446
/// <summary>
@@ -58,7 +60,8 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st
5860
/// </remarks>
5961
public static Task CloseAsync(this IConnection connection, TimeSpan timeout)
6062
{
61-
return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", timeout, false);
63+
return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", timeout, false,
64+
CancellationToken.None);
6265
}
6366

6467
/// <summary>
@@ -80,7 +83,8 @@ public static Task CloseAsync(this IConnection connection, TimeSpan timeout)
8083
/// </remarks>
8184
public static Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout)
8285
{
83-
return connection.CloseAsync(reasonCode, reasonText, timeout, false);
86+
return connection.CloseAsync(reasonCode, reasonText, timeout, false,
87+
CancellationToken.None);
8488
}
8589

8690
/// <summary>
@@ -94,7 +98,8 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st
9498
/// </remarks>
9599
public static Task AbortAsync(this IConnection connection)
96100
{
97-
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true);
101+
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true,
102+
CancellationToken.None);
98103
}
99104

100105
/// <summary>
@@ -112,7 +117,8 @@ public static Task AbortAsync(this IConnection connection)
112117
/// </remarks>
113118
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText)
114119
{
115-
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true);
120+
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true,
121+
CancellationToken.None);
116122
}
117123

118124
/// <summary>
@@ -130,7 +136,8 @@ public static Task AbortAsync(this IConnection connection, ushort reasonCode, st
130136
/// </remarks>
131137
public static Task AbortAsync(this IConnection connection, TimeSpan timeout)
132138
{
133-
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", timeout, true);
139+
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", timeout, true,
140+
CancellationToken.None);
134141
}
135142

136143
/// <summary>
@@ -149,7 +156,8 @@ public static Task AbortAsync(this IConnection connection, TimeSpan timeout)
149156
/// </remarks>
150157
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout)
151158
{
152-
return connection.CloseAsync(reasonCode, reasonText, timeout, true);
159+
return connection.CloseAsync(reasonCode, reasonText, timeout, true,
160+
CancellationToken.None);
153161
}
154162
}
155163
}

projects/RabbitMQ.Client/client/api/IEndpointResolverExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public static async Task<T> SelectOneAsync<T>(this IEndpointResolver resolver,
4545
var exceptions = new List<Exception>();
4646
foreach (AmqpTcpEndpoint ep in resolver.All())
4747
{
48+
cancellationToken.ThrowIfCancellationRequested();
4849
try
4950
{
5051
t = await selector(ep, cancellationToken).ConfigureAwait(false);

projects/RabbitMQ.Client/client/api/TcpClientAdapter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public virtual Task ConnectAsync(IPAddress ep, int port, CancellationToken cance
2929
#else
3030
public virtual Task ConnectAsync(IPAddress ep, int port, CancellationToken cancellationToken = default)
3131
{
32-
return _sock.ConnectAsync(ep, port).WithCancellation(cancellationToken);
32+
return _sock.ConnectAsync(ep, port).WaitAsync(cancellationToken);
3333
}
3434
#endif
3535

projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ public override async Task HandleBasicDeliverAsync(string consumerTag, ulong del
9292
BasicDeliverEventArgs eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
9393
using (Activity activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default)
9494
{
95-
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
95+
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)
96+
.ConfigureAwait(false);
9697
Received?.Invoke(this, eventArgs);
9798
}
9899
}

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System.Collections.Generic;
33+
using System.Threading;
3234
using System.Threading.Tasks;
3335
using RabbitMQ.Client.client.framing;
3436
using RabbitMQ.Client.Impl;
@@ -69,19 +71,22 @@ public override void _Private_ConnectionCloseOk()
6971
public override ValueTask BasicAckAsync(ulong deliveryTag, bool multiple)
7072
{
7173
var method = new BasicAck(deliveryTag, multiple);
72-
return ModelSendAsync(method);
74+
// TODO cancellation token?
75+
return ModelSendAsync(method, CancellationToken.None);
7376
}
7477

7578
public override ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue)
7679
{
7780
var method = new BasicNack(deliveryTag, multiple, requeue);
78-
return ModelSendAsync(method);
81+
// TODO use cancellation token
82+
return ModelSendAsync(method, CancellationToken.None);
7983
}
8084

8185
public override Task BasicRejectAsync(ulong deliveryTag, bool requeue)
8286
{
8387
var method = new BasicReject(deliveryTag, requeue);
84-
return ModelSendAsync(method).AsTask();
88+
// TODO cancellation token?
89+
return ModelSendAsync(method, CancellationToken.None).AsTask();
8590
}
8691

8792
protected override bool DispatchAsynchronous(in IncomingCommand cmd)

0 commit comments

Comments
 (0)