Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ protected override async Task ProcessChannelAsync()
{
await work.Consumer.HandleBasicDeliverAsync(
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory, work.CancellationToken)
.ConfigureAwait(false);
}
break;
case WorkType.Cancel:
await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!)
await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!, work.CancellationToken)
.ConfigureAwait(false);
break;
case WorkType.CancelOk:
await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!)
await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!, work.CancellationToken)
.ConfigureAwait(false);
break;
case WorkType.ConsumeOk:
await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!)
await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!, work.CancellationToken)
.ConfigureAwait(false);
break;
case WorkType.Shutdown:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
private readonly ushort _concurrency;
private long _isQuiescing;
private bool _disposed;
private readonly CancellationTokenSource _shutdownCts = new CancellationTokenSource();

internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
{
Expand Down Expand Up @@ -92,7 +93,7 @@ public async ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, s
try
{
AddConsumer(consumer, consumerTag);
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag, _shutdownCts);
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
Expand All @@ -113,7 +114,7 @@ public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliver
if (false == _disposed && false == IsQuiescing)
{
IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, _shutdownCts);
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
Expand All @@ -126,7 +127,7 @@ public async ValueTask HandleBasicCancelOkAsync(string consumerTag, Cancellation
if (false == _disposed && false == IsQuiescing)
{
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag, _shutdownCts);
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
Expand All @@ -139,18 +140,31 @@ public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationTo
if (false == _disposed && false == IsQuiescing)
{
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag, _shutdownCts);
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
}

public void Quiesce()
{
if (IsQuiescing)
{
return;
}

Interlocked.Exchange(ref _isQuiescing, 1);
try
{
_shutdownCts.Cancel();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we go with this approach, we might want to make Quiesce async and implement IAsyncDisposable instead on the consumer dispatcher channel base instead of IDisposable.

}
catch
{
// ignore
}
}

public async Task WaitForShutdownAsync()
public async Task WaitForShutdownAsync(CancellationToken cancellationToken)
{
if (_disposed)
{
Expand All @@ -169,7 +183,7 @@ public async Task WaitForShutdownAsync()
*
* await _reader.Completion.ConfigureAwait(false);
*/
await _worker
await _worker.WaitAsync(cancellationToken)
.ConfigureAwait(false);
}
catch (AggregateException aex)
Expand Down Expand Up @@ -203,18 +217,13 @@ protected bool IsQuiescing
{
get
{
if (Interlocked.Read(ref _isQuiescing) == 1)
{
return true;
}

return false;
return Interlocked.Read(ref _isQuiescing) == 1;
}
}

protected sealed override void ShutdownConsumer(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
{
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason));
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason, _shutdownCts));
}

protected override Task InternalShutdownAsync()
Expand All @@ -237,25 +246,32 @@ protected override Task InternalShutdownAsync()
public readonly RentedMemory Body;
public readonly ShutdownEventArgs? Reason;
public readonly WorkType WorkType;
public readonly CancellationToken CancellationToken;
private readonly CancellationTokenSource? _cancellationTokenSource;

private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag)
private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
: this()
{
WorkType = type;
Consumer = consumer;
ConsumerTag = consumerTag;
CancellationToken = cancellationToken;
_cancellationTokenSource = null;
}

private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason, CancellationTokenSource? cancellationTokenSource)
: this()
{
WorkType = WorkType.Shutdown;
Consumer = consumer;
Reason = reason;
CancellationToken = cancellationTokenSource?.Token ?? CancellationToken.None;
this._cancellationTokenSource = cancellationTokenSource;
}

private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body)
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body,
CancellationToken cancellationToken)
{
WorkType = WorkType.Deliver;
Consumer = consumer;
Expand All @@ -266,37 +282,62 @@ private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliv
RoutingKey = routingKey;
BasicProperties = basicProperties;
Body = body;
Reason = default;
Reason = null;
CancellationToken = cancellationToken;
_cancellationTokenSource = null;
}

public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag)
public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource)
{
return new WorkStruct(WorkType.Cancel, consumer, consumerTag);
return new WorkStruct(WorkType.Cancel, consumer, consumerTag, cancellationTokenSource.Token);
}

public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag)
public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource)
{
return new WorkStruct(WorkType.CancelOk, consumer, consumerTag);
return new WorkStruct(WorkType.CancelOk, consumer, consumerTag, cancellationTokenSource.Token);
}

public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag)
public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource)
{
return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag);
return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag, cancellationTokenSource.Token);
}

public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason, CancellationTokenSource cancellationTokenSource)
{
return new WorkStruct(consumer, reason);
// Create a linked CTS so the shutdown args token reflects both dispatcher cancellation and any upstream token.
CancellationTokenSource? linked = null;
try
{
if (reason.CancellationToken.CanBeCanceled)
{
linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, reason.CancellationToken);
}
}
catch
{
linked = null;
}

