Skip to content

Commit 4f2d3dc

Browse files
committed
Add internal listener to collect consumer queues in tests
This is more reliable than listing queues before and while the test is running, as unrelated queues can go away between calls.
1 parent 7227675 commit 4f2d3dc

File tree

4 files changed

+26
-20
lines changed

4 files changed

+26
-20
lines changed

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public class MulticastParams {
140140

141141
private PrintStream out = System.out;
142142
private boolean netty = false;
143+
private java.util.function.Consumer<List<String>> consumerConfiguredQueueListener = qs -> {};
143144

144145
public void setExchangeType(String exchangeType) {
145146
this.exchangeType = exchangeType;
@@ -639,6 +640,8 @@ public Consumer createConsumer(
639640
Recovery.RecoveryProcess recoveryProcess =
640641
setupRecoveryProcess(connection, topologyHandlerResult.topologyRecording);
641642

643+
this.consumerConfiguredQueueListener.accept(topologyHandlerResult.configuredQueues);
644+
642645
Consumer consumer =
643646
new Consumer(
644647
new ConsumerParameters()
@@ -782,6 +785,11 @@ boolean netty() {
782785
return this.netty;
783786
}
784787

788+
public void setConsumerConfiguredQueueListener(
789+
java.util.function.Consumer<List<String>> listener) {
790+
this.consumerConfiguredQueueListener = listener;
791+
}
792+
785793
/**
786794
* Contract to handle the creation and configuration of resources. E.g. creation of queues,
787795
* binding exchange to queues.

src/test/java/com/rabbitmq/perf/it/ConnectionRecoveryIT.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import static com.rabbitmq.perf.TestUtils.randomName;
1919
import static com.rabbitmq.perf.TestUtils.threadFactory;
2020
import static com.rabbitmq.perf.TestUtils.waitAtMost;
21+
import static com.rabbitmq.perf.it.Host.listQueues;
2122
import static com.rabbitmq.perf.it.Utils.latchCompletionHandler;
2223
import static java.lang.String.format;
2324
import static java.util.Arrays.asList;
25+
import static java.util.Collections.singletonList;
2426
import static java.util.stream.Collectors.toList;
2527
import static java.util.stream.IntStream.range;
2628
import static org.assertj.core.api.Assertions.assertThat;
@@ -38,6 +40,8 @@
3840
import java.util.Arrays;
3941
import java.util.Collections;
4042
import java.util.List;
43+
import java.util.Queue;
44+
import java.util.concurrent.ConcurrentLinkedQueue;
4145
import java.util.concurrent.CountDownLatch;
4246
import java.util.concurrent.ExecutorService;
4347
import java.util.concurrent.Executors;
@@ -66,11 +70,12 @@ public class ConnectionRecoveryIT {
6670

6771
static final String URI = "amqp://localhost";
6872

69-
static final List<String> URIS = Collections.singletonList(URI);
73+
static final List<String> URIS = singletonList(URI);
7074

7175
static final int RATE = 11;
7276

7377
MulticastParams params;
78+
Queue<String> consumerQueues = new ConcurrentLinkedQueue<>();
7479

7580
ExecutorService executorService;
7681

@@ -219,6 +224,7 @@ public void init(TestInfo info) {
219224
params.setProducerCount(1);
220225
params.setConsumerCount(1);
221226
params.setProducerRateLimit(RATE);
227+
params.setConsumerConfiguredQueueListener(consumerQueues::addAll);
222228
cf = new ConnectionFactory();
223229
cf.setNetworkRecoveryInterval(2000);
224230
cf.setTopologyRecoveryEnabled(false);
@@ -410,12 +416,11 @@ public void shouldRecoverWithPreDeclared(boolean polling, TestInfo info) throws
410416

411417
@Test
412418
void durableServerNamedQueueShouldBeReusedIfStillThere(TestInfo info) throws Exception {
413-
params.setFlags(Arrays.asList("persistent"));
419+
params.setFlags(singletonList("persistent"));
414420
params.setAutoDelete(false);
415421
params.setExclusive(false);
416422
params.setQueueNames(Collections.emptyList());
417423

418-
List<String> queuesBeforeTest = Host.listServerNamedQueues();
419424
MulticastSet.CompletionHandler completionHandler = latchCompletionHandler(1, info);
420425
String createdQueue = null;
421426
try {
@@ -424,15 +429,12 @@ void durableServerNamedQueueShouldBeReusedIfStillThere(TestInfo info) throws Exc
424429
new MulticastSet(performanceMetrics, cf, params, "", URIS, completionHandler);
425430
run(set);
426431
waitAtMost(10, () -> msgConsumed.get() >= 3 * producerConsumerCount * RATE);
427-
List<String> queuesDuringTest = Host.listServerNamedQueues();
428-
assertThat(queuesDuringTest).hasSize(queuesBeforeTest.size() + 1);
429-
queuesDuringTest.removeAll(queuesBeforeTest);
430-
assertThat(queuesDuringTest).hasSize(1);
431-
createdQueue = queuesDuringTest.get(0);
432+
assertThat(consumerQueues).hasSize(1);
433+
createdQueue = consumerQueues.poll();
432434
long messageCountBeforeClosing = msgConsumed.get();
433435
closeAllConnections();
434436
waitAtMost(10, () -> msgConsumed.get() >= 2 * messageCountBeforeClosing);
435-
assertThat(Host.listServerNamedQueues()).hasSize(queuesBeforeTest.size() + 1);
437+
assertThat(listQueues()).contains(createdQueue);
436438
completionHandler.countDown("stopped in test");
437439
waitAtMost(10, () -> testIsDone.get());
438440
} finally {
@@ -454,7 +456,7 @@ void recreateBindingEvenOnPreDeclaredDurableQueue(TestInfo info) throws Exceptio
454456
ch.exchangeDelete("direct");
455457
}
456458

457-
params.setQueueNames(Collections.singletonList(queue));
459+
params.setQueueNames(singletonList(queue));
458460
params.setPredeclared(true);
459461

460462
int producerConsumerCount = params.getProducerCount();

src/test/java/com/rabbitmq/perf/it/Host.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,6 @@ public static List<ConnectionInfo> listConnections() throws IOException {
145145
return result;
146146
}
147147

148-
public static List<String> listServerNamedQueues() throws IOException {
149-
return listQueues().stream().filter(q -> q.startsWith("amq.gen")).collect(Collectors.toList());
150-
}
151-
152148
public static List<String> listQueues() throws IOException {
153149
// "messages" column will help ignore the header line
154150
String output = capture(rabbitmqctl("list_queues -q name messages").getInputStream());

src/test/java/com/rabbitmq/perf/it/MiscellaneousIT.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import java.io.PrintStream;
3636
import java.time.Duration;
3737
import java.util.List;
38+
import java.util.Queue;
39+
import java.util.concurrent.ConcurrentLinkedQueue;
3840
import java.util.concurrent.CountDownLatch;
3941
import java.util.concurrent.ExecutorService;
4042
import java.util.concurrent.Executors;
@@ -148,23 +150,21 @@ void consumerShouldAckWhenRateLimitationIsEnabled(TestInfo info) throws Exceptio
148150

149151
@Test
150152
void consumerShouldRecoverWhenServerNamedQueueIsDeleted(TestInfo info) throws Exception {
153+
Queue<String> queues = new ConcurrentLinkedQueue<>();
151154
int rate = 100;
152155
params.setProducerCount(1);
153156
params.setConsumerCount(1);
154157
params.setProducerRateLimit(rate);
155-
156-
List<String> queuesBeforeTest = Host.listQueues();
158+
params.setConsumerConfiguredQueueListener(queues::addAll);
157159

158160
MulticastSet.CompletionHandler completionHandler = latchCompletionHandler(1, info);
159161
MulticastSet set =
160162
new MulticastSet(performanceMetrics, cf, params, "", URIS, completionHandler);
161163
run(set);
162164

163165
waitAtMost(10, () -> msgConsumed.get() >= 3 * rate);
164-
List<String> queuesDuringTest = Host.listQueues();
165-
queuesDuringTest.removeAll(queuesBeforeTest);
166-
assertThat(queuesDuringTest).hasSize(1);
167-
String queue = queuesDuringTest.get(0);
166+
assertThat(queues).hasSize(1);
167+
String queue = queues.poll();
168168

169169
ConnectionFactory connectionFactory = new ConnectionFactory();
170170
long messageConsumedBeforeDeletion;

0 commit comments

Comments
 (0)