Skip to content

Commit 92b5221

Browse files
rosca-sabinaSabina Rosca
authored andcommitted
Added custom exception handlers to topology recovery
1 parent d42511e commit 92b5221

File tree

3 files changed

+256
-22
lines changed

3 files changed

+256
-22
lines changed

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,11 @@ public TimeSpan ContinuationTimeout
277277
/// </summary>
278278
public TopologyRecoveryFilter TopologyRecoveryFilter { get; set; } = new TopologyRecoveryFilter();
279279

280+
/// <summary>
281+
/// Custom logic for handling topology recovery exceptions that match the specified filters.
282+
/// </summary>
283+
public TopologyRecoveryExceptionHandler TopologyRecoveryExceptionHandler { get; set; } = new TopologyRecoveryExceptionHandler();
284+
280285
/// <summary>
281286
/// Construct a fresh instance, with all fields set to their respective defaults.
282287
/// </summary>
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
using System;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
/// <summary>
6+
/// Custom logic for handling topology recovery exceptions that match the specified filters.
7+
/// </summary>
8+
public class TopologyRecoveryExceptionHandler
9+
{
10+
private static readonly Func<IRecordedExchange, Exception, bool> s_defaultExchangeExceptionCondition = (e, ex) => true;
11+
private static readonly Func<IRecordedQueue, Exception, bool> s_defaultQueueExceptionCondition = (q, ex) => true;
12+
private static readonly Func<IRecordedBinding, Exception, bool> s_defaultBindingExceptionCondition = (b, ex) => true;
13+
private static readonly Func<IRecordedConsumer, Exception, bool> s_defaultConsumerExceptionCondition = (c, ex) => true;
14+
15+
private Func<IRecordedExchange, Exception, bool> _exchangeRecoveryExceptionCondition;
16+
private Func<IRecordedQueue, Exception, bool> _queueRecoveryExceptionCondition;
17+
private Func<IRecordedBinding, Exception, bool> _bindingRecoveryExceptionCondition;
18+
private Func<IRecordedConsumer, Exception, bool> _consumerRecoveryExceptionCondition;
19+
private Action<IRecordedExchange, Exception> _exchangeRecoveryExceptionHandler;
20+
private Action<IRecordedQueue, Exception> _queueRecoveryExceptionHandler;
21+
private Action<IRecordedBinding, Exception> _bindingRecoveryExceptionHandler;
22+
private Action<IRecordedConsumer, Exception> _consumerRecoveryExceptionHandler;
23+
24+
/// <summary>
25+
/// Decides which exchange recovery exceptions the custom exception handler is applied to.
26+
/// Default condition applies the exception handler to all exchange recovery exceptions.
27+
/// </summary>
28+
public Func<IRecordedExchange, Exception, bool> ExchangeRecoveryExceptionCondition
29+
{
30+
get => _exchangeRecoveryExceptionCondition ?? s_defaultExchangeExceptionCondition;
31+
32+
set
33+
{
34+
if (_exchangeRecoveryExceptionCondition != null)
35+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized.");
36+
37+
_exchangeRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition));
38+
}
39+
}
40+
41+
/// <summary>
42+
/// Decides which queue recovery exceptions the custom exception handler is applied to.
43+
/// Default condition applies the exception handler to all queue recovery exceptions.
44+
/// </summary>
45+
public Func<IRecordedQueue, Exception, bool> QueueRecoveryExceptionCondition
46+
{
47+
get => _queueRecoveryExceptionCondition ?? s_defaultQueueExceptionCondition;
48+
49+
set
50+
{
51+
if (_queueRecoveryExceptionCondition != null)
52+
throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionCondition)} after it has been initialized.");
53+
54+
_queueRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionCondition));
55+
}
56+
}
57+
58+
/// <summary>
59+
/// Decides which binding recovery exceptions the custom exception handler is applied to.
60+
/// Default condition applies the exception handler to all binding recovery exceptions.
61+
/// </summary>
62+
public Func<IRecordedBinding, Exception, bool> BindingRecoveryExceptionCondition
63+
{
64+
get => _bindingRecoveryExceptionCondition ?? s_defaultBindingExceptionCondition;
65+
66+
set
67+
{
68+
if (_bindingRecoveryExceptionCondition != null)
69+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized.");
70+
71+
_bindingRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition));
72+
}
73+
}
74+
75+
/// <summary>
76+
/// Decides which consumer recovery exceptions the custom exception handler is applied to.
77+
/// Default condition applies the exception handler to all consumer recovery exceptions.
78+
/// </summary>
79+
public Func<IRecordedConsumer, Exception, bool> ConsumerRecoveryExceptionCondition
80+
{
81+
get => _consumerRecoveryExceptionCondition ?? s_defaultConsumerExceptionCondition;
82+
83+
set
84+
{
85+
if (_consumerRecoveryExceptionCondition != null)
86+
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionCondition)} after it has been initialized.");
87+
88+
_consumerRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionCondition));
89+
}
90+
}
91+
92+
/// <summary>
93+
/// Retries, or otherwise handles, an exception thrown when attempting to recover an exchange.
94+
/// </summary>
95+
public Action<IRecordedExchange, Exception> ExchangeRecoveryExceptionHandler
96+
{
97+
get => _exchangeRecoveryExceptionHandler;
98+
99+
set
100+
{
101+
if (_exchangeRecoveryExceptionHandler != null)
102+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionHandler)} after it has been initialized.");
103+
104+
_exchangeRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionHandler));
105+
}
106+
}
107+
108+
/// <summary>
109+
/// Retries, or otherwise handles, an exception thrown when attempting to recover a queue.
110+
/// </summary>
111+
public Action<IRecordedQueue, Exception> QueueRecoveryExceptionHandler
112+
{
113+
get => _queueRecoveryExceptionHandler;
114+
115+
set
116+
{
117+
if (_queueRecoveryExceptionHandler != null)
118+
throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionHandler)} after it has been initialized.");
119+
120+
_queueRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionHandler));
121+
}
122+
}
123+
124+
/// <summary>
125+
/// Retries, or otherwise handles, an exception thrown when attempting to recover a binding.
126+
/// </summary>
127+
public Action<IRecordedBinding, Exception> BindingRecoveryExceptionHandler
128+
{
129+
get => _bindingRecoveryExceptionHandler;
130+
131+
set
132+
{
133+
if (_bindingRecoveryExceptionHandler != null)
134+
throw new InvalidOperationException($"Cannot modify {nameof(BindingRecoveryExceptionHandler)} after it has been initialized.");
135+
136+
_bindingRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(BindingRecoveryExceptionHandler));
137+
}
138+
}
139+
140+
/// <summary>
141+
/// Retries, or otherwise handles, an exception thrown when attempting to recover a consumer.
142+
/// Is only called when the exception did not cause the consumer's channel to close.
143+
/// </summary>
144+
public Action<IRecordedConsumer, Exception> ConsumerRecoveryExceptionHandler
145+
{
146+
get => _consumerRecoveryExceptionHandler;
147+
148+
set
149+
{
150+
if (_consumerRecoveryExceptionHandler != null)
151+
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionHandler)} after it has been initialized.");
152+
153+
_consumerRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionHandler));
154+
}
155+
}
156+
}
157+
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 94 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -428,11 +428,11 @@ private bool TryPerformAutomaticRecovery()
428428
// 2. Recover queues
429429
// 3. Recover bindings
430430
// 4. Recover consumers
431-
using (var recoveryModel = _delegate.CreateModel())
431+
using (var recoveryModelFactory = new RecoveryModelFactory(_delegate))
432432
{
433-
RecoverExchanges(recoveryModel);
434-
RecoverQueues(recoveryModel);
435-
RecoverBindings(recoveryModel);
433+
RecoverExchanges(recoveryModelFactory);
434+
RecoverQueues(recoveryModelFactory);
435+
RecoverBindings(recoveryModelFactory);
436436
}
437437
}
438438

