Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent)
_autoResetEvent = autoResetEvent;
}

public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
{
if (Interlocked.Increment(ref _current) == Count)
{
Expand All @@ -28,11 +29,11 @@ public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool
return Task.CompletedTask;
}

public Task HandleBasicCancelAsync(string consumerTag) => Task.CompletedTask;
public Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default) => Task.CompletedTask;

public Task HandleBasicCancelOkAsync(string consumerTag) => Task.CompletedTask;
public Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default) => Task.CompletedTask;

public Task HandleBasicConsumeOkAsync(string consumerTag) => Task.CompletedTask;
public Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default) => Task.CompletedTask;

public Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) => Task.CompletedTask;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ public CountingConsumer(IChannel channel, uint messageCount) : base(channel)
}

/// <inheritdoc />
public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
{
if (Interlocked.Decrement(ref _remainingCount) == 0)
{
Expand Down
79 changes: 42 additions & 37 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Large diffs are not rendered by default.

27 changes: 17 additions & 10 deletions projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.Client
Expand Down Expand Up @@ -45,33 +46,36 @@ public string[] ConsumerTags
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
public IChannel Channel { get; private set; }
public IChannel Channel { get; }

/// <summary>
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
/// e.g. the queue has been deleted (either by this channel or by any other channel).
/// See <see cref="HandleBasicCancelOkAsync"/> for notification of consumer cancellation due to basicCancel
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicCancelAsync(string consumerTag)
/// <param name="cancellationToken">The cancellation token.</param>
public virtual Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default)
{
return OnCancel(consumerTag);
return OnCancelAsync(new[] { consumerTag }, cancellationToken);
}

/// <summary>
/// Called upon successful deregistration of the consumer from the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicCancelOkAsync(string consumerTag)
/// <param name="cancellationToken">The cancellation token.</param>
public virtual Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default)
{
return OnCancel(consumerTag);
return OnCancelAsync(new[] { consumerTag }, cancellationToken);
}

/// <summary>
/// Called upon successful registration of the consumer with the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicConsumeOkAsync(string consumerTag)
/// <param name="cancellationToken">The cancellation token.</param>
public virtual Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default)
{
_consumerTags.Add(consumerTag);
IsRunning = true;
Expand All @@ -94,7 +98,8 @@ public virtual Task HandleBasicDeliverAsync(string consumerTag,
string exchange,
string routingKey,
IReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
{
// Nothing to do here.
return Task.CompletedTask;
Expand All @@ -108,18 +113,20 @@ public virtual Task HandleBasicDeliverAsync(string consumerTag,
public virtual Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason)
{
ShutdownReason = reason;
return OnCancel(_consumerTags.ToArray());
return OnCancelAsync(ConsumerTags, reason.CancellationToken);
}

/// <summary>
/// Default implementation - overridable in subclasses.</summary>
/// <param name="consumerTags">The set of consumer tags that where cancelled</param>
/// <param name="consumerTags">The set of consumer tags that were cancelled</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <remarks>
/// This default implementation simply sets the <see cref="IsRunning"/> property to false, and takes no further action.
/// </remarks>
public virtual Task OnCancel(params string[] consumerTags)
protected virtual Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default)
{
IsRunning = false;

foreach (string consumerTag in consumerTags)
{
_consumerTags.Remove(consumerTag);
Expand Down
15 changes: 10 additions & 5 deletions projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.Client
Expand All @@ -20,19 +21,22 @@ public interface IAsyncBasicConsumer
/// See <see cref="HandleBasicCancelOkAsync"/> for notification of consumer cancellation due to basicCancel
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
Task HandleBasicCancelAsync(string consumerTag);
/// <param name="cancellationToken">The cancellation token.</param>
Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default);

/// <summary>
/// Called upon successful deregistration of the consumer from the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
Task HandleBasicCancelOkAsync(string consumerTag);
/// <param name="cancellationToken">The cancellation token.</param>
Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default);

/// <summary>
/// Called upon successful registration of the consumer with the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
Task HandleBasicConsumeOkAsync(string consumerTag);
/// <param name="cancellationToken">The cancellation token.</param>
Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default);

/// <summary>
/// Called each time a message arrives for this consumer.
Expand All @@ -44,7 +48,7 @@ public interface IAsyncBasicConsumer
/// </para>
/// <para>
/// NOTE: Using the <c>body</c> outside of
/// <c><seealso cref="IAsyncBasicConsumer.HandleBasicDeliverAsync(string, ulong, bool, string, string, IReadOnlyBasicProperties, ReadOnlyMemory{byte})"/></c>
/// <c><seealso cref="IAsyncBasicConsumer.HandleBasicDeliverAsync(string, ulong, bool, string, string, IReadOnlyBasicProperties, ReadOnlyMemory{byte}, CancellationToken)"/></c>
/// requires that it be copied!
/// </para>
/// </remarks>
Expand All @@ -55,7 +59,8 @@ Task HandleBasicDeliverAsync(string consumerTag,
string exchange,
string routingKey,
IReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body);
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default);

