diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 247543a90..2bf1ccc9a 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -320,6 +320,9 @@ public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, b /// internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan timeout, CancellationToken cancellationToken) { + using var timeoutCts = new CancellationTokenSource(timeout); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(timeoutCts.Token, cancellationToken); + if (false == SetCloseReason(reason)) { // close reason is already set @@ -330,11 +333,9 @@ internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan ti } else { - cancellationToken.ThrowIfCancellationRequested(); - await OnShutdownAsync(reason) .ConfigureAwait(false); - await _session0.SetSessionClosingAsync(false, cancellationToken) + await _session0.SetSessionClosingAsync(false, cts.Token) .ConfigureAwait(false); try @@ -343,7 +344,7 @@ await _session0.SetSessionClosingAsync(false, cancellationToken) if (false == _closed) { var method = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0); - await _session0.TransmitAsync(method, cancellationToken) + await _session0.TransmitAsync(method, cts.Token) .ConfigureAwait(false); } } @@ -392,14 +393,14 @@ await _session0.TransmitAsync(method, cancellationToken) try { - await _mainLoopTask.WaitAsync(timeout, cancellationToken) + await _mainLoopTask.WaitAsync(timeout, cts.Token) .ConfigureAwait(false); } catch { try { - await _frameHandler.CloseAsync(cancellationToken) + await _frameHandler.CloseAsync(cts.Token) .ConfigureAwait(false); } catch @@ -518,7 +519,6 @@ await this.AbortAsync() } _session0.Dispose(); - _mainLoopCts.Dispose(); await _channel0.DisposeAsync() .ConfigureAwait(false); @@ -529,6 +529,7 @@ await _channel0.DisposeAsync() } finally { + _mainLoopCts.Dispose(); _disposed = true; } } diff --git a/projects/RabbitMQ.Client/Impl/MainSession.cs b/projects/RabbitMQ.Client/Impl/MainSession.cs index 80abd9899..935d4a65d 100644 --- a/projects/RabbitMQ.Client/Impl/MainSession.cs +++ b/projects/RabbitMQ.Client/Impl/MainSession.cs @@ -47,6 +47,7 @@ internal sealed class MainSession : Session, IDisposable private volatile bool _closeIsServerInitiated; private volatile bool _closing; private readonly SemaphoreSlim _closingSemaphore = new SemaphoreSlim(1, 1); + private bool _disposed = false; public MainSession(Connection connection, uint maxBodyLength) : base(connection, 0, maxBodyLength) @@ -83,6 +84,13 @@ public override Task HandleFrameAsync(InboundFrame frame, CancellationToken canc public async Task SetSessionClosingAsync(bool closeIsServerInitiated, CancellationToken cancellationToken) { + if (_disposed) + { + _closing = true; + _closeIsServerInitiated = closeIsServerInitiated; + return; + } + if (await _closingSemaphore.WaitAsync(InternalConstants.DefaultConnectionAbortTimeout, cancellationToken) .ConfigureAwait(false)) { @@ -122,6 +130,24 @@ public override ValueTask TransmitAsync(in T cmd, CancellationToken cancellat return base.TransmitAsync(in cmd, cancellationToken); } - public void Dispose() => ((IDisposable)_closingSemaphore).Dispose(); + public void Dispose() + { + if (_disposed) + { + return; + } + + try + { + _closingSemaphore.Dispose(); + } + catch + { + } + finally + { + _disposed = true; + } + } } } diff --git a/projects/Test/Integration/GH/TestGitHubIssues.cs b/projects/Test/Integration/GH/TestGitHubIssues.cs index 7f8810fb6..f288c2822 100644 --- a/projects/Test/Integration/GH/TestGitHubIssues.cs +++ b/projects/Test/Integration/GH/TestGitHubIssues.cs @@ -131,5 +131,21 @@ public async Task TestHeartbeatTimeoutValue_GH1756() Assert.True(_conn.Heartbeat != default); } + + [Fact] + public async Task DisposeWhileCatchingTimeoutDeadlocksRepro_GH1759() + { + _connFactory = new ConnectionFactory(); + _conn = await _connFactory.CreateConnectionAsync(); + try + { + await _conn.CloseAsync(TimeSpan.Zero); + } + catch (Exception) + { + } + + await _conn.DisposeAsync(); + } } }