Skip to content

Commit d1f35ca

Browse files
committed
Added topology recovery filter
1 parent d457b01 commit d1f35ca

13 files changed

+179
-17
lines changed

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ public sealed class ConnectionConfig
8686
/// </summary>
8787
public bool TopologyRecoveryEnabled { get; }
8888

89+
/// <summary>
90+
/// Filter to include/exclude entities from topology recovery.
91+
/// Default filter includes all entities in topology recovery.
92+
/// </summary>
93+
public TopologyRecoveryFilter TopologyRecoveryFilter { get; }
94+
8995
/// <summary>
9096
/// Amount of time client will wait for before re-trying to recover connection.
9197
/// </summary>
@@ -127,7 +133,7 @@ public sealed class ConnectionConfig
127133

128134
internal ConnectionConfig(string virtualHost, string userName, string password, IList<IAuthMechanismFactory> authMechanisms,
129135
IDictionary<string, object?> clientProperties, string? clientProvidedName,
130-
ushort maxChannelCount, uint maxFrameSize, bool topologyRecoveryEnabled,
136+
ushort maxChannelCount, uint maxFrameSize, bool topologyRecoveryEnabled, TopologyRecoveryFilter topologyRecoveryFilter,
131137
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
132138
bool dispatchConsumersAsync, int dispatchConsumerConcurrency,
133139
Func<AmqpTcpEndpoint, IFrameHandler> frameHandlerFactory)
@@ -141,6 +147,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
141147
MaxChannelCount = maxChannelCount;
142148
MaxFrameSize = maxFrameSize;
143149
TopologyRecoveryEnabled = topologyRecoveryEnabled;
150+
TopologyRecoveryFilter = topologyRecoveryFilter;
144151
NetworkRecoveryInterval = networkRecoveryInterval;
145152
HeartbeatInterval = heartbeatInterval;
146153
ContinuationTimeout = continuationTimeout;

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,12 @@ public TimeSpan ContinuationTimeout
259259
/// </summary>
260260
public bool TopologyRecoveryEnabled { get; set; } = true;
261261

