Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
830b1e4
Make connection events async
danielmarbach Sep 17, 2024
7337fad
Rename three internal async methods to use `Async` suffix.
lukebakken Sep 17, 2024
8c7f8f9
Rename `CallbackException` event to `CallbackExceptionAsync`
lukebakken Sep 17, 2024
ebb8e88
Rename `ConnectionBlocked` to `ConnectionBlockedAsync`
lukebakken Sep 17, 2024
5082dc6
Rename `ConnectionShutdown` to `ConnectionShutdownAsync`
lukebakken Sep 17, 2024
209fc7d
Rename `RecoverySucceeded` to `RecoverySucceededAsync`
lukebakken Sep 17, 2024
2d8bc9a
Rename `ConnectionRecoveryError` to `ConnectionRecoveryErrorAsync`
lukebakken Sep 17, 2024
a48789a
Rename `ConsumerTagChangeAfterRecovery` to `ConsumerTagChangeAfterRec…
lukebakken Sep 17, 2024
a4ebfc2
Rename `QueueNameChangedAfterRecovery` to `QueueNameChangedAfterRecov…
lukebakken Sep 17, 2024
4af30b0
Rename `RecoveringConsumer` to `RecoveringConsumerAsync`
lukebakken Sep 17, 2024
9015943
Ensure that `_recoveringConsumerAsyncWrapper` is initialized as expec…
lukebakken Sep 17, 2024
31e9ccc
Rename `ConnectionUnblocked` to `ConnectionUnblockedAsync`
lukebakken Sep 17, 2024
b82ae27
Rename `HandleMainLoopException` to `HandleMainLoopExceptionAsync`
lukebakken Sep 17, 2024
275134e
Rename `HandleConnectionShutdown` to `HandleConnectionShutdownAsync`
lukebakken Sep 17, 2024
213f0da
Rename `ClosedViaPeer` to `ClosedViaPeerAsync`
lukebakken Sep 17, 2024
5d97fa1
Rename `onException` to `onExceptionAsync`
lukebakken Sep 17, 2024
1deeae3
Add `_consumerAboutToBeRecoveredAsyncWrapper` to `TakeOver`
lukebakken Sep 17, 2024
5b6956b
Remove unused `virtual` keyword.
lukebakken Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -445,24 +445,24 @@ RabbitMQ.Client.IChannel.IsClosed.get -> bool
RabbitMQ.Client.IChannel.IsOpen.get -> bool
RabbitMQ.Client.IChannelExtensions
RabbitMQ.Client.IConnection
RabbitMQ.Client.IConnection.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
RabbitMQ.Client.IConnection.CallbackException -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
RabbitMQ.Client.IConnection.ChannelMax.get -> ushort
RabbitMQ.Client.IConnection.ClientProperties.get -> System.Collections.Generic.IDictionary<string, object>
RabbitMQ.Client.IConnection.ClientProvidedName.get -> string
RabbitMQ.Client.IConnection.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs
RabbitMQ.Client.IConnection.ConnectionBlocked -> System.EventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs>
RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs>
RabbitMQ.Client.IConnection.ConnectionShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler<System.EventArgs>
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs>
RabbitMQ.Client.IConnection.ConnectionBlocked -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs>
RabbitMQ.Client.IConnection.ConnectionRecoveryError -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs>
RabbitMQ.Client.IConnection.ConnectionShutdown -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.ShutdownEventArgs>
RabbitMQ.Client.IConnection.ConnectionUnblocked -> RabbitMQ.Client.Events.AsyncEventHandler<System.EventArgs>
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs>
RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint
RabbitMQ.Client.IConnection.FrameMax.get -> uint
RabbitMQ.Client.IConnection.Heartbeat.get -> System.TimeSpan
RabbitMQ.Client.IConnection.IsOpen.get -> bool
RabbitMQ.Client.IConnection.Protocol.get -> RabbitMQ.Client.IProtocol
RabbitMQ.Client.IConnection.QueueNameChangedAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs>
RabbitMQ.Client.IConnection.RecoveringConsumer -> System.EventHandler<RabbitMQ.Client.Events.RecoveringConsumerEventArgs>
RabbitMQ.Client.IConnection.RecoverySucceeded -> System.EventHandler<System.EventArgs>
RabbitMQ.Client.IConnection.QueueNameChangedAfterRecovery -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs>
RabbitMQ.Client.IConnection.RecoveringConsumer -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.RecoveringConsumerEventArgs>
RabbitMQ.Client.IConnection.RecoverySucceeded -> RabbitMQ.Client.Events.AsyncEventHandler<System.EventArgs>
RabbitMQ.Client.IConnection.ServerProperties.get -> System.Collections.Generic.IDictionary<string, object>
RabbitMQ.Client.IConnection.ShutdownReport.get -> System.Collections.Generic.IEnumerable<RabbitMQ.Client.ShutdownReportEntry>
RabbitMQ.Client.IConnectionExtensions
Expand Down
18 changes: 9 additions & 9 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ public interface IConnection : INetworkConnection, IDisposable
/// <see cref="IConnection"/>, then this event will be signalled whenever one
/// of those event handlers throws an exception, as well.
/// </remarks>
event EventHandler<CallbackExceptionEventArgs> CallbackException;
event AsyncEventHandler<CallbackExceptionEventArgs> CallbackException;

