Skip to content

Commit c65192c

Browse files
rosca-sabinaSabina Rosca
authored andcommitted
Added tests for topology recovery exception handler
1 parent 92b5221 commit c65192c

File tree

5 files changed

+236
-20
lines changed

5 files changed

+236
-20
lines changed

projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ namespace RabbitMQ.Client
44
{
55
public interface IRecordedConsumer
66
{
7+
IBasicConsumer Consumer { get; }
8+
79
string ConsumerTag { get; }
810

911
string Queue { get; }

projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ public class TopologyRecoveryExceptionHandler
1616
private Func<IRecordedQueue, Exception, bool> _queueRecoveryExceptionCondition;
1717
private Func<IRecordedBinding, Exception, bool> _bindingRecoveryExceptionCondition;
1818
private Func<IRecordedConsumer, Exception, bool> _consumerRecoveryExceptionCondition;
19-
private Action<IRecordedExchange, Exception> _exchangeRecoveryExceptionHandler;
20-
private Action<IRecordedQueue, Exception> _queueRecoveryExceptionHandler;
21-
private Action<IRecordedBinding, Exception> _bindingRecoveryExceptionHandler;
22-
private Action<IRecordedConsumer, Exception> _consumerRecoveryExceptionHandler;
19+
private Action<IRecordedExchange, Exception, IConnection> _exchangeRecoveryExceptionHandler;
20+
private Action<IRecordedQueue, Exception, IConnection> _queueRecoveryExceptionHandler;
21+
private Action<IRecordedBinding, Exception, IConnection> _bindingRecoveryExceptionHandler;
22+
private Action<IRecordedConsumer, Exception, IConnection> _consumerRecoveryExceptionHandler;
2323

2424
/// <summary>
2525
/// Decides which exchange recovery exceptions the custom exception handler is applied to.
@@ -92,7 +92,7 @@ public Func<IRecordedConsumer, Exception, bool> ConsumerRecoveryExceptionConditi
9292
/// <summary>
9393
/// Retries, or otherwise handles, an exception thrown when attempting to recover an exchange.
9494
/// </summary>
95-
public Action<IRecordedExchange, Exception> ExchangeRecoveryExceptionHandler
95+
public Action<IRecordedExchange, Exception, IConnection> ExchangeRecoveryExceptionHandler
9696
{
9797
get => _exchangeRecoveryExceptionHandler;
9898

@@ -108,7 +108,7 @@ public Action<IRecordedExchange, Exception> ExchangeRecoveryExceptionHandler
108108
/// <summary>
109109
/// Retries, or otherwise handles, an exception thrown when attempting to recover a queue.
110110
/// </summary>
111-
public Action<IRecordedQueue, Exception> QueueRecoveryExceptionHandler
111+
public Action<IRecordedQueue, Exception, IConnection> QueueRecoveryExceptionHandler
112112
{
113113
get => _queueRecoveryExceptionHandler;
114114

@@ -124,7 +124,7 @@ public Action<IRecordedQueue, Exception> QueueRecoveryExceptionHandler
124124
/// <summary>
125125
/// Retries, or otherwise handles, an exception thrown when attempting to recover a binding.
126126
/// </summary>
127-
public Action<IRecordedBinding, Exception> BindingRecoveryExceptionHandler
127+
public Action<IRecordedBinding, Exception, IConnection> BindingRecoveryExceptionHandler
128128
{
129129
get => _bindingRecoveryExceptionHandler;
130130

@@ -141,7 +141,7 @@ public Action<IRecordedBinding, Exception> BindingRecoveryExceptionHandler
141141
/// Retries, or otherwise handles, an exception thrown when attempting to recover a consumer.
142142
/// Is only called when the exception did not cause the consumer's channel to close.
143143
/// </summary>
144-
public Action<IRecordedConsumer, Exception> ConsumerRecoveryExceptionHandler
144+
public Action<IRecordedConsumer, Exception, IConnection> ConsumerRecoveryExceptionHandler
145145
{
146146
get => _consumerRecoveryExceptionHandler;
147147

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,7 +1004,7 @@ private void RecoverBindings(RecoveryModelFactory recoveryModelFactory)
10041004
if (_factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler != null
10051005
&& _factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionCondition(b, cause))
10061006
{
1007-
_factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler(b, cause);
1007+
_factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler(b, cause, this);
10081008
}
10091009
else
10101010
{
@@ -1147,11 +1147,10 @@ internal void RecoverConsumers(AutorecoveringModel modelToRecover, IModel channe
11471147
}
11481148
catch (Exception cause)
11491149
{
1150-
if (channelToUse.IsOpen
1151-
&& _factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler != null
1150+
if (_factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler != null
11521151
&& _factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionCondition(cons, cause))
11531152
{
1154-
_factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler(cons, cause);
1153+
_factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler(cons, cause, this);
11551154
}
11561155
else
11571156
{
@@ -1182,7 +1181,7 @@ private void RecoverExchanges(RecoveryModelFactory recoveryModelFactory)
11821181
if (_factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler != null
11831182
&& _factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionCondition(rx, cause))
11841183
{
1185-
_factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler(rx, cause);
1184+
_factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler(rx, cause, this);
11861185
}
11871186
else
11881187
{
@@ -1260,7 +1259,7 @@ private void RecoverQueues(RecoveryModelFactory recoveryModelFactory)
12601259
if (_factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler != null
12611260
&& _factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionCondition(rq, cause))
12621261
{
1263-
_factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler(rq, cause);
1262+
_factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler(rq, cause, this);
12641263
}
12651264
else
12661265
{

projects/Unit/Fixtures.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,18 @@ internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyReco
163163
return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
164164
}
165165

166+
internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(TopologyRecoveryExceptionHandler handler)
167+
{
168+
var cf = new ConnectionFactory
169+
{
170+
AutomaticRecoveryEnabled = true,
171+
TopologyRecoveryEnabled = true,
172+
TopologyRecoveryExceptionHandler = handler
173+
};
174+
175+
return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
176+
}
177+
166178
internal IConnection CreateNonRecoveringConnection()
167179
{
168180
var cf = new ConnectionFactory

projects/Unit/TestConnectionRecovery.cs

Lines changed: 209 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Collections;
3433
using System.Collections.Generic;
34+
using System.ServiceModel.Channels;
3535
using System.Text;
3636
using System.Threading;
3737

@@ -1228,8 +1228,8 @@ public void TestTopologyRecoveryConsumerFilter()
12281228
var exchange = "topology.recovery.exchange";
12291229
var queueWithRecoveredConsumer = "topology.recovery.queue.1";
12301230
var queueWithIgnoredConsumer = "topology.recovery.queue.2";
1231-
var binding1 = "recovered.binding";
1232-
var binding2 = "filtered.binding";
1231+
var binding1 = "recovered.binding.1";
1232+
var binding2 = "recovered.binding.2";
12331233

12341234
ch.ExchangeDeclare(exchange, "direct");
12351235
ch.QueueDeclare(queueWithRecoveredConsumer, false, false, false, null);
@@ -1325,6 +1325,209 @@ public void TestTopologyRecoveryDefaultFilterRecoversAllEntities()
13251325
Assert.IsTrue(consumerLatch2.Wait(TimeSpan.FromSeconds(5)));
13261326
}
13271327

1328+
[Test]
1329+
public void TestTopologyRecoveryQueueExceptionHandler()
1330+
{
1331+
var changedQueueArguments = new Dictionary<string, object>
1332+
{
1333+
{ Headers.XMaxPriority, 20 }
1334+
};
1335+
var exceptionHandler = new TopologyRecoveryExceptionHandler
1336+
{
1337+
QueueRecoveryExceptionCondition = (rq, ex) =>
1338+
{
1339+
return rq.Name.Contains("exception")
1340+
&& ex is OperationInterruptedException operationInterruptedException
1341+
&& operationInterruptedException.ShutdownReason.ReplyCode == Constants.PreconditionFailed;
1342+
},
1343+
QueueRecoveryExceptionHandler = (rq, ex, connection) =>
1344+
{
1345+
using (var model = connection.CreateModel())
1346+
{
1347+
model.QueueDeclare(rq.Name, false, false, false, changedQueueArguments);
1348+
}
1349+
}
1350+
};
1351+
var latch = new ManualResetEventSlim(false);
1352+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler);
1353+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1354+
IModel ch = conn.CreateModel();
1355+
1356+
var queueToRecoverWithException = "recovery.exception.queue";
1357+
var queueToRecoverSuccessfully = "successfully.recovered.queue";
1358+
ch.QueueDeclare(queueToRecoverWithException, false, false, false, null);
1359+
ch.QueueDeclare(queueToRecoverSuccessfully, false, false, false, null);
1360+
1361+
Model.QueueDelete(queueToRecoverSuccessfully);
1362+
Model.QueueDelete(queueToRecoverWithException);
1363+
Model.QueueDeclare(queueToRecoverWithException, false, false, false, changedQueueArguments);
1364+
1365+
CloseAndWaitForRecovery(conn);
1366+
Wait(latch);
1367+
1368+
Assert.IsTrue(ch.IsOpen);
1369+
AssertQueueRecovery(ch, queueToRecoverSuccessfully, false);
1370+
AssertQueueRecovery(ch, queueToRecoverWithException, false, changedQueueArguments);
1371+
1372+
//Cleanup
1373+
Model.QueueDelete(queueToRecoverWithException);
1374+
}
1375+
1376+
[Test]
1377+
public void TestTopologyRecoveryExchangeExceptionHandler()
1378+
{
1379+
var exceptionHandler = new TopologyRecoveryExceptionHandler
1380+
{
1381+
ExchangeRecoveryExceptionCondition = (re, ex) =>
1382+
{
1383+
return re.Name.Contains("exception")
1384+
&& ex is OperationInterruptedException operationInterruptedException
1385+
&& operationInterruptedException.ShutdownReason.ReplyCode == Constants.PreconditionFailed;
1386+
},
1387+
ExchangeRecoveryExceptionHandler = (re, ex, connection) =>
1388+
{
1389+
using (var model = connection.CreateModel())
1390+
{
1391+
model.ExchangeDeclare(re.Name, "topic", false, false);
1392+
}
1393+
}
1394+
};
1395+
var latch = new ManualResetEventSlim(false);
1396+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler);
1397+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1398+
IModel ch = conn.CreateModel();
1399+
1400+
var exchangeToRecoverWithException = "recovery.exception.exchange";
1401+
var exchangeToRecoverSuccessfully = "successfully.recovered.exchange";
1402+
ch.ExchangeDeclare(exchangeToRecoverWithException, "direct", false, false);
1403+
ch.ExchangeDeclare(exchangeToRecoverSuccessfully, "direct", false, false);
1404+
1405+
Model.ExchangeDelete(exchangeToRecoverSuccessfully);
1406+
Model.ExchangeDelete(exchangeToRecoverWithException);
1407+
Model.ExchangeDeclare(exchangeToRecoverWithException, "topic", false, false);
1408+
1409+
CloseAndWaitForRecovery(conn);
1410+
Wait(latch);
1411+
1412+
Assert.IsTrue(ch.IsOpen);
1413+
AssertExchangeRecovery(ch, exchangeToRecoverSuccessfully);
1414+
AssertExchangeRecovery(ch, exchangeToRecoverWithException);
1415+
1416+
//Cleanup
1417+
Model.ExchangeDelete(exchangeToRecoverWithException);
1418+
}
1419+
1420+
[Test]
1421+
public void TestTopologyRecoveryBindingExceptionHandler()
1422+
{
1423+
var exchange = "topology.recovery.exchange";
1424+
var queueWithExceptionBinding = "recovery.exception.queue";
1425+
var bindingToRecoverWithException = "recovery.exception.binding";
1426+
1427+
var exceptionHandler = new TopologyRecoveryExceptionHandler
1428+
{
1429+
BindingRecoveryExceptionCondition = (b, ex) =>
1430+
{
1431+
return b.RoutingKey.Contains("exception")
1432+
&& ex is OperationInterruptedException operationInterruptedException
1433+
&& operationInterruptedException.ShutdownReason.ReplyCode == Constants.NotFound;
1434+
},
1435+
BindingRecoveryExceptionHandler = (b, ex, connection) =>
1436+
{
1437+
using (var model = connection.CreateModel())
1438+
{
1439+
model.QueueDeclare(queueWithExceptionBinding, false, false, false, null);
1440+
model.QueueBind(queueWithExceptionBinding, exchange, bindingToRecoverWithException);
1441+
}
1442+
}
1443+
};
1444+
var latch = new ManualResetEventSlim(false);
1445+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler);
1446+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1447+
IModel ch = conn.CreateModel();
1448+
1449+
var queueWithRecoveredBinding = "successfully.recovered.queue";
1450+
var bindingToRecoverSuccessfully = "successfully.recovered.binding";
1451+
1452+
Model.QueueDeclare(queueWithExceptionBinding, false, false, false, null);
1453+
1454+
ch.ExchangeDeclare(exchange, "direct");
1455+
ch.QueueDeclare(queueWithRecoveredBinding, false, false, false, null);
1456+
ch.QueueBind(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully);
1457+
ch.QueueBind(queueWithExceptionBinding, exchange, bindingToRecoverWithException);
1458+
ch.QueuePurge(queueWithRecoveredBinding);
1459+
ch.QueuePurge(queueWithExceptionBinding);
1460+
1461+
Model.QueueUnbind(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully);
1462+
Model.QueueUnbind(queueWithExceptionBinding, exchange, bindingToRecoverWithException);
1463+
Model.QueueDelete(queueWithExceptionBinding);
1464+
1465+
CloseAndWaitForRecovery(conn);
1466+
Wait(latch);
1467+
1468+
Assert.IsTrue(ch.IsOpen);
1469+
Assert.IsTrue(SendAndConsumeMessage(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully));
1470+
Assert.IsFalse(SendAndConsumeMessage(queueWithExceptionBinding, exchange, bindingToRecoverWithException));
1471+
}
1472+
1473+
[Test]
1474+
public void TestTopologyRecoveryConsumerExceptionHandler()
1475+
{
1476+
var queueWithExceptionConsumer = "recovery.exception.queue";
1477+
1478+
var exceptionHandler = new TopologyRecoveryExceptionHandler
1479+
{
1480+
ConsumerRecoveryExceptionCondition = (c, ex) =>
1481+
{
1482+
return c.ConsumerTag.Contains("exception")
1483+
&& ex is OperationInterruptedException operationInterruptedException
1484+
&& operationInterruptedException.ShutdownReason.ReplyCode == Constants.NotFound;
1485+
},
1486+
ConsumerRecoveryExceptionHandler = (c, ex, connection) =>
1487+
{
1488+
using (var model = connection.CreateModel())
1489+
{
1490+
model.QueueDeclare(queueWithExceptionConsumer, false, false, false, null);
1491+
model.BasicConsume(queueWithExceptionConsumer, true, c.ConsumerTag, c.Consumer);
1492+
}
1493+
}
1494+
};
1495+
var latch = new ManualResetEventSlim(false);
1496+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler);
1497+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1498+
IModel ch = conn.CreateModel();
1499+
ch.ConfirmSelect();
1500+
1501+
Model.QueueDeclare(queueWithExceptionConsumer, false, false, false, null);
1502+
Model.QueuePurge(queueWithExceptionConsumer);
1503+
1504+
var recoverLatch = new ManualResetEventSlim(false);
1505+
var consumerToRecover = new EventingBasicConsumer(ch);
1506+
consumerToRecover.Received += (source, ea) => recoverLatch.Set();
1507+
ch.BasicConsume(queueWithExceptionConsumer, true, "exception.consumer", consumerToRecover);
1508+
1509+
Model.QueueDelete(queueWithExceptionConsumer);
1510+
1511+
CloseAndWaitForRecovery(conn);
1512+
Wait(latch);
1513+
1514+
Assert.IsTrue(ch.IsOpen);
1515+
1516+
ch.BasicPublish("", queueWithExceptionConsumer, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message"));
1517+
1518+
Assert.IsTrue(recoverLatch.Wait(TimeSpan.FromSeconds(5)));
1519+
1520+
try
1521+
{
1522+
ch.BasicConsume(queueWithExceptionConsumer, true, "exception.consumer", consumerToRecover);
1523+
Assert.Fail("Expected an exception");
1524+
}
1525+
catch (OperationInterruptedException e)
1526+
{
1527+
AssertShutdownError(e.ShutdownReason, 530); // NOT_ALLOWED - not allowed to reuse consumer tag
1528+
}
1529+
}
1530+
13281531
internal bool SendAndConsumeMessage(string queue, string exchange, string routingKey)
13291532
{
13301533
using (var ch = Conn.CreateModel())
@@ -1362,15 +1565,15 @@ internal void AssertQueueRecovery(IModel m, string q)
13621565
AssertQueueRecovery(m, q, true);
13631566
}
13641567

1365-
internal void AssertQueueRecovery(IModel m, string q, bool exclusive)
1568+
internal void AssertQueueRecovery(IModel m, string q, bool exclusive, IDictionary<string, object> arguments = null)
13661569
{
13671570
m.ConfirmSelect();
13681571
m.QueueDeclarePassive(q);
1369-
QueueDeclareOk ok1 = m.QueueDeclare(q, false, exclusive, false, null);
1572+
QueueDeclareOk ok1 = m.QueueDeclare(q, false, exclusive, false, arguments);
13701573
Assert.AreEqual(ok1.MessageCount, 0);
13711574
m.BasicPublish("", q, null, _messageBody);
13721575
Assert.IsTrue(WaitForConfirms(m));
1373-
QueueDeclareOk ok2 = m.QueueDeclare(q, false, exclusive, false, null);
1576+
QueueDeclareOk ok2 = m.QueueDeclare(q, false, exclusive, false, arguments);
13741577
Assert.AreEqual(ok2.MessageCount, 1);
13751578
}
13761579

0 commit comments

Comments
 (0)