Skip to content

Commit 599f859

Browse files
committed
Added tests for topology recovery filter
1 parent d1f35ca commit 599f859

File tree

3 files changed

+282
-12
lines changed

3 files changed

+282
-12
lines changed

projects/RabbitMQ.Client/client/api/TopologyRecoveryFilter.cs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,28 @@ namespace RabbitMQ.Client
88
/// </summary>
99
public class TopologyRecoveryFilter
1010
{
11-
private Func<IRecordedExchange, bool> _exchangeFilter = exchange => true;
12-
private Func<IRecordedQueue, bool> _queueFilter = queue => true;
13-
private Func<IRecordedBinding, bool> _bindingFilter = binding => true;
14-
private Func<IRecordedConsumer, bool> _consumerFilter = consumer => true;
11+
private static readonly Func<IRecordedExchange, bool> s_defaultExchangeFilter = exchange => true;
12+
private static readonly Func<IRecordedQueue, bool> s_defaultQueueFilter = queue => true;
13+
private static readonly Func<IRecordedBinding, bool> s_defaultBindingFilter = binding => true;
14+
private static readonly Func<IRecordedConsumer, bool> s_defaultConsumerFilter = consumer => true;
15+
16+
private Func<IRecordedExchange, bool> _exchangeFilter;
17+
private Func<IRecordedQueue, bool> _queueFilter;
18+
private Func<IRecordedBinding, bool> _bindingFilter;
19+
private Func<IRecordedConsumer, bool> _consumerFilter;
1520

1621
/// <summary>
1722
/// Decides whether an exchange is recovered or not.
1823
/// </summary>
1924
public Func<IRecordedExchange, bool> ExchangeFilter
2025
{
21-
get => _exchangeFilter;
26+
get => _exchangeFilter ?? s_defaultExchangeFilter;
2227

23-
init
28+
set
2429
{
30+
if (_exchangeFilter != null)
31+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeFilter)} after it has been initialized.");
32+
2533
_exchangeFilter = value ?? throw new ArgumentNullException(nameof(ExchangeFilter));
2634
}
2735
}
@@ -31,10 +39,13 @@ public Func<IRecordedExchange, bool> ExchangeFilter
3139
/// </summary>
3240
public Func<IRecordedQueue, bool> QueueFilter
3341
{
34-
get => _queueFilter;
42+
get => _queueFilter ?? s_defaultQueueFilter;
3543

36-
init
44+
set
3745
{
46+
if (_queueFilter != null)
47+
throw new InvalidOperationException($"Cannot modify {nameof(QueueFilter)} after it has been initialized.");
48+
3849
_queueFilter = value ?? throw new ArgumentNullException(nameof(QueueFilter));
3950
}
4051
}
@@ -44,10 +55,13 @@ public Func<IRecordedQueue, bool> QueueFilter
4455
/// </summary>
4556
public Func<IRecordedBinding, bool> BindingFilter
4657
{
47-
get => _bindingFilter;
58+
get => _bindingFilter ?? s_defaultBindingFilter;
4859

49-
init
60+
set
5061
{
62+
if (_bindingFilter != null)
63+
throw new InvalidOperationException($"Cannot modify {nameof(BindingFilter)} after it has been initialized.");
64+
5165
_bindingFilter = value ?? throw new ArgumentNullException(nameof(BindingFilter));
5266
}
5367
}
@@ -57,10 +71,13 @@ public Func<IRecordedBinding, bool> BindingFilter
5771
/// </summary>
5872
public Func<IRecordedConsumer, bool> ConsumerFilter
5973
{
60-
get => _consumerFilter;
74+
get => _consumerFilter ?? s_defaultConsumerFilter;
6175

62-
init
76+
set
6377
{
78+
if (_consumerFilter != null)
79+
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerFilter)} after it has been initialized.");
80+
6481
_consumerFilter = value ?? throw new ArgumentNullException(nameof(ConsumerFilter));
6582
}
6683
}

projects/Unit/Fixtures.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,18 @@ internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyReco
163163
return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
164164
}
165165