event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked;
event AsyncEventHandler<ConnectionBlockedEventArgs> ConnectionBlocked;

/// <summary>
/// Raised when the connection is destroyed.
Expand All @@ -155,15 +155,15 @@ public interface IConnection : INetworkConnection, IDisposable
/// event handler is added to this event, the event handler
/// will be fired immediately.
/// </remarks>
event EventHandler<ShutdownEventArgs> ConnectionShutdown;
event AsyncEventHandler<ShutdownEventArgs> ConnectionShutdown;

/// <summary>
/// Raised when the connection completes recovery.
/// </summary>
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
event EventHandler<EventArgs> RecoverySucceeded;
event AsyncEventHandler<EventArgs> RecoverySucceeded;

/// <summary>
/// Raised when the connection recovery fails, e.g. because reconnection or topology
Expand All @@ -172,7 +172,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError;
event AsyncEventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError;

/// <summary>
/// Raised when the server-generated tag of a consumer registered on this connection changes during
Expand All @@ -182,7 +182,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery;
event AsyncEventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery;

/// <summary>
/// Raised when the name of a server-named queue declared on this connection changes during
Expand All @@ -192,7 +192,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangedAfterRecovery;
event AsyncEventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangedAfterRecovery;

/// <summary>
/// Raised when a consumer is about to be recovered. This event raises when topology recovery
Expand All @@ -204,9 +204,9 @@ public interface IConnection : INetworkConnection, IDisposable
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer;
public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumer;

event EventHandler<EventArgs> ConnectionUnblocked;
event AsyncEventHandler<EventArgs> ConnectionUnblocked;

