Skip to content

Commit 18d4b7c

Browse files
committed
Recover auto-delete queues
Fixes #240
1 parent cbfa913 commit 18d4b7c

File tree

2 files changed

+77
-26
lines changed

2 files changed

+77
-26
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/EntityRecovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private void recoverExchange(RecordingTopologyListener.ExchangeSpec exchange) {
103103
}
104104

105105
private void recoverQueue(RecordingTopologyListener.QueueSpec queue) {
106-
if (queue.exclusive()) {
106+
if (queue.exclusive() || queue.autoDelete()) {
107107
LOGGER.debug("Recovering queue {}", queue.name());
108108
try {
109109
Management.QueueSpecification spec =

src/test/java/com/rabbitmq/client/amqp/impl/TopologyRecoveryTest.java

Lines changed: 76 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.rabbitmq.client.amqp.Consumer;
3535
import com.rabbitmq.client.amqp.Environment;
3636
import com.rabbitmq.client.amqp.Management;
37+
import com.rabbitmq.client.amqp.Management.QueueSpecification;
3738
import com.rabbitmq.client.amqp.Publisher;
3839
import com.rabbitmq.client.amqp.Resource;
3940
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
@@ -622,32 +623,15 @@ void disposeStaleMessageShouldBeSilent(boolean isolateResources) throws Exceptio
622623
@ParameterizedTest
623624
@ValueSource(booleans = {true, false})
624625
void autoDeleteClientNamedQueueShouldBeRecovered(boolean isolateResources) {
625-
try (Connection connection = connection(isolateResources)) {
626-
Management.QueueInfo queueInfo = connection.management().queue().exclusive(true).declare();
627-
628-
String queueName = queueInfo.name();
629-
630-
Sync consumeSync = TestUtils.sync();
631-
Publisher publisher = connection.publisherBuilder().queue(queueName).build();
632-
connection
633-
.consumerBuilder()
634-
.queue(queueName)
635-
.messageHandler(
636-
(ctx, message) -> {
637-
ctx.accept();
638-
consumeSync.down();
639-
})
640-
.build();
641-
642-
publisher.publish(publisher.message(), ctx -> {});
643-
assertThat(consumeSync).completes();
644-
645-
closeConnectionAndWaitForRecovery();
626+
this.queueIsRecovered(isolateResources, s -> s.autoDelete(true));
627+
this.queueIsRecoveredNoConsumerBeforeRecovery(isolateResources, s -> s.autoDelete(true));
628+
}
646629

647-
publisher.publish(publisher.message(), ctx -> {});
648-
assertThat(consumeSync).completes();
649-
waitAtMost(() -> connection.management().queueInfo(queueName).messageCount() == 0);
650-
}
630+
@ParameterizedTest
631+
@ValueSource(booleans = {true, false})
632+
void exclusiveClientNamedQueueShouldBeRecovered(boolean isolateResources) {
633+
this.queueIsRecovered(isolateResources, s -> s.exclusive(true));
634+
this.queueIsRecoveredNoConsumerBeforeRecovery(isolateResources, s -> s.exclusive(true));
651635
}
652636

653637
@Test
@@ -766,4 +750,71 @@ void closeConnectionAndWaitForRecovery() {
766750
assertThat(this.recoveredSync).completes();
767751
this.recoveredSync.reset();
768752
}
753+
754+
private void queueIsRecovered(
755+
boolean isolateResources, java.util.function.Consumer<QueueSpecification> specCallback) {
756+
try (Connection connection = connection(isolateResources)) {
757+
QueueSpecification spec = connection.management().queue();
758+
specCallback.accept(spec);
759+
Management.QueueInfo queueInfo = spec.declare();
760+
761+
String queueName = queueInfo.name();
762+
763+
Sync consumeSync = TestUtils.sync();
764+
Publisher publisher = connection.publisherBuilder().queue(queueName).build();
765+
connection
766+
.consumerBuilder()
767+
.queue(queueName)
768+
.messageHandler(
769+
(ctx, message) -> {
770+
ctx.accept();
771+
consumeSync.down();
772+
})
773+
.build();
774+
775+
publisher.publish(publisher.message(), ctx -> {});
776+
assertThat(consumeSync).completes();
777+
778+
closeConnectionAndWaitForRecovery();
779+
780+
consumeSync.reset();
781+
publisher.publish(publisher.message(), ctx -> {});
782+
assertThat(consumeSync).completes();
783+
waitAtMost(() -> connection.management().queueInfo(queueName).messageCount() == 0);
784+
}
785+
}
786+
787+
private void queueIsRecoveredNoConsumerBeforeRecovery(
788+
boolean isolateResources, java.util.function.Consumer<QueueSpecification> specCallback) {
789+
try (Connection connection = connection(isolateResources)) {
790+
QueueSpecification spec = connection.management().queue();
791+
specCallback.accept(spec);
792+
Management.QueueInfo queueInfo = spec.declare();
793+
794+
String queueName = queueInfo.name();
795+
796+
Sync sync = TestUtils.sync();
797+
Publisher publisher = connection.publisherBuilder().queue(queueName).build();
798+
799+
publisher.publish(publisher.message(), ctx -> sync.down());
800+
assertThat(sync).completes();
801+
802+
closeConnectionAndWaitForRecovery();
803+
804+
sync.reset();
805+
connection
806+
.consumerBuilder()
807+
.queue(queueName)
808+
.messageHandler(
809+
(ctx, message) -> {
810+
ctx.accept();
811+
sync.down();
812+
})
813+
.build();
814+
815+
publisher.publish(publisher.message(), ctx -> {});
816+
assertThat(sync).completes();
817+
waitAtMost(() -> connection.management().queueInfo(queueName).messageCount() == 0);
818+
}
819+
}
769820
}

0 commit comments

Comments
 (0)