Skip to content

Commit 96fdab7

Browse files
committed
* Remove channels that do not recover successfully
1 parent d2c716b commit 96fdab7

File tree

4 files changed

+30
-9
lines changed

4 files changed

+30
-9
lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public IAsyncBasicConsumer? DefaultConsumer
144144

145145
public string? CurrentQueue => InnerChannel.CurrentQueue;
146146

147-
internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers,
147+
internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers,
148148
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
149149
{
150150
if (false == recordedEntitiesSemaphoreHeld)
@@ -154,7 +154,7 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, boo
154154

155155
if (_disposed)
156156
{
157-
return;
157+
return false;
158158
}
159159

160160
_connection = conn;
@@ -198,6 +198,7 @@ await newChannel.TxSelectAsync(cancellationToken)
198198
{
199199
await newChannel.AbortAsync()
200200
.ConfigureAwait(false);
201+
return false;
201202
}
202203
else
203204
{
@@ -210,6 +211,8 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph
210211
}
211212

212213
_innerChannel.RunRecoveryEventHandlers(this);
214+
215+
return true;
213216
}
214217
}
215218

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal sealed partial class AutorecoveringConnection
4646
private readonly Dictionary<string, RecordedQueue> _recordedQueues = new Dictionary<string, RecordedQueue>();
4747
private readonly HashSet<RecordedBinding> _recordedBindings = new HashSet<RecordedBinding>();
4848
private readonly Dictionary<string, RecordedConsumer> _recordedConsumers = new Dictionary<string, RecordedConsumer>();
49-
private readonly List<AutorecoveringChannel> _channels = new List<AutorecoveringChannel>();
49+
private List<AutorecoveringChannel> _channels = new List<AutorecoveringChannel>();
5050

5151
internal int RecordedExchangesCount => _recordedExchanges.Count;
5252

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Collections.Generic;
3334
using System.Linq;
3435
using System.Threading;
3536
using System.Threading.Tasks;
@@ -583,12 +584,29 @@ private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitie
583584
throw new InvalidOperationException("recordedEntitiesSemaphore must be held");
584585
}
585586

586-
foreach (AutorecoveringChannel channel in _channels)
587+
var recoveredChannels = new List<AutorecoveringChannel>();
588+
await _channelsSemaphore.WaitAsync(cancellationToken)
589+
.ConfigureAwait(false);
590+
try
587591
{
588-
await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled,
589-
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld,
590-
cancellationToken: cancellationToken)
591-
.ConfigureAwait(false);
592+
foreach (AutorecoveringChannel channel in _channels)
593+
{
594+
bool recovered = await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled,
595+
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld,
596+
cancellationToken: cancellationToken)
597+
.ConfigureAwait(false);
598+
599+
if (recovered)
600+
{
601+
recoveredChannels.Add(channel);
602+
}
603+
}
604+
}
605+
finally
606+
{
607+
_channels.Clear();
608+
_channels = recoveredChannels;
609+
_channelsSemaphore.Release();
592610
}
593611
}
594612
}

projects/Test/Applications/GH-1647/GH-1647.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<ProjectReference Include="..\..\..\RabbitMQ.Client\RabbitMQ.Client.csproj" />
12+
<ProjectReference Include="../../../RabbitMQ.Client\RabbitMQ.Client.csproj" />
1313
</ItemGroup>
1414

1515
</Project>

0 commit comments

Comments
 (0)