Skip to content

Commit bf7d538

Browse files
authored
Merge pull request #1316 from rosca-sabina/feature/658-add-filtering-to-topology-recovery-6.x
Add custom filtering and exception handling to topology recovery on 6.x
2 parents c351467 + a9cdaee commit bf7d538

16 files changed

+1051
-68
lines changed

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,17 @@ public TimeSpan ContinuationTimeout
271271
/// </summary>
272272
public bool TopologyRecoveryEnabled { get; set; } = true;
273273

274+
/// <summary>
275+
/// Filter to include/exclude entities from topology recovery.
276+
/// Default filter includes all entities in topology recovery.
277+
/// </summary>
278+
public TopologyRecoveryFilter TopologyRecoveryFilter { get; set; } = new TopologyRecoveryFilter();
279+
280+
/// <summary>
281+
/// Custom logic for handling topology recovery exceptions that match the specified filters.
282+
/// </summary>
283+
public TopologyRecoveryExceptionHandler TopologyRecoveryExceptionHandler { get; set; } = new TopologyRecoveryExceptionHandler();
284+
274285
/// <summary>
275286
/// Construct a fresh instance, with all fields set to their respective defaults.
276287
/// </summary>
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
public interface IRecordedBinding
6+
{
7+
string Source { get; }
8+
9+
string Destination { get; }
10+
11+
string RoutingKey { get; }
12+
13+
IDictionary<string, object> Arguments { get; }
14+
}
15+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
public interface IRecordedConsumer
6+
{
7+
string ConsumerTag { get; }
8+
9+
string Queue { get; }
10+
11+
bool AutoAck { get; }
12+
13+
bool Exclusive { get; }
14+
15+
IDictionary<string, object> Arguments { get; }
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
public interface IRecordedExchange
6+
{
7+
string Name { get; }
8+
9+
string Type { get; }
10+
11+
bool Durable { get; }
12+
13+
bool AutoDelete { get; }
14+
15+
IDictionary<string, object> Arguments { get; }
16+
}
17+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
public interface IRecordedQueue
6+
{
7+
string Name { get; }
8+
9+
bool Durable { get; }
10+
11+
bool Exclusive { get; }
12+
13+
bool AutoDelete { get; }
14+
15+
IDictionary<string, object> Arguments { get; }
16+
17+
bool IsServerNamed { get; }
18+
}
19+
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
using System;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
/// <summary>
6+
/// Custom logic for handling topology recovery exceptions that match the specified filters.
7+
/// </summary>
8+
public class TopologyRecoveryExceptionHandler
9+
{
10+
private static readonly Func<IRecordedExchange, Exception, bool> s_defaultExchangeExceptionCondition = (e, ex) => true;
11+
private static readonly Func<IRecordedQueue, Exception, bool> s_defaultQueueExceptionCondition = (q, ex) => true;
12+
private static readonly Func<IRecordedBinding, Exception, bool> s_defaultBindingExceptionCondition = (b, ex) => true;
13+
private static readonly Func<IRecordedConsumer, Exception, bool> s_defaultConsumerExceptionCondition = (c, ex) => true;
14+
15+
private Func<IRecordedExchange, Exception, bool> _exchangeRecoveryExceptionCondition;
16+
private Func<IRecordedQueue, Exception, bool> _queueRecoveryExceptionCondition;
17+
private Func<IRecordedBinding, Exception, bool> _bindingRecoveryExceptionCondition;
18+
private Func<IRecordedConsumer, Exception, bool> _consumerRecoveryExceptionCondition;
19+
private Action<IRecordedExchange, Exception, IConnection> _exchangeRecoveryExceptionHandler;
20+
private Action<IRecordedQueue, Exception, IConnection> _queueRecoveryExceptionHandler;
21+
private Action<IRecordedBinding, Exception, IConnection> _bindingRecoveryExceptionHandler;
22+
private Action<IRecordedConsumer, Exception, IConnection> _consumerRecoveryExceptionHandler;
23+
24+
/// <summary>
25+
/// Decides which exchange recovery exceptions the custom exception handler is applied to.
26+
/// Default condition applies the exception handler to all exchange recovery exceptions.
27+
/// </summary>
28+
public Func<IRecordedExchange, Exception, bool> ExchangeRecoveryExceptionCondition
29+
{
30+
get => _exchangeRecoveryExceptionCondition ?? s_defaultExchangeExceptionCondition;
31+
32+
set
33+
{
34+
if (_exchangeRecoveryExceptionCondition != null)
35+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized.");
36+
37+
_exchangeRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition));
38+
}
39+
}
40+
41+
/// <summary>
42+
/// Decides which queue recovery exceptions the custom exception handler is applied to.
43+
/// Default condition applies the exception handler to all queue recovery exceptions.
44+
/// </summary>
45+
public Func<IRecordedQueue, Exception, bool> QueueRecoveryExceptionCondition
46+
{
47+
get => _queueRecoveryExceptionCondition ?? s_defaultQueueExceptionCondition;
48+
49+
set
50+
{
51+
if (_queueRecoveryExceptionCondition != null)
52+
throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionCondition)} after it has been initialized.");
53+
54+
_queueRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionCondition));
55+
}
56+
}
57+
58+
/// <summary>
59+
/// Decides which binding recovery exceptions the custom exception handler is applied to.
60+
/// Default condition applies the exception handler to all binding recovery exceptions.
61+
/// </summary>
62+
public Func<IRecordedBinding, Exception, bool> BindingRecoveryExceptionCondition
63+
{
64+
get => _bindingRecoveryExceptionCondition ?? s_defaultBindingExceptionCondition;
65+
66+
set
67+
{
68+
if (_bindingRecoveryExceptionCondition != null)
69+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized.");
70+
71+
_bindingRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition));
72+
}
73+
}
74+
75+
/// <summary>
76+
/// Decides which consumer recovery exceptions the custom exception handler is applied to.
77+
/// Default condition applies the exception handler to all consumer recovery exceptions.
78+
/// </summary>
79+
public Func<IRecordedConsumer, Exception, bool> ConsumerRecoveryExceptionCondition
80+
{
81+
get => _consumerRecoveryExceptionCondition ?? s_defaultConsumerExceptionCondition;
82+
83+
set
84+
{
85+
if (_consumerRecoveryExceptionCondition != null)
86+
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionCondition)} after it has been initialized.");
87+
88+
_consumerRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionCondition));
89+
}
90+
}
91+
92+
/// <summary>
93+
/// Retries, or otherwise handles, an exception thrown when attempting to recover an exchange.
94+
/// </summary>
95+
public Action<IRecordedExchange, Exception, IConnection> ExchangeRecoveryExceptionHandler
96+
{
97+
get => _exchangeRecoveryExceptionHandler;
98+
99+
set
100+
{
101+
if (_exchangeRecoveryExceptionHandler != null)
102+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionHandler)} after it has been initialized.");
103+
104+
_exchangeRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionHandler));
105+
}
106+
}
107+
108+
/// <summary>
109+
/// Retries, or otherwise handles, an exception thrown when attempting to recover a queue.
110+
/// </summary>
111+
public Action<IRecordedQueue, Exception, IConnection> QueueRecoveryExceptionHandler
112+
{
113+
get => _queueRecoveryExceptionHandler;
114+
115+
set
116+
{
117+
if (_queueRecoveryExceptionHandler != null)
118+
throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionHandler)} after it has been initialized.");
119+
120+
_queueRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionHandler));
121+
}
122+
}
123+
124+
/// <summary>
125+
/// Retries, or otherwise handles, an exception thrown when attempting to recover a binding.
126+
/// </summary>
127+
public Action<IRecordedBinding, Exception, IConnection> BindingRecoveryExceptionHandler
128+
{
129+
get => _bindingRecoveryExceptionHandler;
130+
131+
set
132+
{
133+
if (_bindingRecoveryExceptionHandler != null)
134+
throw new InvalidOperationException($"Cannot modify {nameof(BindingRecoveryExceptionHandler)} after it has been initialized.");
135+
136+
_bindingRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(BindingRecoveryExceptionHandler));
137+
}
138+
}
139+
140+
/// <summary>
141+
/// Retries, or otherwise handles, an exception thrown when attempting to recover a consumer.
142+
/// </summary>
143+
public Action<IRecordedConsumer, Exception, IConnection> ConsumerRecoveryExceptionHandler
144+
{
145+
get => _consumerRecoveryExceptionHandler;
146+
147+
set
148+
{
149+
if (_consumerRecoveryExceptionHandler != null)
150+
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionHandler)} after it has been initialized.");
151+
152+
_consumerRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionHandler));
153+
}
154+
}
155+
}
156+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
using System;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
/// <summary>
6+
/// Filter to know which entities (exchanges, queues, bindings, consumers) should be recovered by topology recovery.
7+
/// By default, allows all entities to be recovered.
8+
/// </summary>
9+
public class TopologyRecoveryFilter
10+
{
11+
private static readonly Func<IRecordedExchange, bool> s_defaultExchangeFilter = exchange => true;
12+
private static readonly Func<IRecordedQueue, bool> s_defaultQueueFilter = queue => true;
13+
private static readonly Func<IRecordedBinding, bool> s_defaultBindingFilter = binding => true;
14+
private static readonly Func<IRecordedConsumer, bool> s_defaultConsumerFilter = consumer => true;
15+
16+
private Func<IRecordedExchange, bool> _exchangeFilter;
17+
private Func<IRecordedQueue, bool> _queueFilter;
18+
private Func<IRecordedBinding, bool> _bindingFilter;
19+
private Func<IRecordedConsumer, bool> _consumerFilter;
20+
21+
/// <summary>
22+
/// Decides whether an exchange is recovered or not.
23+
/// </summary>
24+
public Func<IRecordedExchange, bool> ExchangeFilter
25+
{
26+
get => _exchangeFilter ?? s_defaultExchangeFilter;
27+
28+
set
29+
{
30+
if (_exchangeFilter != null)
31+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeFilter)} after it has been initialized.");
32+
33+
_exchangeFilter = value ?? throw new ArgumentNullException(nameof(ExchangeFilter));
34+
}
35+
}
36+
37+
/// <summary>
38+
/// Decides whether a queue is recovered or not.
39+
/// </summary>
40+
public Func<IRecordedQueue, bool> QueueFilter
41+
{
42+
get => _queueFilter ?? s_defaultQueueFilter;
43+
44+
set
45+
{
46+
if (_queueFilter != null)
47+
throw new InvalidOperationException($"Cannot modify {nameof(QueueFilter)} after it has been initialized.");
48+
49+
_queueFilter = value ?? throw new ArgumentNullException(nameof(QueueFilter));
50+
}
51+
}
52+
53+
/// <summary>
54+
/// Decides whether a binding is recovered or not.
55+
/// </summary>
56+
public Func<IRecordedBinding, bool> BindingFilter
57+
{
58+
get => _bindingFilter ?? s_defaultBindingFilter;
59+
60+
set
61+
{
62+
if (_bindingFilter != null)
63+
throw new InvalidOperationException($"Cannot modify {nameof(BindingFilter)} after it has been initialized.");
64+
65+
_bindingFilter = value ?? throw new ArgumentNullException(nameof(BindingFilter));
66+
}
67+
}
68+
69+
/// <summary>
70+
/// Decides whether a consumer is recovered or not.
71+
/// </summary>
72+
public Func<IRecordedConsumer, bool> ConsumerFilter
73+
{
74+
get => _consumerFilter ?? s_defaultConsumerFilter;
75+
76+
set
77+
{
78+
if (_consumerFilter != null)
79+
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerFilter)} after it has been initialized.");
80+
81+
_consumerFilter = value ?? throw new ArgumentNullException(nameof(ConsumerFilter));
82+
}
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)