CancellationToken token = linked?.Token ?? cancellationTokenSource.Token;
ShutdownEventArgs argsWithToken = reason.Exception != null ?
new ShutdownEventArgs(reason.Initiator, reason.ReplyCode, reason.ReplyText, reason.Exception, token) :
new ShutdownEventArgs(reason.Initiator, reason.ReplyCode, reason.ReplyText, reason.ClassId, reason.MethodId, reason.Cause, token);

return new WorkStruct(consumer, argsWithToken, linked);
}

public static WorkStruct CreateDeliver(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body)
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body, CancellationTokenSource cancellationTokenSource)
{
return new WorkStruct(consumer, consumerTag, deliveryTag, redelivered,
exchange, routingKey, basicProperties, body);
exchange, routingKey, basicProperties, body, cancellationTokenSource.Token);
}

public void Dispose() => Body.Dispose();
public void Dispose()
{
Body.Dispose();
_cancellationTokenSource?.Dispose();
}
}

protected enum WorkType : byte
Expand All @@ -317,6 +358,7 @@ protected virtual void Dispose(bool disposing)
if (disposing)
{
Quiesce();
_shutdownCts.Dispose();
}
}
catch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ ValueTask HandleBasicDeliverAsync(string consumerTag,
void Quiesce();

Task ShutdownAsync(ShutdownEventArgs reason);
Task WaitForShutdownAsync();
Task WaitForShutdownAsync(CancellationToken cancellationToken);
}
}
4 changes: 1 addition & 3 deletions projects/RabbitMQ.Client/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,8 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort,
/// </summary>
/// <param name="reason">The <see cref="ShutdownEventArgs"/> instance containing the close data.</param>
/// <param name="abort">Whether or not the close is an abort (ignoring certain exceptions).</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <returns></returns>
Task CloseAsync(ShutdownEventArgs reason, bool abort,
CancellationToken cancellationToken = default);
Task CloseAsync(ShutdownEventArgs reason, bool abort);

/// <summary>Asynchronously declare an exchange.</summary>
/// <param name="exchange">The name of the exchange.</param>
Expand Down
14 changes: 7 additions & 7 deletions projects/RabbitMQ.Client/IConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,21 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st
/// </summary>
/// <remarks>
/// Note that all active channels and sessions will be closed if this method is called.
/// In comparison to normal <see cref="CloseAsync(IConnection, CancellationToken)"/> method, <see cref="AbortAsync(IConnection)"/> will not throw
/// In comparison to normal <see cref="CloseAsync(IConnection, CancellationToken)"/> method, <see cref="AbortAsync(IConnection, CancellationToken)"/> will not throw
/// <see cref="IOException"/> during closing connection.
///This method waits infinitely for the in-progress close operation to complete.
/// </remarks>
public static Task AbortAsync(this IConnection connection)
public static Task AbortAsync(this IConnection connection, CancellationToken cancellationToken = default)
{
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true,
CancellationToken.None);
cancellationToken);
}

/// <summary>
/// Asynchronously abort this connection and all its channels.
/// </summary>
/// <remarks>
/// The method behaves in the same way as <see cref="AbortAsync(IConnection)"/>, with the only
/// The method behaves in the same way as <see cref="AbortAsync(IConnection, CancellationToken)"/>, with the only
/// difference that the connection is closed with the given connection close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification)
Expand All @@ -116,18 +116,18 @@ public static Task AbortAsync(this IConnection connection)
/// A message indicating the reason for closing the connection
/// </para>
/// </remarks>
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText)
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, CancellationToken cancellationToken = default)
{
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true,
CancellationToken.None);
cancellationToken);
}

/// <summary>
/// Asynchronously abort this connection and all its channels and wait with a
/// timeout for all the in-progress close operations to complete.
/// </summary>
/// <remarks>
/// This method, behaves in a similar way as method <see cref="AbortAsync(IConnection)"/> with the
/// This method, behaves in a similar way as method <see cref="AbortAsync(IConnection, CancellationToken)"/> with the
/// only difference that it explicitly specifies a timeout given
/// for all the in-progress close operations to complete.
/// If timeout is reached and the close operations haven't finished, then socket is forced to close.
Expand Down
5 changes: 2 additions & 3 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,12 @@ await _connection.DeleteRecordedChannelAsync(this,
}
}

public async Task CloseAsync(ShutdownEventArgs args, bool abort,
CancellationToken cancellationToken)
public async Task CloseAsync(ShutdownEventArgs args, bool abort)
{
ThrowIfDisposed();
try
{
await _innerChannel.CloseAsync(args, abort, cancellationToken)
await _innerChannel.CloseAsync(args, abort)
.ConfigureAwait(false);
}
finally
Expand Down
Loading
Loading