@@ -985,7 +985,7 @@ private void PropagateQueueNameChangeToConsumers(string oldName, string newName)
985985
}
986986
}
987987

988-
private void RecoverBindings(IModel model)
988+
private void RecoverBindings(RecoveryModelFactory recoveryModelFactory)
989989
{
990990
Dictionary<RecordedBinding, byte> recordedBindingsCopy;
991991
lock (_recordedEntitiesLock)
@@ -997,13 +997,21 @@ private void RecoverBindings(IModel model)
997997
{
998998
try
999999
{
1000-
b.Recover(model);
1000+
b.Recover(recoveryModelFactory.RecoveryModel);
10011001
}
10021002
catch (Exception cause)
10031003
{
1004-
string s = string.Format("Caught an exception while recovering binding between {0} and {1}: {2}",
1005-
b.Source, b.Destination, cause.Message);
1006-
HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause));
1004+
if (_factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler != null
1005+
&& _factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionCondition(b, cause))
1006+
{
1007+
_factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler(b, cause);
1008+
}
1009+
else
1010+
{
1011+
string s = string.Format("Caught an exception while recovering binding between {0} and {1}: {2}",
1012+
b.Source, b.Destination, cause.Message);
1013+
HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause));
1014+
}
10071015
}
10081016
}
10091017
}
@@ -1139,14 +1147,23 @@ internal void RecoverConsumers(AutorecoveringModel modelToRecover, IModel channe
11391147
}
11401148
catch (Exception cause)
11411149
{
1142-
string s = string.Format("Caught an exception while recovering consumer {0} on queue {1}: {2}",
1143-
tag, cons.Queue, cause.Message);
1144-
HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause));
1150+
if (channelToUse.IsOpen
1151+
&& _factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler != null
1152+
&& _factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionCondition(cons, cause))
1153+
{
1154+
_factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler(cons, cause);
1155+
}
1156+
else
1157+
{
1158+
string s = string.Format("Caught an exception while recovering consumer {0} on queue {1}: {2}",
1159+
tag, cons.Queue, cause.Message);
1160+
HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause));
1161+
}
11451162
}
11461163
}
11471164
}
11481165