166+
internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyRecoveryFilter(TopologyRecoveryFilter filter)
167+
{
168+
var cf = new ConnectionFactory
169+
{
170+
AutomaticRecoveryEnabled = true,
171+
TopologyRecoveryEnabled = true,
172+
TopologyRecoveryFilter = filter
173+
};
174+
175+
return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
176+
}
177+
166178
internal IConnection CreateConnectionWithContinuationTimeout(bool automaticRecoveryEnabled, TimeSpan continuationTimeout)
167179
{
168180
var cf = new ConnectionFactory

projects/Unit/TestConnectionRecovery.cs

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Text;
3435
using System.Threading;
3536
using System.Threading.Tasks;
3637
using RabbitMQ.Client.Events;
@@ -1053,6 +1054,246 @@ public void TestUnblockedListenersRecovery()
10531054
Wait(latch);
10541055
}
10551056

1057+
[Fact]
1058+
public void TestTopologyRecoveryQueueFilter()
1059+
{
1060+
var filter = new TopologyRecoveryFilter
1061+
{
1062+
QueueFilter = queue => !queue.Name.Contains("filtered")
1063+
};
1064+
var latch = new ManualResetEventSlim(false);
1065+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
1066+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1067+
IModel ch = conn.CreateModel();
1068+
1069+
var queueToRecover = "recovered.queue";
1070+
var queueToIgnore = "filtered.queue";
1071+
ch.QueueDeclare(queueToRecover, false, false, false, null);
1072+
ch.QueueDeclare(queueToIgnore, false, false, false, null);
1073+
1074+
1075+
_model.QueueDelete(queueToRecover);
1076+
_model.QueueDelete(queueToIgnore);
1077+
1078+
CloseAndWaitForRecovery(conn);
1079+
Wait(latch);
1080+
1081+
Assert.True(ch.IsOpen);
1082+
AssertQueueRecovery(ch, queueToRecover, false);
1083+
1084+
try
1085+
{
1086+
ch.QueueDeclarePassive(queueToIgnore);
1087+
Assert.Fail("Expected an exception");
1088+
}
1089+
catch (OperationInterruptedException e)
1090+
{
1091+
AssertShutdownError(e.ShutdownReason, 404);
1092+
}
1093+
}
1094+
1095+
[Fact]
1096+
public void TestTopologyRecoveryExchangeFilter()
1097+
{
1098+
var filter = new TopologyRecoveryFilter
1099+
{
1100+
ExchangeFilter = exchange => exchange.Type == "topic" && !exchange.Name.Contains("filtered")
1101+
};
1102+
var latch = new ManualResetEventSlim(false);
1103+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
1104+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1105+
IModel ch = conn.CreateModel();
1106+
1107+
var exchangeToRecover = "recovered.exchange";
1108+
var exchangeToIgnore = "filtered.exchange";
1109+
ch.ExchangeDeclare(exchangeToRecover, "topic", false, true);
1110+
ch.ExchangeDeclare(exchangeToIgnore, "direct", false, true);
1111+
1112+
_model.ExchangeDelete(exchangeToRecover);
1113+
_model.ExchangeDelete(exchangeToIgnore);
1114+
1115+
CloseAndWaitForRecovery(conn);
1116+
Wait(latch);
1117+
1118+
Assert.True(ch.IsOpen);
1119+
AssertExchangeRecovery(ch, exchangeToRecover);
1120+
1121+
try
1122+
{
1123+
ch.ExchangeDeclarePassive(exchangeToIgnore);
1124+
Assert.Fail("Expected an exception");
1125+
}
1126+
catch (OperationInterruptedException e)
1127+
{
1128+
AssertShutdownError(e.ShutdownReason, 404);
1129+
}
1130+
}
1131+
1132+
[Fact]
1133+
public void TestTopologyRecoveryBindingFilter()
1134+
{
1135+
var filter = new TopologyRecoveryFilter
1136+
{
1137+
BindingFilter = binding => !binding.RoutingKey.Contains("filtered")
1138+
};
1139+
var latch = new ManualResetEventSlim(false);
1140+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
1141+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1142+
IModel ch = conn.CreateModel();
1143+
1144+
var exchange = "topology.recovery.exchange";
1145+
var queueWithRecoveredBinding = "topology.recovery.queue.1";
1146+
var queueWithIgnoredBinding = "topology.recovery.queue.2";
1147+
var bindingToRecover = "recovered.binding";
1148+
var bindingToIgnore = "filtered.binding";
1149+
1150+
ch.ExchangeDeclare(exchange, "direct");
1151+
ch.QueueDeclare(queueWithRecoveredBinding, false, false, false, null);
1152+
ch.QueueDeclare(queueWithIgnoredBinding, false, false, false, null);
1153+
ch.QueueBind(queueWithRecoveredBinding, exchange, bindingToRecover);
1154+
ch.QueueBind(queueWithIgnoredBinding, exchange, bindingToIgnore);
1155+
ch.QueuePurge(queueWithRecoveredBinding);
1156+
ch.QueuePurge(queueWithIgnoredBinding);
1157+
1158+
_model.QueueUnbind(queueWithRecoveredBinding, exchange, bindingToRecover);
1159+
_model.QueueUnbind(queueWithIgnoredBinding, exchange, bindingToIgnore);
1160+
1161+
CloseAndWaitForRecovery(conn);
1162+
Wait(latch);
1163+
1164+
Assert.True(ch.IsOpen);
1165+
Assert.True(SendAndConsumeMessage(queueWithRecoveredBinding, exchange, bindingToRecover));
1166+
Assert.False(SendAndConsumeMessage(queueWithIgnoredBinding, exchange, bindingToIgnore));
1167+
}
1168+
1169+
[Fact]
1170+
public void TestTopologyRecoveryConsumerFilter()
1171+
{
1172+
var filter = new TopologyRecoveryFilter
1173+
{
1174+
ConsumerFilter = consumer => !consumer.ConsumerTag.Contains("filtered")
1175+
};
1176+
var latch = new ManualResetEventSlim(false);
1177+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
1178+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1179+
IModel ch = conn.CreateModel();
1180+
ch.ConfirmSelect();
1181+
1182+
var exchange = "topology.recovery.exchange";
1183+
var queueWithRecoveredConsumer = "topology.recovery.queue.1";
1184+
var queueWithIgnoredConsumer = "topology.recovery.queue.2";
1185+
var binding1 = "recovered.binding";
1186+
var binding2 = "filtered.binding";
1187+
1188+
ch.ExchangeDeclare(exchange, "direct");
1189+
ch.QueueDeclare(queueWithRecoveredConsumer, false, false, false, null);
1190+
ch.QueueDeclare(queueWithIgnoredConsumer, false, false, false, null);
1191+
ch.QueueBind(queueWithRecoveredConsumer, exchange, binding1);
1192+
ch.QueueBind(queueWithIgnoredConsumer, exchange, binding2);
1193+
ch.QueuePurge(queueWithRecoveredConsumer);
1194+
ch.QueuePurge(queueWithIgnoredConsumer);
1195+
1196+
var recoverLatch = new ManualResetEventSlim(false);
1197+
var consumerToRecover = new EventingBasicConsumer(ch);
1198+
consumerToRecover.Received += (source, ea) => recoverLatch.Set();
1199+
ch.BasicConsume(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover);
1200+
1201+
var ignoredLatch = new ManualResetEventSlim(false);
1202+
var consumerToIgnore = new EventingBasicConsumer(ch);
1203+
consumerToIgnore.Received += (source, ea) => ignoredLatch.Set();
1204+
ch.BasicConsume(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore);
1205+
1206+
CloseAndWaitForRecovery(conn);
1207+
Wait(latch);
1208+
1209+
Assert.True(ch.IsOpen);
1210+
ch.BasicPublish(exchange, binding1, Encoding.UTF8.GetBytes("test message"));
1211+
ch.BasicPublish(exchange, binding2, Encoding.UTF8.GetBytes("test message"));
1212+
1213+
Assert.True(recoverLatch.Wait(TimeSpan.FromSeconds(5)));
1214+
Assert.False(ignoredLatch.Wait(TimeSpan.FromSeconds(5)));
1215+
1216+
ch.BasicConsume(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore);
1217+
1218+
try
1219+
{
1220+
ch.BasicConsume(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover);
1221+
Assert.Fail("Expected an exception");
1222+
}
1223+
catch (OperationInterruptedException e)
1224+
{
1225+
AssertShutdownError(e.ShutdownReason, 530); // NOT_ALLOWED - not allowed to reuse consumer tag
1226+
}
1227+
}
1228+
1229+
[Fact]
1230+
public void TestTopologyRecoveryDefaultFilterRecoversAllEntities()
1231+
{
1232+
var filter = new TopologyRecoveryFilter();
1233+
var latch = new ManualResetEventSlim(false);
1234+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
1235+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1236+
IModel ch = conn.CreateModel();
1237+
1238+
var exchange = "topology.recovery.exchange";
1239+
var queue1 = "topology.recovery.queue.1";
1240+
var queue2 = "topology.recovery.queue.2";
1241+
var binding1 = "recovered.binding";
1242+
var binding2 = "filtered.binding";
1243+
1244+
ch.ExchangeDeclare(exchange, "direct");
1245+
ch.QueueDeclare(queue1, false, false, false, null);
1246+
ch.QueueDeclare(queue2, false, false, false, null);
1247+
ch.QueueBind(queue1, exchange, binding1);
1248+
ch.QueueBind(queue2, exchange, binding2);
1249+
ch.QueuePurge(queue1);
1250+
ch.QueuePurge(queue2);
1251+
1252+
var consumerLatch1 = new ManualResetEventSlim(false);
1253+
var consumer1 = new EventingBasicConsumer(ch);
1254+
consumer1.Received += (source, ea) => consumerLatch1.Set();
1255+
ch.BasicConsume(queue1, true, "recovered.consumer", consumer1);
1256+
1257+
var consumerLatch2 = new ManualResetEventSlim(false);
1258+
var consumer2 = new EventingBasicConsumer(ch);
1259+
consumer2.Received += (source, ea) => consumerLatch2.Set();
1260+
ch.BasicConsume(queue2, true, "filtered.consumer", consumer2);
1261+
1262+
_model.ExchangeDelete(exchange);
1263+
_model.QueueDelete(queue1);
1264+
_model.QueueDelete(queue2);
1265+
1266+
CloseAndWaitForRecovery(conn);
1267+
Wait(latch);
1268+
1269+
Assert.True(ch.IsOpen);
1270+
AssertExchangeRecovery(ch, exchange);
1271+
ch.QueueDeclarePassive(queue1);
1272+
ch.QueueDeclarePassive(queue2);
1273+
1274+
ch.BasicPublish(exchange, binding1, Encoding.UTF8.GetBytes("test message"));
1275+
ch.BasicPublish(exchange, binding2, Encoding.UTF8.GetBytes("test message"));
1276+
1277+
Assert.True(consumerLatch1.Wait(TimeSpan.FromSeconds(5)));
1278+
Assert.True(consumerLatch2.Wait(TimeSpan.FromSeconds(5)));
1279+
}
1280+
1281+
internal bool SendAndConsumeMessage(string queue, string exchange, string routingKey)
1282+
{
1283+
using (var ch = _conn.CreateModel())
1284+
{
1285+
var latch = new ManualResetEventSlim(false);
1286+
1287+
var consumer = new AckingBasicConsumer(ch, 1, latch);
1288+
1289+
ch.BasicConsume(queue, true, consumer);
1290+
1291+
ch.BasicPublish(exchange, routingKey, Encoding.UTF8.GetBytes("test message"));
1292+
1293+
return latch.Wait(TimeSpan.FromSeconds(5));
1294+
}
1295+
}
1296+
10561297
internal void AssertExchangeRecovery(IChannel m, string x)
10571298
{
10581299
m.ConfirmSelect();

0 commit comments

Comments
 (0)