diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
index a277ded4d..82379f4eb 100644
--- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
+++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
@@ -238,7 +238,7 @@ public static Task CloseAsync(this IChannel channel,
/// The reply text.
/// The cancellation token.
///
- /// The method behaves in the same way as Close(), with the only
+ /// The method behaves in the same way as CloseAsync(), with the only
/// difference that the channel is closed with the given channel
/// close code and message.
///
diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs
index 0ffd5773d..2a896065f 100644
--- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs
+++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs
@@ -126,7 +126,7 @@ await Task.Delay(_config.NetworkRecoveryInterval, token)
///
/// Async cancels the main recovery loop and will block until the loop finishes, or the timeout
- /// expires, to prevent Close operations overlapping with recovery operations.
+ /// expires, to prevent CloseAsync operations overlapping with recovery operations.
///
private async ValueTask StopRecoveryLoopAsync(CancellationToken cancellationToken)
{
diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
index fa2daead4..2875f364c 100644
--- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
+++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
@@ -90,7 +90,7 @@ protected ChannelBase(ConnectionConfig config, ISession session,
_channelShutdownWrapper = new EventingWrapper("OnChannelShutdown", onException);
_recoveryWrapper = new EventingWrapper("OnChannelRecovery", onException);
session.CommandReceived = HandleCommandAsync;
- session.SessionShutdown += OnSessionShutdown;
+ session.SessionShutdownAsync += OnSessionShutdownAsync;
Session = session;
}
@@ -403,12 +403,13 @@ await ModelSendAsync(method, k.CancellationToken)
}
}
- internal void FinishClose()
+ internal async Task FinishCloseAsync(CancellationToken cancellationToken)
{
ShutdownEventArgs? reason = CloseReason;
if (reason != null)
{
- Session.Close(reason);
+ await Session.CloseAsync(reason, cancellationToken)
+ .ConfigureAwait(false);
}
m_connectionStartCell?.TrySetResult(null);
@@ -470,7 +471,7 @@ internal void OnCallbackException(CallbackExceptionEventArgs args)
///Broadcasts notification of the final shutdown of the channel.
///
///
- ///Do not call anywhere other than at the end of OnSessionShutdown.
+ ///Do not call anywhere other than at the end of OnSessionShutdownAsync.
///
///
///Must not be called when m_closeReason is null, because
@@ -517,12 +518,13 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
*
* Aborted PR: https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1551
*/
- private void OnSessionShutdown(object? sender, ShutdownEventArgs reason)
+ private Task OnSessionShutdownAsync(object? sender, ShutdownEventArgs reason)
{
ConsumerDispatcher.Quiesce();
SetCloseReason(reason);
OnChannelShutdown(reason);
ConsumerDispatcher.Shutdown(reason);
+ return Task.CompletedTask;
}
[MemberNotNull(nameof(_closeReason))]
@@ -533,7 +535,7 @@ internal bool SetCloseReason(ShutdownEventArgs reason)
throw new ArgumentNullException(nameof(reason));
}
- // NB: this ensures that Close is only called once on a channel
+ // NB: this ensures that CloseAsync is only called once on a channel
return Interlocked.CompareExchange(ref _closeReason, reason, null) is null;
}
@@ -649,13 +651,15 @@ protected async Task HandleChannelCloseAsync(IncomingCommand cmd, Cancella
channelClose._classId,
channelClose._methodId));
- Session.Close(_closeReason, false);
+ await Session.CloseAsync(_closeReason, false, cancellationToken)
+ .ConfigureAwait(false);
var method = new ChannelCloseOk();
await ModelSendAsync(method, cancellationToken)
.ConfigureAwait(false);
- Session.Notify();
+ await Session.NotifyAsync(cancellationToken)
+ .ConfigureAwait(false);
return true;
}
@@ -665,7 +669,8 @@ protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, Cancel
* Note:
* This call _must_ come before completing the async continuation
*/
- FinishClose();
+ await FinishCloseAsync(cancellationToken)
+ .ConfigureAwait(false);
if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation? k))
{
@@ -715,7 +720,7 @@ protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, Cance
var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId);
try
{
- await Session.Connection.ClosedViaPeerAsync(reason)
+ await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken)
.ConfigureAwait(false);
var replyMethod = new ConnectionCloseOk();
diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
index f9ecbd918..16c4ce5bb 100644
--- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
+++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
@@ -228,7 +228,7 @@ private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe,
if (SetCloseReason(hpe.ShutdownReason))
{
await OnShutdownAsync(hpe.ShutdownReason).ConfigureAwait(false);
- await _session0.SetSessionClosingAsync(false)
+ await _session0.SetSessionClosingAsync(false, mainLoopCancellationToken)
.ConfigureAwait(false);
try
{
diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs
index 4c184cf25..77aaed419 100644
--- a/projects/RabbitMQ.Client/client/impl/Connection.cs
+++ b/projects/RabbitMQ.Client/client/impl/Connection.cs
@@ -332,7 +332,7 @@ internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan ti
await OnShutdownAsync(reason)
.ConfigureAwait(false);
- await _session0.SetSessionClosingAsync(false)
+ await _session0.SetSessionClosingAsync(false, cancellationToken)
.ConfigureAwait(false);
try
@@ -411,7 +411,7 @@ await _frameHandler.CloseAsync(cancellationToken)
}
}
- internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason)
+ internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason, CancellationToken cancellationToken)
{
if (false == SetCloseReason(reason))
{
@@ -424,7 +424,7 @@ internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason)
await OnShutdownAsync(reason)
.ConfigureAwait(false);
- await _session0.SetSessionClosingAsync(true)
+ await _session0.SetSessionClosingAsync(true, cancellationToken)
.ConfigureAwait(false);
MaybeTerminateMainloopAndStopHeartbeatTimers(cancelMainLoop: true);
}
@@ -436,9 +436,11 @@ private async Task FinishCloseAsync(CancellationToken cancellationToken)
_closed = true;
MaybeStopHeartbeatTimers();
- await _frameHandler.CloseAsync(cancellationToken).ConfigureAwait(false);
+ await _frameHandler.CloseAsync(cancellationToken)
+ .ConfigureAwait(false);
_channel0.SetCloseReason(CloseReason!);
- _channel0.FinishClose();
+ await _channel0.FinishCloseAsync(cancellationToken)
+ .ConfigureAwait(false);
RabbitMqClientEventSource.Log.ConnectionClosed();
}
diff --git a/projects/RabbitMQ.Client/client/impl/ISession.cs b/projects/RabbitMQ.Client/client/impl/ISession.cs
index 5dd574b25..74f198edc 100644
--- a/projects/RabbitMQ.Client/client/impl/ISession.cs
+++ b/projects/RabbitMQ.Client/client/impl/ISession.cs
@@ -32,6 +32,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
+using RabbitMQ.Client.Events;
using RabbitMQ.Client.Framing.Impl;
namespace RabbitMQ.Client.Impl
@@ -68,15 +69,15 @@ internal interface ISession
///
/// Multicast session shutdown event.
///
- event EventHandler SessionShutdown;
+ event AsyncEventHandler SessionShutdownAsync;
- void Close(ShutdownEventArgs reason);
+ Task CloseAsync(ShutdownEventArgs reason, CancellationToken cancellationToken);
- void Close(ShutdownEventArgs reason, bool notify);
+ Task CloseAsync(ShutdownEventArgs reason, bool notify, CancellationToken cancellationToken);
Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken);
- void Notify();
+ Task NotifyAsync(CancellationToken cancellationToken);
ValueTask TransmitAsync(in T cmd, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod;
diff --git a/projects/RabbitMQ.Client/client/impl/MainSession.cs b/projects/RabbitMQ.Client/client/impl/MainSession.cs
index edb82ec77..85aeb34fc 100644
--- a/projects/RabbitMQ.Client/client/impl/MainSession.cs
+++ b/projects/RabbitMQ.Client/client/impl/MainSession.cs
@@ -82,38 +82,10 @@ public override Task HandleFrameAsync(InboundFrame frame, CancellationToken canc
return base.HandleFrameAsync(frame, cancellationToken);
}
- /// Set channel 0 as quiescing
- ///
- /// Method should be idempotent. Cannot use base.Close
- /// method call because that would prevent us from
- /// sending/receiving Close/CloseOk commands
- ///
- public void SetSessionClosing(bool closeIsServerInitiated)
+ public async Task SetSessionClosingAsync(bool closeIsServerInitiated, CancellationToken cancellationToken)
{
- if (_closingSemaphore.Wait(InternalConstants.DefaultConnectionAbortTimeout))
- {
- try
- {
- if (false == _closing)
- {
- _closing = true;
- _closeIsServerInitiated = closeIsServerInitiated;
- }
- }
- finally
- {
- _closingSemaphore.Release();
- }
- }
- else
- {
- throw new InvalidOperationException("couldn't enter semaphore");
- }
- }
-
- public async Task SetSessionClosingAsync(bool closeIsServerInitiated)
- {
- if (await _closingSemaphore.WaitAsync(InternalConstants.DefaultConnectionAbortTimeout).ConfigureAwait(false))
+ if (await _closingSemaphore.WaitAsync(InternalConstants.DefaultConnectionAbortTimeout, cancellationToken)
+ .ConfigureAwait(false))
{
try
{
diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs
index 672b9d527..2b5381bde 100644
--- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs
+++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs
@@ -36,6 +36,7 @@
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.client.framing;
+using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Logging;
@@ -58,13 +59,13 @@ protected SessionBase(Connection connection, ushort channelNumber)
RabbitMqClientEventSource.Log.ChannelOpened();
}
- public event EventHandler SessionShutdown
+ public event AsyncEventHandler SessionShutdownAsync
{
add
{
if (CloseReason is null)
{
- _sessionShutdownWrapper.AddHandler(value);
+ _sessionShutdownAsyncWrapper.AddHandler(value);
}
else
{
@@ -73,10 +74,10 @@ public event EventHandler SessionShutdown
}
remove
{
- _sessionShutdownWrapper.RemoveHandler(value);
+ _sessionShutdownAsyncWrapper.RemoveHandler(value);
}
}
- private EventingWrapper _sessionShutdownWrapper;
+ private AsyncEventingWrapper _sessionShutdownAsyncWrapper;
public ushort ChannelNumber { get; }
@@ -86,29 +87,17 @@ public event EventHandler SessionShutdown
[MemberNotNullWhen(false, nameof(CloseReason))]
public bool IsOpen => CloseReason is null;
- public Task OnConnectionShutdownAsync(object? conn, ShutdownEventArgs reason)
- {
- Close(reason);
- return Task.CompletedTask;
- }
-
- public void OnSessionShutdown(ShutdownEventArgs reason)
- {
- Connection.ConnectionShutdownAsync -= OnConnectionShutdownAsync;
- _sessionShutdownWrapper.Invoke(this, reason);
- }
-
public override string ToString()
{
return $"{GetType().Name}#{ChannelNumber}:{Connection}";
}
- public void Close(ShutdownEventArgs reason)
+ public Task CloseAsync(ShutdownEventArgs reason, CancellationToken cancellationToken)
{
- Close(reason, true);
+ return CloseAsync(reason, true, cancellationToken);
}
- public void Close(ShutdownEventArgs reason, bool notify)
+ public Task CloseAsync(ShutdownEventArgs reason, bool notify, CancellationToken cancellationToken)
{
if (Interlocked.CompareExchange(ref _closeReason, reason, null) is null)
{
@@ -117,23 +106,25 @@ public void Close(ShutdownEventArgs reason, bool notify)
if (notify)
{
- OnSessionShutdown(CloseReason!);
+ return OnSessionShutdownAsync(CloseReason!);
}
+
+ return Task.CompletedTask;
}
public abstract Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken);
- public void Notify()
+ public Task NotifyAsync(CancellationToken cancellationToken)
{
// Ensure that we notify only when session is already closed
// If not, throw exception, since this is a serious bug in the library
ShutdownEventArgs? reason = CloseReason;
if (reason is null)
{
- throw new InvalidOperationException("Internal Error in SessionBase.Notify");
+ throw new InvalidOperationException("Internal Error in SessionBase.NotifyAsync");
}
- OnSessionShutdown(reason);
+ return OnSessionShutdownAsync(reason);
}
public virtual ValueTask TransmitAsync(in T cmd, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod
@@ -162,6 +153,17 @@ public ValueTask TransmitAsync(in TMethod cmd, in THeader head
return Connection.WriteAsync(bytes, cancellationToken);
}
+ private Task OnConnectionShutdownAsync(object? conn, ShutdownEventArgs reason)
+ {
+ return CloseAsync(reason, CancellationToken.None);
+ }
+
+ private Task OnSessionShutdownAsync(ShutdownEventArgs reason)
+ {
+ Connection.ConnectionShutdownAsync -= OnConnectionShutdownAsync;
+ return _sessionShutdownAsyncWrapper.InvokeAsync(this, reason);
+ }
+
private void ThrowAlreadyClosedException()
=> throw new AlreadyClosedException(CloseReason!);
}
diff --git a/projects/RabbitMQ.Client/client/impl/SessionManager.cs b/projects/RabbitMQ.Client/client/impl/SessionManager.cs
index 87d211186..3157919b0 100644
--- a/projects/RabbitMQ.Client/client/impl/SessionManager.cs
+++ b/projects/RabbitMQ.Client/client/impl/SessionManager.cs
@@ -30,6 +30,7 @@
//---------------------------------------------------------------------------
using System.Collections.Generic;
+using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Util;
@@ -77,13 +78,13 @@ public ISession Create()
ISession session = new Session(_connection,
(ushort)channelNumber, _maxInboundMessageBodySize);
- session.SessionShutdown += HandleSessionShutdown;
+ session.SessionShutdownAsync += HandleSessionShutdownAsync;
_sessionMap[channelNumber] = session;
return session;
}
}
- private void HandleSessionShutdown(object? sender, ShutdownEventArgs reason)
+ private Task HandleSessionShutdownAsync(object? sender, ShutdownEventArgs reason)
{
lock (_sessionMap)
{
@@ -91,6 +92,7 @@ private void HandleSessionShutdown(object? sender, ShutdownEventArgs reason)
_sessionMap.Remove(session.ChannelNumber);
_ints.Free(session.ChannelNumber);
}
+ return Task.CompletedTask;
}
public ISession Lookup(int number)
diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs
index e51da67fc..a08437af1 100644
--- a/projects/Test/Integration/TestChannelShutdown.cs
+++ b/projects/Test/Integration/TestChannelShutdown.cs
@@ -55,10 +55,10 @@ public async Task TestConsumerDispatcherShutdown()
tcs.SetResult(true);
};
- Assert.False(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before Close");
+ Assert.False(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before CloseAsync");
await _channel.CloseAsync();
await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown");
- Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after Close");
+ Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync");
}
}
}
diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs
index 902421967..6c2e67fe4 100644
--- a/projects/Test/Integration/TestConnectionShutdown.cs
+++ b/projects/Test/Integration/TestConnectionShutdown.cs
@@ -156,10 +156,10 @@ public async Task TestConsumerDispatcherShutdown()
{
tcs.SetResult(true);
};
- Assert.False(m.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before Close");
+ Assert.False(m.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before CloseAsync");
await _conn.CloseAsync();
await WaitAsync(tcs, TimeSpan.FromSeconds(3), "channel shutdown");
- Assert.True(m.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after Close");
+ Assert.True(m.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync");
}
[Fact]