From d2c716bd66b41a3baf4912cb22836d6face4055c Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 31 Jul 2024 08:39:46 -0700 Subject: [PATCH 1/3] Fix object disposed exception during channel Recovery Fixes #1647 --- RabbitMQDotNetClient.sln | 9 +++++- .../client/impl/AutorecoveringChannel.cs | 23 +++++++++++---- .../Test/Applications/GH-1647/GH-1647.csproj | 15 ++++++++++ projects/Test/Applications/GH-1647/Program.cs | 29 +++++++++++++++++++ 4 files changed, 70 insertions(+), 6 deletions(-) create mode 100644 projects/Test/Applications/GH-1647/GH-1647.csproj create mode 100644 projects/Test/Applications/GH-1647/Program.cs diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index 8974d4a9ce..5354b8d079 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -40,7 +40,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "projects\toxiproxy-netcore\src\ToxiproxyNetCore\ToxiproxyNetCore.csproj", "{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -96,6 +98,10 @@ Global {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.Build.0 = Debug|Any CPU {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.ActiveCfg = Release|Any CPU {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.Build.0 = Release|Any CPU + {64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -110,6 +116,7 @@ Global {F25725D7-2978-45F4-B90F-25D6F8B71C9E} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} + {64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1} diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 5dd3047fb5..9a8f5549fd 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -152,7 +152,11 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, boo throw new InvalidOperationException("recordedEntitiesSemaphore must be held"); } - ThrowIfDisposed(); + if (_disposed) + { + return; + } + _connection = conn; RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken) @@ -189,15 +193,24 @@ await newChannel.TxSelectAsync(cancellationToken) * chance that an invalid Channel will be used to handle a basic.deliver frame, * with the resulting basic.ack never getting sent out. */ - _innerChannel = newChannel; - if (recoverConsumers) + if (_disposed) { - await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaphoreHeld) + await newChannel.AbortAsync() .ConfigureAwait(false); } + else + { + _innerChannel = newChannel; - _innerChannel.RunRecoveryEventHandlers(this); + if (recoverConsumers) + { + await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaphoreHeld) + .ConfigureAwait(false); + } + + _innerChannel.RunRecoveryEventHandlers(this); + } } public async Task CloseAsync(ushort replyCode, string replyText, bool abort, diff --git a/projects/Test/Applications/GH-1647/GH-1647.csproj b/projects/Test/Applications/GH-1647/GH-1647.csproj new file mode 100644 index 0000000000..d17f9790e2 --- /dev/null +++ b/projects/Test/Applications/GH-1647/GH-1647.csproj @@ -0,0 +1,15 @@ + + + + Exe + net6.0 + GH_1647 + enable + enable + + + + + + + diff --git a/projects/Test/Applications/GH-1647/Program.cs b/projects/Test/Applications/GH-1647/Program.cs new file mode 100644 index 0000000000..f3cccb3da3 --- /dev/null +++ b/projects/Test/Applications/GH-1647/Program.cs @@ -0,0 +1,29 @@ +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task +using System.Text; +using RabbitMQ.Client; + +ConnectionFactory connectionFactory = new() +{ + AutomaticRecoveryEnabled = true, + UserName = "guest", + Password = "guest" +}; + +var props = new BasicProperties(); +byte[] msg = Encoding.UTF8.GetBytes("test"); +using var connection = await connectionFactory.CreateConnectionAsync(); +for (int i = 0; i < 300; i++) +{ + try + { + using var channel = await connection.CreateChannelAsync(); // New channel for each message + await Task.Delay(1000); + await channel.BasicPublishAsync(string.Empty, string.Empty, props, msg); + Console.WriteLine($"Sent message {i}"); + } + catch (Exception ex) + { + Console.WriteLine($"Failed to send message {i}: {ex.Message}"); + await Task.Delay(1000); + } +} From 96fdab716ec49498623a5c1959c27f734b60b18e Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 31 Jul 2024 08:52:22 -0700 Subject: [PATCH 2/3] * Remove channels that do not recover successfully --- .../client/impl/AutorecoveringChannel.cs | 7 +++-- .../AutorecoveringConnection.Recording.cs | 2 +- .../impl/AutorecoveringConnection.Recovery.cs | 28 +++++++++++++++---- .../Test/Applications/GH-1647/GH-1647.csproj | 2 +- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 9a8f5549fd..3640d461e9 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -144,7 +144,7 @@ public IAsyncBasicConsumer? DefaultConsumer public string? CurrentQueue => InnerChannel.CurrentQueue; - internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers, + internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers, bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { if (false == recordedEntitiesSemaphoreHeld) @@ -154,7 +154,7 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, boo if (_disposed) { - return; + return false; } _connection = conn; @@ -198,6 +198,7 @@ await newChannel.TxSelectAsync(cancellationToken) { await newChannel.AbortAsync() .ConfigureAwait(false); + return false; } else { @@ -210,6 +211,8 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph } _innerChannel.RunRecoveryEventHandlers(this); + + return true; } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs index db5dffc909..af5cc55d9c 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs @@ -46,7 +46,7 @@ internal sealed partial class AutorecoveringConnection private readonly Dictionary _recordedQueues = new Dictionary(); private readonly HashSet _recordedBindings = new HashSet(); private readonly Dictionary _recordedConsumers = new Dictionary(); - private readonly List _channels = new List(); + private List _channels = new List(); internal int RecordedExchangesCount => _recordedExchanges.Count; diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index e1d47ba877..367749951f 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -583,12 +584,29 @@ private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitie throw new InvalidOperationException("recordedEntitiesSemaphore must be held"); } - foreach (AutorecoveringChannel channel in _channels) + var recoveredChannels = new List(); + await _channelsSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + try { - await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled, - recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, - cancellationToken: cancellationToken) - .ConfigureAwait(false); + foreach (AutorecoveringChannel channel in _channels) + { + bool recovered = await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled, + recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + if (recovered) + { + recoveredChannels.Add(channel); + } + } + } + finally + { + _channels.Clear(); + _channels = recoveredChannels; + _channelsSemaphore.Release(); } } } diff --git a/projects/Test/Applications/GH-1647/GH-1647.csproj b/projects/Test/Applications/GH-1647/GH-1647.csproj index d17f9790e2..022441c0e1 100644 --- a/projects/Test/Applications/GH-1647/GH-1647.csproj +++ b/projects/Test/Applications/GH-1647/GH-1647.csproj @@ -9,7 +9,7 @@ - + From 4c58542d42d78268175dd2df9c9625a12160975e Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 31 Jul 2024 09:24:48 -0700 Subject: [PATCH 3/3] * Hold the `_channelsSemaphore` for a shorter period of time. --- .../AutorecoveringConnection.Recording.cs | 2 +- .../impl/AutorecoveringConnection.Recovery.cs | 39 +++++++++++++------ 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs index af5cc55d9c..db5dffc909 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs @@ -46,7 +46,7 @@ internal sealed partial class AutorecoveringConnection private readonly Dictionary _recordedQueues = new Dictionary(); private readonly HashSet _recordedBindings = new HashSet(); private readonly Dictionary _recordedConsumers = new Dictionary(); - private List _channels = new List(); + private readonly List _channels = new List(); internal int RecordedExchangesCount => _recordedExchanges.Count; diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index 367749951f..4d48463630 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -584,28 +584,43 @@ private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitie throw new InvalidOperationException("recordedEntitiesSemaphore must be held"); } - var recoveredChannels = new List(); + var channelsToRecover = new List(); await _channelsSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); try { - foreach (AutorecoveringChannel channel in _channels) + channelsToRecover.AddRange(_channels); + } + finally + { + _channelsSemaphore.Release(); + } + + var notRecoveredChannels = new List(); + foreach (AutorecoveringChannel channel in channelsToRecover) + { + bool recovered = await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled, + recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + if (false == recovered) { - bool recovered = await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled, - recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, - cancellationToken: cancellationToken) - .ConfigureAwait(false); + notRecoveredChannels.Add(channel); + } + } - if (recovered) - { - recoveredChannels.Add(channel); - } + await _channelsSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + try + { + foreach (AutorecoveringChannel channel in notRecoveredChannels) + { + _channels.Remove(channel); } } finally { - _channels.Clear(); - _channels = recoveredChannels; _channelsSemaphore.Release(); } }