/// <summary>
/// This method updates the secret used to authenticate this connection.
Expand Down
12 changes: 6 additions & 6 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.ConnectionBlocked:
{
HandleConnectionBlocked(cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleConnectionBlocked(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionClose:
{
Expand All @@ -128,7 +128,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
case ProtocolCommandId.ConnectionSecure:
{
// Note: always returns true
return HandleConnectionSecureAsync(cmd);
return HandleConnectionSecureAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionStart:
{
Expand All @@ -138,12 +138,12 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
case ProtocolCommandId.ConnectionTune:
{
// Note: always returns true
return HandleConnectionTuneAsync(cmd);
return HandleConnectionTuneAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionUnblocked:
{
HandleConnectionUnblocked();
return Task.FromResult(true);
// Note: always returns true
return HandleConnectionUnblocked(cancellationToken);
}
default:
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ internal sealed partial class AutorecoveringConnection
private Task? _recoveryTask;
private readonly CancellationTokenSource _recoveryCancellationTokenSource = new CancellationTokenSource();

private void HandleConnectionShutdown(object? _, ShutdownEventArgs args)
private Task HandleConnectionShutdown(object? _, ShutdownEventArgs args)
{
if (ShouldTriggerConnectionRecovery(args))
{
Expand All @@ -57,6 +57,8 @@ private void HandleConnectionShutdown(object? _, ShutdownEventArgs args)
}
}

return Task.CompletedTask;

static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args)
{
if (args.Initiator == ShutdownInitiator.Peer)
Expand Down Expand Up @@ -204,7 +206,8 @@ await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true, c

ESLog.Info("Connection recovery completed");
ThrowIfDisposed();
_recoverySucceededWrapper.Invoke(this, EventArgs.Empty);
await _recoverySucceededWrapper.InvokeAsync(this, EventArgs.Empty)
.ConfigureAwait(false);

return true;
}
Expand Down Expand Up @@ -269,7 +272,8 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken)
if (!_connectionRecoveryErrorWrapper.IsEmpty)
{
// Note: recordedEntities semaphore is _NOT_ held at this point
_connectionRecoveryErrorWrapper.Invoke(this, new ConnectionRecoveryErrorEventArgs(e));
await _connectionRecoveryErrorWrapper.InvokeAsync(this, new ConnectionRecoveryErrorEventArgs(e))
.ConfigureAwait(false);
}

maybeNewInnerConnection?.Dispose();
Expand Down Expand Up @@ -382,7 +386,8 @@ await RecordQueueAsync(new RecordedQueue(newName, recordedQueue),
try
{
_recordedEntitiesSemaphore.Release();
_queueNameChangedAfterRecoveryWrapper.Invoke(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName));
await _queueNameChangedAfterRecoveryWrapper.InvokeAsync(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName))
.ConfigureAwait(false);
}
finally
{
Expand Down Expand Up @@ -515,7 +520,8 @@ internal async ValueTask RecoverConsumersAsync(AutorecoveringChannel channelToRe
try
{
_recordedEntitiesSemaphore.Release();
_consumerAboutToBeRecovered.Invoke(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments));
await _consumerAboutToBeRecoveredWrapper.InvokeAsync(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments))
.ConfigureAwait(false);
}
finally
{
Expand All @@ -536,7 +542,8 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
try
{
_recordedEntitiesSemaphore.Release();
_consumerTagChangeAfterRecoveryWrapper.Invoke(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag));
await _consumerTagChangeAfterRecoveryWrapper.InvokeAsync(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag))
.ConfigureAwait(false);
}
finally
{
Expand Down
44 changes: 22 additions & 22 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver end
_innerConnection = innerConnection;

ConnectionShutdown += HandleConnectionShutdown;
_recoverySucceededWrapper = new EventingWrapper<EventArgs>("OnConnectionRecovery", onException);
_connectionRecoveryErrorWrapper = new EventingWrapper<ConnectionRecoveryErrorEventArgs>("OnConnectionRecoveryError", onException);
_consumerTagChangeAfterRecoveryWrapper = new EventingWrapper<ConsumerTagChangedAfterRecoveryEventArgs>("OnConsumerRecovery", onException);
_queueNameChangedAfterRecoveryWrapper = new EventingWrapper<QueueNameChangedAfterRecoveryEventArgs>("OnQueueRecovery", onException);
_recoverySucceededWrapper = new AsyncEventingWrapper<EventArgs>("OnConnectionRecovery", onExceptionAsync);
_connectionRecoveryErrorWrapper = new AsyncEventingWrapper<ConnectionRecoveryErrorEventArgs>("OnConnectionRecoveryError", onExceptionAsync);
_consumerTagChangeAfterRecoveryWrapper = new AsyncEventingWrapper<ConsumerTagChangedAfterRecoveryEventArgs>("OnConsumerRecovery", onExceptionAsync);
_queueNameChangedAfterRecoveryWrapper = new AsyncEventingWrapper<QueueNameChangedAfterRecoveryEventArgs>("OnQueueRecovery", onExceptionAsync);

void onException(Exception exception, string context) =>
_innerConnection.OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
Task onExceptionAsync(Exception exception, string context) =>
_innerConnection.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context));
}

internal static async ValueTask<AutorecoveringConnection> CreateAsync(ConnectionConfig config, IEndpointResolver endpoints,
Expand All @@ -88,64 +88,64 @@ await innerConnection.OpenAsync(cancellationToken)
return connection;
}

public event EventHandler<EventArgs> RecoverySucceeded
public event AsyncEventHandler<EventArgs> RecoverySucceeded
{
add => _recoverySucceededWrapper.AddHandler(value);
remove => _recoverySucceededWrapper.RemoveHandler(value);
}
private EventingWrapper<EventArgs> _recoverySucceededWrapper;
private AsyncEventingWrapper<EventArgs> _recoverySucceededWrapper;

public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError
public event AsyncEventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError
{
add => _connectionRecoveryErrorWrapper.AddHandler(value);
remove => _connectionRecoveryErrorWrapper.RemoveHandler(value);
}
private EventingWrapper<ConnectionRecoveryErrorEventArgs> _connectionRecoveryErrorWrapper;
private AsyncEventingWrapper<ConnectionRecoveryErrorEventArgs> _connectionRecoveryErrorWrapper;

public event EventHandler<CallbackExceptionEventArgs> CallbackException
public event AsyncEventHandler<CallbackExceptionEventArgs> CallbackException
{
add => InnerConnection.CallbackException += value;
remove => InnerConnection.CallbackException -= value;
}

public event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked
public event AsyncEventHandler<ConnectionBlockedEventArgs> ConnectionBlocked
{
add => InnerConnection.ConnectionBlocked += value;
remove => InnerConnection.ConnectionBlocked -= value;
}

public event EventHandler<ShutdownEventArgs> ConnectionShutdown
public event AsyncEventHandler<ShutdownEventArgs> ConnectionShutdown
{
add => InnerConnection.ConnectionShutdown += value;
remove => InnerConnection.ConnectionShutdown -= value;
}

public event EventHandler<EventArgs> ConnectionUnblocked
public event AsyncEventHandler<EventArgs> ConnectionUnblocked
{
add => InnerConnection.ConnectionUnblocked += value;
remove => InnerConnection.ConnectionUnblocked -= value;
}

public event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery
public event AsyncEventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery
{
add => _consumerTagChangeAfterRecoveryWrapper.AddHandler(value);
remove => _consumerTagChangeAfterRecoveryWrapper.RemoveHandler(value);
}
private EventingWrapper<ConsumerTagChangedAfterRecoveryEventArgs> _consumerTagChangeAfterRecoveryWrapper;
private AsyncEventingWrapper<ConsumerTagChangedAfterRecoveryEventArgs> _consumerTagChangeAfterRecoveryWrapper;

public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangedAfterRecovery
public event AsyncEventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangedAfterRecovery
{
add => _queueNameChangedAfterRecoveryWrapper.AddHandler(value);
remove => _queueNameChangedAfterRecoveryWrapper.RemoveHandler(value);
}
private EventingWrapper<QueueNameChangedAfterRecoveryEventArgs> _queueNameChangedAfterRecoveryWrapper;
private AsyncEventingWrapper<QueueNameChangedAfterRecoveryEventArgs> _queueNameChangedAfterRecoveryWrapper;

public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
{
add => _consumerAboutToBeRecovered.AddHandler(value);
remove => _consumerAboutToBeRecovered.RemoveHandler(value);
add => _consumerAboutToBeRecoveredWrapper.AddHandler(value);
remove => _consumerAboutToBeRecoveredWrapper.RemoveHandler(value);
}
private EventingWrapper<RecoveringConsumerEventArgs> _consumerAboutToBeRecovered;
private AsyncEventingWrapper<RecoveringConsumerEventArgs> _consumerAboutToBeRecoveredWrapper;

public string? ClientProvidedName => _config.ClientProvidedName;

Expand Down
Loading