Skip to content

Commit 51743c0

Browse files
committed
Test connection shutdown propagation
1 parent 41e4d66 commit 51743c0

File tree

5 files changed

+60
-14
lines changed

5 files changed

+60
-14
lines changed

projects/RabbitMQ.Client/IConnectionExtensions.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,21 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st
9393
/// </summary>
9494
/// <remarks>
9595
/// Note that all active channels and sessions will be closed if this method is called.
96-
/// In comparison to normal <see cref="CloseAsync(IConnection, CancellationToken)"/> method, <see cref="AbortAsync(IConnection)"/> will not throw
96+
/// In comparison to normal <see cref="CloseAsync(IConnection, CancellationToken)"/> method, <see cref="AbortAsync(IConnection, CancellationToken)"/> will not throw
9797
/// <see cref="IOException"/> during closing connection.
9898
///This method waits infinitely for the in-progress close operation to complete.
9999
/// </remarks>
100-
public static Task AbortAsync(this IConnection connection)
100+
public static Task AbortAsync(this IConnection connection, CancellationToken cancellationToken = default)
101101
{
102102
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true,
103-
CancellationToken.None);
103+
cancellationToken);
104104
}
105105

106106
/// <summary>
107107
/// Asynchronously abort this connection and all its channels.
108108
/// </summary>
109109
/// <remarks>
110-
/// The method behaves in the same way as <see cref="AbortAsync(IConnection)"/>, with the only
110+
/// The method behaves in the same way as <see cref="AbortAsync(IConnection, CancellationToken)"/>, with the only
111111
/// difference that the connection is closed with the given connection close code and message.
112112
/// <para>
113113
/// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification)
@@ -116,18 +116,18 @@ public static Task AbortAsync(this IConnection connection)
116116
/// A message indicating the reason for closing the connection
117117
/// </para>
118118
/// </remarks>
119-
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText)
119+
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, CancellationToken cancellationToken = default)
120120
{
121121
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true,
122-
CancellationToken.None);
122+
cancellationToken);
123123
}
124124

125125
/// <summary>
126126
/// Asynchronously abort this connection and all its channels and wait with a
127127
/// timeout for all the in-progress close operations to complete.
128128
/// </summary>
129129
/// <remarks>
130-
/// This method, behaves in a similar way as method <see cref="AbortAsync(IConnection)"/> with the
130+
/// This method, behaves in a similar way as method <see cref="AbortAsync(IConnection, CancellationToken)"/> with the
131131
/// only difference that it explicitly specifies a timeout given
132132
/// for all the in-progress close operations to complete.
133133
/// If timeout is reached and the close operations haven't finished, then socket is forced to close.

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort,
208208
public async Task CloseAsync(ShutdownEventArgs args, bool abort)
209209
{
210210
CancellationToken cancellationToken = args.CancellationToken;
211-
CancellationToken argCancellationToken = cancellationToken;
212211
if (IsOpen)
213212
{
214213
// Note: we really do need to try and close this channel!
@@ -265,7 +264,6 @@ await ConsumerDispatcher.WaitForShutdownAsync()
265264
MaybeDisposeContinuation(enqueued, k);
266265
_rpcSemaphore.Release();
267266
ChannelShutdownAsync -= k.OnConnectionShutdownAsync;
268-
argCancellationToken.ThrowIfCancellationRequested();
269267
}
270268
}
271269

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,6 @@ public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, b
320320
internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
321321
{
322322
CancellationToken cancellationToken = reason.CancellationToken;
323-
CancellationToken argCancellationToken = cancellationToken;
324323

325324
if (abort && timeout < InternalConstants.DefaultConnectionAbortTimeout)
326325
{
@@ -431,8 +430,6 @@ await _frameHandler.CloseAsync(cts.Token)
431430
throw;
432431
}
433432
}
434-
435-
argCancellationToken.ThrowIfCancellationRequested();
436433
}
437434

438435
internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason)

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
770770
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.set -> void
771771
~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
772772
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
773-
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task
773+
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
774774
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
775775
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func<RabbitMQ.Client.IReadOnlyBasicProperties, System.Diagnostics.ActivityContext>
776776
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void
@@ -784,7 +784,7 @@ static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Cli
784784
static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary<string!, object?>? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk!>!
785785
static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool ifUnused = false, bool ifEmpty = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>!
786786
static RabbitMQ.Client.IChannelExtensions.QueueUnbindAsync(this RabbitMQ.Client.IChannel! channel, string! queue, string! exchange, string! routingKey, System.Collections.Generic.IDictionary<string!, object?>? arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
787-
static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection) -> System.Threading.Tasks.Task!
787+
static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
788788
static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task!
789789
static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, ushort reasonCode, string! reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task!
790790
static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!

projects/Test/Integration/TestConnectionShutdown.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.IO;
34+
using System.Threading;
3435
using System.Threading.Tasks;
3536
using RabbitMQ.Client;
3637
using RabbitMQ.Client.Exceptions;
@@ -91,6 +92,33 @@ public async Task TestAbortWithSocketClosedOutOfBand()
9192
await WaitAsync(tcs, TimeSpan.FromSeconds(6), "channel shutdown");
9293
}
9394

95+
[Fact]
96+
public async Task TestAbortWithSocketClosedOutOfBandAndCancellation()
97+
{
98+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
99+
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
100+
101+
_channel.ChannelShutdownAsync += async (channel, args) =>
102+
{
103+
try
104+
{
105+
await Task.Delay(TimeSpan.FromMinutes(1), args.CancellationToken);
106+
}
107+
catch (OperationCanceledException)
108+
{
109+
tcs.SetResult(true);
110+
}
111+
};
112+
113+
var c = (AutorecoveringConnection)_conn;
114+
await c.CloseFrameHandlerAsync();
115+
116+
await _conn.AbortAsync(cts.Token);
117+
118+
// default Connection.Abort() timeout and then some
119+
await WaitAsync(tcs, TimeSpan.FromSeconds(6), "channel shutdown");
120+
}
121+
94122
[Fact]
95123
public async Task TestDisposedWithSocketClosedOutOfBand()
96124
{
@@ -134,6 +162,29 @@ public async Task TestShutdownSignalPropagationToChannels()
134162
await WaitAsync(tcs, TimeSpan.FromSeconds(3), "channel shutdown");
135163
}
136164

165+
[Fact]
166+
public async Task TestShutdownSignalPropagationWithCancellationToChannels()
167+
{
168+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
169+
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
170+
171+
_channel.ChannelShutdownAsync += async (channel, args) =>
172+
{
173+
try
174+
{
175+
await Task.Delay(TimeSpan.FromMinutes(1), args.CancellationToken);
176+
}
177+
catch (OperationCanceledException)
178+
{
179+
tcs.SetResult(true);
180+
}
181+
};
182+
183+
await _conn.CloseAsync(cts.Token);
184+
185+
await WaitAsync(tcs, TimeSpan.FromSeconds(3), "channel shutdown");
186+
}
187+
137188
[Fact]
138189
public async Task TestShutdownSignalPropagationToChannelsUsingDispose()
139190
{

0 commit comments

Comments
 (0)