262+
/// <summary>
263+
/// Filter to include/exclude entities from topology recovery.
264+
/// Default filter includes all entities in topology recovery.
265+
/// </summary>
266+
public TopologyRecoveryFilter TopologyRecoveryFilter { get; set; } = new TopologyRecoveryFilter();
267+
262268
/// <summary>
263269
/// Construct a fresh instance, with all fields set to their respective defaults.
264270
/// </summary>
@@ -535,6 +541,7 @@ private ConnectionConfig CreateConfig(string clientProvidedName)
535541
RequestedChannelMax,
536542
RequestedFrameMax,
537543
TopologyRecoveryEnabled,
544+
TopologyRecoveryFilter,
538545
NetworkRecoveryInterval,
539546
RequestedHeartbeat,
540547
ContinuationTimeout,
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
#nullable enable
6+
public interface IRecordedBinding
7+
{
8+
string Source { get; }
9+
10+
string Destination { get; }
11+
12+
string RoutingKey { get; }
13+
14+
IDictionary<string, object>? Arguments { get; }
15+
}
16+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
#nullable enable
6+
public interface IRecordedConsumer
7+
{
8+
string ConsumerTag { get; }
9+
10+
string Queue { get; }
11+
12+
bool AutoAck { get; }
13+
14+
bool Exclusive { get; }
15+
16+
IDictionary<string, object>? Arguments { get; }
17+
}
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
#nullable enable
6+
public interface IRecordedExchange
7+
{
8+
string Name { get; }
9+
10+
string Type { get; }
11+
12+
bool Durable { get; }
13+
14+
bool AutoDelete { get; }
15+
16+
IDictionary<string, object>? Arguments { get; }
17+
}
18+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
#nullable enable
6+
public interface IRecordedQueue
7+
{
8+
string Name { get; }
9+
10+
bool Durable { get; }
11+
12+
bool Exclusive { get; }
13+
14+
bool AutoDelete { get; }
15+
16+
IDictionary<string, object>? Arguments { get; }
17+
18+
bool IsServerNamed { get; }
19+
}
20+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
using System;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
/// <summary>
6+
/// Filter to know which entities (exchanges, queues, bindings, consumers) should be recovered by topology recovery.
7+
/// By default, allows all entities to be recovered.
8+
/// </summary>
9+
public class TopologyRecoveryFilter
10+
{
11+
private Func<IRecordedExchange, bool> _exchangeFilter = exchange => true;
12+
private Func<IRecordedQueue, bool> _queueFilter = queue => true;
13+
private Func<IRecordedBinding, bool> _bindingFilter = binding => true;
14+
private Func<IRecordedConsumer, bool> _consumerFilter = consumer => true;
15+
16+
/// <summary>
17+
/// Decides whether an exchange is recovered or not.
18+
/// </summary>
19+
public Func<IRecordedExchange, bool> ExchangeFilter
20+
{
21+
get => _exchangeFilter;
22+
23+
init
24+
{
25+
_exchangeFilter = value ?? throw new ArgumentNullException(nameof(ExchangeFilter));
26+
}
27+
}
28+
29+
/// <summary>
30+
/// Decides whether a queue is recovered or not.
31+
/// </summary>
32+
public Func<IRecordedQueue, bool> QueueFilter
33+
{
34+
get => _queueFilter;
35+
36+
init
37+
{
38+
_queueFilter = value ?? throw new ArgumentNullException(nameof(QueueFilter));
39+
}
40+
}
41+
42+
/// <summary>
43+
/// Decides whether a binding is recovered or not.
44+
/// </summary>
45+
public Func<IRecordedBinding, bool> BindingFilter
46+
{
47+
get => _bindingFilter;
48+
49+
init
50+
{
51+
_bindingFilter = value ?? throw new ArgumentNullException(nameof(BindingFilter));
52+
}
53+
}
54+
55+
/// <summary>
56+
/// Decides whether a consumer is recovered or not.
57+
/// </summary>
58+
public Func<IRecordedConsumer, bool> ConsumerFilter
59+
{
60+
get => _consumerFilter;
61+
62+
init
63+
{
64+
_consumerFilter = value ?? throw new ArgumentNullException(nameof(ConsumerFilter));
65+
}
66+
}
67+
}
68+
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ internal void DeleteAutoDeleteExchange(string exchangeName)
7777
{
7878
lock (_recordedEntitiesLock)
7979
{
80-
if (_recordedExchanges.TryGetValue(exchangeName, out var recordedExchange) && recordedExchange.IsAutoDelete)
80+
if (_recordedExchanges.TryGetValue(exchangeName, out var recordedExchange) && recordedExchange.AutoDelete)
8181
{
8282
if (!AnyBindingsOnExchange(exchangeName))
8383
{
@@ -204,7 +204,7 @@ internal void DeleteRecordedConsumer(string consumerTag)
204204

205205
void DeleteAutoDeleteQueue(string queue)
206206
{
207-
if (_recordedQueues.TryGetValue(queue, out var recordedQueue) && recordedQueue.IsAutoDelete)
207+
if (_recordedQueues.TryGetValue(queue, out var recordedQueue) && recordedQueue.AutoDelete)
208208
{
209209
// last consumer on this connection is gone, remove recorded queue if it is auto-deleted.
210210
if (!AnyConsumersOnQueue(queue))

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ private bool TryRecoverConnectionDelegate()
213213

214214
private void RecoverExchanges(IChannel channel)
215215
{
216-
foreach (var recordedExchange in _recordedExchanges.Values)
216+
foreach (var recordedExchange in _recordedExchanges.Values.Where(x => _config.TopologyRecoveryFilter?.ExchangeFilter(x) ?? true))
217217
{
218218
try
219219
{
@@ -228,7 +228,7 @@ private void RecoverExchanges(IChannel channel)
228228

229229
private void RecoverQueues(IChannel channel)
230230
{
231-
foreach (var recordedQueue in _recordedQueues.Values.ToArray())
231+
foreach (var recordedQueue in _recordedQueues.Values.Where(x => _config.TopologyRecoveryFilter?.QueueFilter(x) ?? true).ToArray())
232232
{
233233
try
234234
{
@@ -266,7 +266,7 @@ private void RecoverQueues(IChannel channel)
266266

267267
private void RecoverBindings(IChannel channel)
268268
{
269-
foreach (var binding in _recordedBindings)
269+
foreach (var binding in _recordedBindings.Where(x => _config.TopologyRecoveryFilter?.BindingFilter(x) ?? true))
270270
{
271271
try
272272
{
@@ -281,7 +281,7 @@ private void RecoverBindings(IChannel channel)
281281

282282
internal void RecoverConsumers(AutorecoveringChannel channelToRecover, IChannel channelToUse)
283283
{
284-
foreach (var consumer in _recordedConsumers.Values.ToArray())
284+
foreach (var consumer in _recordedConsumers.Values.Where(x => _config.TopologyRecoveryFilter?.ConsumerFilter(x) ?? true).ToArray())
285285
{
286286
if (consumer.Channel != channelToRecover)
287287
{

projects/RabbitMQ.Client/client/impl/RecordedBinding.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
namespace RabbitMQ.Client.Impl
3636
{
3737
#nullable enable
38-
internal readonly struct RecordedBinding : IEquatable<RecordedBinding>
38+
internal readonly struct RecordedBinding : IEquatable<RecordedBinding>, IRecordedBinding
3939
{
4040
private readonly bool _isQueueBinding;
4141
private readonly string _destination;
@@ -45,6 +45,8 @@ namespace RabbitMQ.Client.Impl
4545

4646
public string Destination => _destination;
4747
public string Source => _source;
48+
public string RoutingKey => _routingKey;
49+
public IDictionary<string, object>? Arguments => _arguments;
4850

4951
public RecordedBinding(bool isQueueBinding, string destination, string source, string routingKey, IDictionary<string, object>? arguments)
5052
{

0 commit comments

Comments
 (0)