/// <summary>
/// Called when the channel shuts down.
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
event AsyncEventHandler<EventArgs> RecoverySucceededAsync;
event AsyncEventHandler<AsyncEventArgs> RecoverySucceededAsync;

/// <summary>
/// Raised when the connection recovery fails, e.g. because reconnection or topology
Expand Down Expand Up @@ -212,7 +212,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// <summary>
/// Raised when a connection is unblocked by the AMQP broker.
/// </summary>
event AsyncEventHandler<EventArgs> ConnectionUnblockedAsync;
event AsyncEventHandler<AsyncEventArgs> ConnectionUnblockedAsync;

/// <summary>
/// This method updates the secret used to authenticate this connection.
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/IRecoverable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ namespace RabbitMQ.Client
/// </summary>
public interface IRecoverable
{
event AsyncEventHandler<EventArgs> RecoveryAsync;
event AsyncEventHandler<AsyncEventArgs> RecoveryAsync;
}
}
16 changes: 10 additions & 6 deletions projects/RabbitMQ.Client/client/api/ShutdownEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
//---------------------------------------------------------------------------

using System;
using System.Threading;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
{
Expand All @@ -39,7 +41,8 @@ namespace RabbitMQ.Client
/// <remarks>
/// The <see cref="ClassId"/> and <see cref="Initiator"/> properties should be used to determine the originator of the shutdown event.
/// </remarks>
public class ShutdownEventArgs : EventArgs
/// TODO: Should this be moved to the events folder and the namespace be adjusted?
public class ShutdownEventArgs : AsyncEventArgs
{
private readonly Exception? _exception;

Expand All @@ -48,16 +51,17 @@ public class ShutdownEventArgs : EventArgs
/// 0 for <see cref="ClassId"/> and <see cref="MethodId"/>.
/// </summary>
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
object? cause = null)
: this(initiator, replyCode, replyText, 0, 0, cause)
object? cause = null, CancellationToken cancellationToken = default)
: this(initiator, replyCode, replyText, 0, 0, cause, cancellationToken: cancellationToken)
{
}

/// <summary>
/// Construct a <see cref="ShutdownEventArgs"/> with the given parameters.
/// </summary>
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
ushort classId, ushort methodId, object? cause = null)
ushort classId, ushort methodId, object? cause = null, CancellationToken cancellationToken = default)
: base(cancellationToken)
{
Initiator = initiator;
ReplyCode = replyCode;
Expand All @@ -70,8 +74,8 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r
/// <summary>
/// Construct a <see cref="ShutdownEventArgs"/> with the given parameters.
/// </summary>
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception)
: this(initiator, replyCode, replyText, 0, 0)
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception, CancellationToken cancellationToken = default)
: this(initiator, replyCode, replyText, 0, 0, cancellationToken: cancellationToken)
{
_exception = exception ?? throw new ArgumentNullException(nameof(exception));
}
Expand Down
86 changes: 86 additions & 0 deletions projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System.Threading;

namespace RabbitMQ.Client.Events
{
/// <summary>
/// Provides data for <see cref="AsyncEventHandler{T}"/>
/// events that can be invoked asynchronously.
/// </summary>
public class AsyncEventArgs
{
/// <summary>
/// Initializes a new instance of the <see cref="AsyncEventArgs"/>
/// class.
/// </summary>
/// <param name="cancellationToken">
/// A cancellation token related to the original operation that raised
/// the event. It's important for your handler to pass this token
/// along to any asynchronous or long-running synchronous operations
/// that take a token so cancellation will correctly propagate. The
/// default value is <see cref="CancellationToken.None"/>.
/// </param>
public AsyncEventArgs(CancellationToken cancellationToken = default)
: base()
{
CancellationToken = cancellationToken;
}

/// <summary>
/// Gets a cancellation token related to the original operation that
/// raised the event. It's important for your handler to pass this
/// token along to any asynchronous or long-running synchronous
/// operations that take a token so cancellation (via something like
/// <code>
/// new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token
/// </code>
/// for example) will correctly propagate.
/// </summary>
public CancellationToken CancellationToken { get; }

public static AsyncEventArgs CreateOrDefault(CancellationToken cancellationToken)
{
if (cancellationToken.CanBeCanceled)
{
return new AsyncEventArgs(cancellationToken);
}

return Empty;
}

/// <summary>
/// Provides a value to use with events that do not have event data.
/// </summary>
public static readonly AsyncEventArgs Empty = new AsyncEventArgs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@

namespace RabbitMQ.Client.Events
{
public delegate Task AsyncEventHandler<in TEvent>(object sender, TEvent @event);
public delegate Task AsyncEventHandler<in TEvent>(object sender, TEvent @event) where TEvent : AsyncEventArgs;
}
Loading