1149-
private void RecoverExchanges(IModel model)
1166+
private void RecoverExchanges(RecoveryModelFactory recoveryModelFactory)
11501167
{
11511168
Dictionary<string, RecordedExchange> recordedExchangesCopy;
11521169
lock (_recordedEntitiesLock)
@@ -1158,13 +1175,21 @@ private void RecoverExchanges(IModel model)
11581175
{
11591176
try
11601177
{
1161-
rx.Recover(model);
1178+
rx.Recover(recoveryModelFactory.RecoveryModel);
11621179
}
11631180
catch (Exception cause)
11641181
{
1165-
string s = string.Format("Caught an exception while recovering exchange {0}: {1}",
1166-
rx.Name, cause.Message);
1167-
HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause));
1182+
if (_factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler != null
1183+
&& _factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionCondition(rx, cause))
1184+
{
1185+
_factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler(rx, cause);
1186+
}
1187+
else
1188+
{
1189+
string s = string.Format("Caught an exception while recovering exchange {0}: {1}",
1190+
rx.Name, cause.Message);
1191+
HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause));
1192+
}
11681193
}
11691194
}
11701195
}
@@ -1180,7 +1205,7 @@ private void RecoverModelsAndItsConsumers()
11801205
}
11811206
}
11821207

1183-
private void RecoverQueues(IModel model)
1208+
private void RecoverQueues(RecoveryModelFactory recoveryModelFactory)
11841209
{
11851210
Dictionary<string, RecordedQueue> recordedQueuesCopy;
11861211
lock (_recordedEntitiesLock)
@@ -1195,7 +1220,7 @@ private void RecoverQueues(IModel model)
11951220

11961221
try
11971222
{
1198-
rq.Recover(model);
1223+
rq.Recover(recoveryModelFactory.RecoveryModel);
11991224
string newName = rq.Name;
12001225

12011226
if (!oldName.Equals(newName))
@@ -1232,9 +1257,17 @@ private void RecoverQueues(IModel model)
12321257
}
12331258
catch (Exception cause)
12341259
{
1235-
string s = string.Format("Caught an exception while recovering queue {0}: {1}",
1236-
oldName, cause.Message);
1237-
HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause));
1260+
if (_factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler != null
1261+
&& _factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionCondition(rq, cause))
1262+
{
1263+
_factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler(rq, cause);
1264+
}
1265+
else
1266+
{
1267+
string s = string.Format("Caught an exception while recovering queue {0}: {1}",
1268+
oldName, cause.Message);
1269+
HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause));
1270+
}
12381271
}
12391272
}
12401273
}
@@ -1295,6 +1328,45 @@ private enum RecoveryConnectionState
12951328
Recovering
12961329
}
12971330

1331+
private sealed class RecoveryModelFactory : IDisposable
1332+
{
1333+
private readonly IConnection _connection;
1334+
private IModel _recoveryModel;
1335+
1336+
public RecoveryModelFactory(IConnection connection)
1337+
{
1338+
_connection = connection;
1339+
}
1340+
1341+
public IModel RecoveryModel
1342+
{
1343+
get
1344+
{
1345+
if (_recoveryModel == null)
1346+
{
1347+
_recoveryModel = _connection.CreateModel();
1348+
}
1349+
1350+
if (_recoveryModel.IsClosed)
1351+
{
1352+
_recoveryModel.Dispose();
1353+
_recoveryModel = _connection.CreateModel();
1354+
}
1355+
1356+
return _recoveryModel;
1357+
}
1358+
}
1359+
1360+
public void Dispose()
1361+
{
1362+
if (_recoveryModel != null)
1363+
{
1364+
_recoveryModel.Close();
1365+
_recoveryModel.Dispose();
1366+
}
1367+
}
1368+
}
1369+
12981370
private Task _recoveryTask;
12991371
private RecoveryConnectionState _recoveryLoopState = RecoveryConnectionState.Connected;
13001372

0 commit comments

Comments
 (0)