Skip to content

Commit 6ddce25

Browse files
committed
Add test to make sure recovery occurs even if management is closed
1 parent e8eb253 commit 6ddce25

File tree

1 file changed

+41
-8
lines changed

1 file changed

+41
-8
lines changed

src/test/java/com/rabbitmq/client/amqp/impl/TopologyRecoveryTest.java

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.rabbitmq.client.amqp.Management.ExchangeType.DIRECT;
2121
import static com.rabbitmq.client.amqp.Management.ExchangeType.FANOUT;
2222
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
23+
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
2324
import static java.time.Duration.ofMillis;
2425
import static java.util.stream.IntStream.range;
2526
import static org.assertj.core.api.Assertions.assertThat;
@@ -172,7 +173,7 @@ void queueShouldNotBeRecoveredWhenNoTopologyRecovery() {
172173

173174
@ParameterizedTest
174175
@ValueSource(booleans = {true, false})
175-
void resourceListenersShouldBeCalled(boolean isolateResources) throws Exception {
176+
void resourceListenersShouldBeCalled(boolean isolateResources) {
176177
List<String> events = new CopyOnWriteArrayList<>();
177178
try (Connection connection =
178179
connection(
@@ -222,7 +223,7 @@ void resourceListenersShouldBeCalled(boolean isolateResources) throws Exception
222223
"connection OPEN"
223224
};
224225

225-
TestUtils.waitAtMost(() -> events.size() == expectedStates.length);
226+
waitAtMost(() -> events.size() == expectedStates.length);
226227
assertThat(events).containsExactly(expectedStates);
227228
}
228229
}
@@ -360,7 +361,7 @@ void deletedQueueBindingIsNotRecovered(boolean isolateResources) {
360361

361362
@ParameterizedTest
362363
@ValueSource(booleans = {true, false})
363-
void deletedEchangeBindingIsNotRecovered(boolean isolateResources) {
364+
void deletedExchangeBindingIsNotRecovered(boolean isolateResources) {
364365
String e1 = exchange();
365366
String e2 = exchange();
366367
String q = queue();
@@ -454,11 +455,11 @@ void closedConsumerIsNotRecovered(boolean isolateResources) {
454455
connection.management().queue(q).declare();
455456
Consumer consumer =
456457
connection.consumerBuilder().queue(q).messageHandler((ctx, m) -> {}).build();
457-
TestUtils.waitAtMost(() -> connection.management().queueInfo(q).consumerCount() == 1);
458+
waitAtMost(() -> connection.management().queueInfo(q).consumerCount() == 1);
458459
consumer.close();
459460
closeConnectionAndWaitForRecovery();
460461
assertThat(connectionAttemptCount).hasValue(2);
461-
TestUtils.waitAtMost(() -> connection.management().queueInfo(q).consumerCount() == 0);
462+
waitAtMost(() -> connection.management().queueInfo(q).consumerCount() == 0);
462463
} finally {
463464
connection.management().queueDelete(q);
464465
connection.close();
@@ -591,15 +592,15 @@ void disposeStaleMessageShouldBeSilent(boolean isolateResources) throws Exceptio
591592
messageContexts.poll(10, TimeUnit.SECONDS).accept();
592593
messageContexts.poll(10, TimeUnit.SECONDS).accept();
593594

594-
TestUtils.waitAtMost(() -> connection.management().queueInfo(q).messageCount() == 0);
595+
waitAtMost(() -> connection.management().queueInfo(q).messageCount() == 0);
595596
} finally {
596597
connection.management().queueDelete(q);
597598
}
598599
}
599600

600601
@ParameterizedTest
601602
@ValueSource(booleans = {true, false})
602-
void autoDeleteClientNamedQueueShouldBeRecovered(boolean isolateResources) throws Exception {
603+
void autoDeleteClientNamedQueueShouldBeRecovered(boolean isolateResources) {
603604
try (Connection connection = connection(isolateResources)) {
604605
Management.QueueInfo queueInfo = connection.management().queue().exclusive(true).declare();
605606

@@ -624,7 +625,39 @@ void autoDeleteClientNamedQueueShouldBeRecovered(boolean isolateResources) throw
624625

625626
publisher.publish(publisher.message(), ctx -> {});
626627
assertThat(consumeSync).completes();
627-
TestUtils.waitAtMost(() -> connection.management().queueInfo(queueName).messageCount() == 0);
628+
waitAtMost(() -> connection.management().queueInfo(queueName).messageCount() == 0);
629+
}
630+
}
631+
632+
@Test
633+
void shouldRecoverEvenIfManagementIsClosed() {
634+
try (Connection connection = connection()) {
635+
Management management = connection.management();
636+
Management.QueueInfo queueInfo = management.queue().exclusive(true).declare();
637+
Publisher publisher = connection.publisherBuilder().queue(queueInfo.name()).build();
638+
publisher.publish(publisher.message(), ctx -> {});
639+
TestUtils.Sync consumeSync = TestUtils.sync();
640+
connection
641+
.consumerBuilder()
642+
.queue(queueInfo.name())
643+
.messageHandler(
644+
(ctx, message) -> {
645+
ctx.accept();
646+
consumeSync.down();
647+
})
648+
.build();
649+
650+
assertThat(consumeSync).completes();
651+
waitAtMost(() -> management.queueInfo(queueInfo.name()).messageCount() == 0);
652+
management.close();
653+
654+
consumeSync.reset();
655+
656+
closeConnectionAndWaitForRecovery();
657+
publisher.publish(publisher.message(), ctx -> {});
658+
assertThat(consumeSync).completes();
659+
System.out.println("done");
660+
management.queueInfo(queueInfo.name());
628661
}
629662
}
630663

0 commit comments

Comments
 (0)