From 1ec8ff3c438b6b5b1fb8f89e22bb2c374847e924 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 18 Sep 2024 11:20:40 +0200 Subject: [PATCH 1/2] Make session event async --- .../impl/AutorecoveringConnection.Recovery.cs | 2 +- .../client/impl/ChannelBase.cs | 25 ++++++---- .../client/impl/Connection.Receive.cs | 2 +- .../RabbitMQ.Client/client/impl/Connection.cs | 12 +++-- .../RabbitMQ.Client/client/impl/ISession.cs | 9 ++-- .../client/impl/MainSession.cs | 34 ++----------- .../client/impl/SessionBase.cs | 48 ++++++++++--------- .../client/impl/SessionManager.cs | 6 ++- .../Test/Integration/TestChannelShutdown.cs | 4 +- .../Integration/TestConnectionShutdown.cs | 4 +- 10 files changed, 65 insertions(+), 81 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index 0ffd5773d0..2a896065ff 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -126,7 +126,7 @@ await Task.Delay(_config.NetworkRecoveryInterval, token) /// /// Async cancels the main recovery loop and will block until the loop finishes, or the timeout - /// expires, to prevent Close operations overlapping with recovery operations. + /// expires, to prevent CloseAsync operations overlapping with recovery operations. /// private async ValueTask StopRecoveryLoopAsync(CancellationToken cancellationToken) { diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index fa2daead4b..2875f364cd 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -90,7 +90,7 @@ protected ChannelBase(ConnectionConfig config, ISession session, _channelShutdownWrapper = new EventingWrapper("OnChannelShutdown", onException); _recoveryWrapper = new EventingWrapper("OnChannelRecovery", onException); session.CommandReceived = HandleCommandAsync; - session.SessionShutdown += OnSessionShutdown; + session.SessionShutdownAsync += OnSessionShutdownAsync; Session = session; } @@ -403,12 +403,13 @@ await ModelSendAsync(method, k.CancellationToken) } } - internal void FinishClose() + internal async Task FinishCloseAsync(CancellationToken cancellationToken) { ShutdownEventArgs? reason = CloseReason; if (reason != null) { - Session.Close(reason); + await Session.CloseAsync(reason, cancellationToken) + .ConfigureAwait(false); } m_connectionStartCell?.TrySetResult(null); @@ -470,7 +471,7 @@ internal void OnCallbackException(CallbackExceptionEventArgs args) ///Broadcasts notification of the final shutdown of the channel. /// /// - ///Do not call anywhere other than at the end of OnSessionShutdown. + ///Do not call anywhere other than at the end of OnSessionShutdownAsync. /// /// ///Must not be called when m_closeReason is null, because @@ -517,12 +518,13 @@ private void OnChannelShutdown(ShutdownEventArgs reason) * * Aborted PR: https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1551 */ - private void OnSessionShutdown(object? sender, ShutdownEventArgs reason) + private Task OnSessionShutdownAsync(object? sender, ShutdownEventArgs reason) { ConsumerDispatcher.Quiesce(); SetCloseReason(reason); OnChannelShutdown(reason); ConsumerDispatcher.Shutdown(reason); + return Task.CompletedTask; } [MemberNotNull(nameof(_closeReason))] @@ -533,7 +535,7 @@ internal bool SetCloseReason(ShutdownEventArgs reason) throw new ArgumentNullException(nameof(reason)); } - // NB: this ensures that Close is only called once on a channel + // NB: this ensures that CloseAsync is only called once on a channel return Interlocked.CompareExchange(ref _closeReason, reason, null) is null; } @@ -649,13 +651,15 @@ protected async Task HandleChannelCloseAsync(IncomingCommand cmd, Cancella channelClose._classId, channelClose._methodId)); - Session.Close(_closeReason, false); + await Session.CloseAsync(_closeReason, false, cancellationToken) + .ConfigureAwait(false); var method = new ChannelCloseOk(); await ModelSendAsync(method, cancellationToken) .ConfigureAwait(false); - Session.Notify(); + await Session.NotifyAsync(cancellationToken) + .ConfigureAwait(false); return true; } @@ -665,7 +669,8 @@ protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, Cancel * Note: * This call _must_ come before completing the async continuation */ - FinishClose(); + await FinishCloseAsync(cancellationToken) + .ConfigureAwait(false); if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation? k)) { @@ -715,7 +720,7 @@ protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, Cance var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId); try { - await Session.Connection.ClosedViaPeerAsync(reason) + await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken) .ConfigureAwait(false); var replyMethod = new ConnectionCloseOk(); diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs index f9ecbd918e..16c4ce5bb9 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs @@ -228,7 +228,7 @@ private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe, if (SetCloseReason(hpe.ShutdownReason)) { await OnShutdownAsync(hpe.ShutdownReason).ConfigureAwait(false); - await _session0.SetSessionClosingAsync(false) + await _session0.SetSessionClosingAsync(false, mainLoopCancellationToken) .ConfigureAwait(false); try { diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 4c184cf255..77aaed4193 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -332,7 +332,7 @@ internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan ti await OnShutdownAsync(reason) .ConfigureAwait(false); - await _session0.SetSessionClosingAsync(false) + await _session0.SetSessionClosingAsync(false, cancellationToken) .ConfigureAwait(false); try @@ -411,7 +411,7 @@ await _frameHandler.CloseAsync(cancellationToken) } } - internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason) + internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason, CancellationToken cancellationToken) { if (false == SetCloseReason(reason)) { @@ -424,7 +424,7 @@ internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason) await OnShutdownAsync(reason) .ConfigureAwait(false); - await _session0.SetSessionClosingAsync(true) + await _session0.SetSessionClosingAsync(true, cancellationToken) .ConfigureAwait(false); MaybeTerminateMainloopAndStopHeartbeatTimers(cancelMainLoop: true); } @@ -436,9 +436,11 @@ private async Task FinishCloseAsync(CancellationToken cancellationToken) _closed = true; MaybeStopHeartbeatTimers(); - await _frameHandler.CloseAsync(cancellationToken).ConfigureAwait(false); + await _frameHandler.CloseAsync(cancellationToken) + .ConfigureAwait(false); _channel0.SetCloseReason(CloseReason!); - _channel0.FinishClose(); + await _channel0.FinishCloseAsync(cancellationToken) + .ConfigureAwait(false); RabbitMqClientEventSource.Log.ConnectionClosed(); } diff --git a/projects/RabbitMQ.Client/client/impl/ISession.cs b/projects/RabbitMQ.Client/client/impl/ISession.cs index 5dd574b25a..74f198edc5 100644 --- a/projects/RabbitMQ.Client/client/impl/ISession.cs +++ b/projects/RabbitMQ.Client/client/impl/ISession.cs @@ -32,6 +32,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using RabbitMQ.Client.Events; using RabbitMQ.Client.Framing.Impl; namespace RabbitMQ.Client.Impl @@ -68,15 +69,15 @@ internal interface ISession /// /// Multicast session shutdown event. /// - event EventHandler SessionShutdown; + event AsyncEventHandler SessionShutdownAsync; - void Close(ShutdownEventArgs reason); + Task CloseAsync(ShutdownEventArgs reason, CancellationToken cancellationToken); - void Close(ShutdownEventArgs reason, bool notify); + Task CloseAsync(ShutdownEventArgs reason, bool notify, CancellationToken cancellationToken); Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken); - void Notify(); + Task NotifyAsync(CancellationToken cancellationToken); ValueTask TransmitAsync(in T cmd, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod; diff --git a/projects/RabbitMQ.Client/client/impl/MainSession.cs b/projects/RabbitMQ.Client/client/impl/MainSession.cs index edb82ec773..85aeb34fc5 100644 --- a/projects/RabbitMQ.Client/client/impl/MainSession.cs +++ b/projects/RabbitMQ.Client/client/impl/MainSession.cs @@ -82,38 +82,10 @@ public override Task HandleFrameAsync(InboundFrame frame, CancellationToken canc return base.HandleFrameAsync(frame, cancellationToken); } - /// Set channel 0 as quiescing - /// - /// Method should be idempotent. Cannot use base.Close - /// method call because that would prevent us from - /// sending/receiving Close/CloseOk commands - /// - public void SetSessionClosing(bool closeIsServerInitiated) + public async Task SetSessionClosingAsync(bool closeIsServerInitiated, CancellationToken cancellationToken) { - if (_closingSemaphore.Wait(InternalConstants.DefaultConnectionAbortTimeout)) - { - try - { - if (false == _closing) - { - _closing = true; - _closeIsServerInitiated = closeIsServerInitiated; - } - } - finally - { - _closingSemaphore.Release(); - } - } - else - { - throw new InvalidOperationException("couldn't enter semaphore"); - } - } - - public async Task SetSessionClosingAsync(bool closeIsServerInitiated) - { - if (await _closingSemaphore.WaitAsync(InternalConstants.DefaultConnectionAbortTimeout).ConfigureAwait(false)) + if (await _closingSemaphore.WaitAsync(InternalConstants.DefaultConnectionAbortTimeout, cancellationToken) + .ConfigureAwait(false)) { try { diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index 672b9d5277..2b5381bdef 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -36,6 +36,7 @@ using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.client.framing; +using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; using RabbitMQ.Client.Logging; @@ -58,13 +59,13 @@ protected SessionBase(Connection connection, ushort channelNumber) RabbitMqClientEventSource.Log.ChannelOpened(); } - public event EventHandler SessionShutdown + public event AsyncEventHandler SessionShutdownAsync { add { if (CloseReason is null) { - _sessionShutdownWrapper.AddHandler(value); + _sessionShutdownAsyncWrapper.AddHandler(value); } else { @@ -73,10 +74,10 @@ public event EventHandler SessionShutdown } remove { - _sessionShutdownWrapper.RemoveHandler(value); + _sessionShutdownAsyncWrapper.RemoveHandler(value); } } - private EventingWrapper _sessionShutdownWrapper; + private AsyncEventingWrapper _sessionShutdownAsyncWrapper; public ushort ChannelNumber { get; } @@ -86,29 +87,17 @@ public event EventHandler SessionShutdown [MemberNotNullWhen(false, nameof(CloseReason))] public bool IsOpen => CloseReason is null; - public Task OnConnectionShutdownAsync(object? conn, ShutdownEventArgs reason) - { - Close(reason); - return Task.CompletedTask; - } - - public void OnSessionShutdown(ShutdownEventArgs reason) - { - Connection.ConnectionShutdownAsync -= OnConnectionShutdownAsync; - _sessionShutdownWrapper.Invoke(this, reason); - } - public override string ToString() { return $"{GetType().Name}#{ChannelNumber}:{Connection}"; } - public void Close(ShutdownEventArgs reason) + public Task CloseAsync(ShutdownEventArgs reason, CancellationToken cancellationToken) { - Close(reason, true); + return CloseAsync(reason, true, cancellationToken); } - public void Close(ShutdownEventArgs reason, bool notify) + public Task CloseAsync(ShutdownEventArgs reason, bool notify, CancellationToken cancellationToken) { if (Interlocked.CompareExchange(ref _closeReason, reason, null) is null) { @@ -117,23 +106,25 @@ public void Close(ShutdownEventArgs reason, bool notify) if (notify) { - OnSessionShutdown(CloseReason!); + return OnSessionShutdownAsync(CloseReason!); } + + return Task.CompletedTask; } public abstract Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken); - public void Notify() + public Task NotifyAsync(CancellationToken cancellationToken) { // Ensure that we notify only when session is already closed // If not, throw exception, since this is a serious bug in the library ShutdownEventArgs? reason = CloseReason; if (reason is null) { - throw new InvalidOperationException("Internal Error in SessionBase.Notify"); + throw new InvalidOperationException("Internal Error in SessionBase.NotifyAsync"); } - OnSessionShutdown(reason); + return OnSessionShutdownAsync(reason); } public virtual ValueTask TransmitAsync(in T cmd, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod @@ -162,6 +153,17 @@ public ValueTask TransmitAsync(in TMethod cmd, in THeader head return Connection.WriteAsync(bytes, cancellationToken); } + private Task OnConnectionShutdownAsync(object? conn, ShutdownEventArgs reason) + { + return CloseAsync(reason, CancellationToken.None); + } + + private Task OnSessionShutdownAsync(ShutdownEventArgs reason) + { + Connection.ConnectionShutdownAsync -= OnConnectionShutdownAsync; + return _sessionShutdownAsyncWrapper.InvokeAsync(this, reason); + } + private void ThrowAlreadyClosedException() => throw new AlreadyClosedException(CloseReason!); } diff --git a/projects/RabbitMQ.Client/client/impl/SessionManager.cs b/projects/RabbitMQ.Client/client/impl/SessionManager.cs index 87d2111862..3157919b07 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionManager.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionManager.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System.Collections.Generic; +using System.Threading.Tasks; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; using RabbitMQ.Util; @@ -77,13 +78,13 @@ public ISession Create() ISession session = new Session(_connection, (ushort)channelNumber, _maxInboundMessageBodySize); - session.SessionShutdown += HandleSessionShutdown; + session.SessionShutdownAsync += HandleSessionShutdownAsync; _sessionMap[channelNumber] = session; return session; } } - private void HandleSessionShutdown(object? sender, ShutdownEventArgs reason) + private Task HandleSessionShutdownAsync(object? sender, ShutdownEventArgs reason) { lock (_sessionMap) { @@ -91,6 +92,7 @@ private void HandleSessionShutdown(object? sender, ShutdownEventArgs reason) _sessionMap.Remove(session.ChannelNumber); _ints.Free(session.ChannelNumber); } + return Task.CompletedTask; } public ISession Lookup(int number) diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs index e51da67fc2..a08437af12 100644 --- a/projects/Test/Integration/TestChannelShutdown.cs +++ b/projects/Test/Integration/TestChannelShutdown.cs @@ -55,10 +55,10 @@ public async Task TestConsumerDispatcherShutdown() tcs.SetResult(true); }; - Assert.False(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before Close"); + Assert.False(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before CloseAsync"); await _channel.CloseAsync(); await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown"); - Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after Close"); + Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync"); } } } diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs index 902421967b..6c2e67fe45 100644 --- a/projects/Test/Integration/TestConnectionShutdown.cs +++ b/projects/Test/Integration/TestConnectionShutdown.cs @@ -156,10 +156,10 @@ public async Task TestConsumerDispatcherShutdown() { tcs.SetResult(true); }; - Assert.False(m.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before Close"); + Assert.False(m.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before CloseAsync"); await _conn.CloseAsync(); await WaitAsync(tcs, TimeSpan.FromSeconds(3), "channel shutdown"); - Assert.True(m.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after Close"); + Assert.True(m.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync"); } [Fact] From 26906dbd3e5712b0c9a0baa9e615b27514eb52fd Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 18 Sep 2024 11:20:59 +0200 Subject: [PATCH 2/2] Rename close --- projects/RabbitMQ.Client/client/api/IChannelExtensions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index a277ded4d5..82379f4eb1 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -238,7 +238,7 @@ public static Task CloseAsync(this IChannel channel, /// The reply text. /// The cancellation token. /// - /// The method behaves in the same way as Close(), with the only + /// The method behaves in the same way as CloseAsync(), with the only /// difference that the channel is closed with the given channel /// close code and message. ///