Skip to content

Commit bb2af63

Browse files
committed
Fix test flake
1 parent 27a5f01 commit bb2af63

File tree

2 files changed

+72
-39
lines changed

2 files changed

+72
-39
lines changed

src/test/java/com/rabbitmq/perf/it/ConnectionRecoveryIT.java

Lines changed: 68 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
1616
package com.rabbitmq.perf.it;
1717

18+
import static com.rabbitmq.perf.TestUtils.randomName;
1819
import static com.rabbitmq.perf.TestUtils.threadFactory;
1920
import static com.rabbitmq.perf.TestUtils.waitAtMost;
2021
import static com.rabbitmq.perf.it.Utils.latchCompletionHandler;
21-
import static com.rabbitmq.perf.it.Utils.queueName;
2222
import static java.lang.String.format;
23-
import static java.lang.String.valueOf;
2423
import static java.util.Arrays.asList;
2524
import static java.util.stream.Collectors.toList;
2625
import static java.util.stream.IntStream.range;
@@ -42,6 +41,7 @@
4241
import java.util.concurrent.*;
4342
import java.util.concurrent.atomic.AtomicBoolean;
4443
import java.util.concurrent.atomic.AtomicLong;
44+
import java.util.function.BiConsumer;
4545
import java.util.function.Consumer;
4646
import java.util.stream.IntStream;
4747
import java.util.stream.Stream;
@@ -101,9 +101,9 @@ static Stream<Arguments> configurationArguments() {
101101
}
102102

103103
static Arguments[] blockingIoAndNetty(
104-
List<Consumer<MulticastParams>> multicastParamsConfigurers) {
104+
List<BiConsumer<MulticastParams, TestInfo>> multicastParamsConfigurers) {
105105
List<Arguments> arguments = new ArrayList<>();
106-
for (Consumer<MulticastParams> configurer : multicastParamsConfigurers) {
106+
for (BiConsumer<MulticastParams, TestInfo> configurer : multicastParamsConfigurers) {
107107
arguments.add(
108108
Arguments.of(
109109
configurer, namedConsumer("blocking IO", (Consumer<ConnectionFactory>) cf -> {})));
@@ -113,34 +113,38 @@ static Arguments[] blockingIoAndNetty(
113113
return arguments.toArray(new Arguments[0]);
114114
}
115115

116-
static List<Consumer<MulticastParams>> multicastParamsConfigurers() {
117-
List<Consumer<MulticastParams>> parameters = new ArrayList<>();
118-
for (Consumer<MulticastParams> queuesVariation : queuesVariations()) {
116+
static List<BiConsumer<MulticastParams, TestInfo>> multicastParamsConfigurers() {
117+
List<BiConsumer<MulticastParams, TestInfo>> parameters = new ArrayList<>();
118+
for (BiConsumer<MulticastParams, TestInfo> queuesVariation : queuesVariations()) {
119119
parameters.add(queuesVariation);
120120
parameters.add(
121121
namedConsumer(
122122
"polling - " + queuesVariation,
123-
params -> {
124-
queuesVariation.accept(params);
123+
(params, i) -> {
124+
queuesVariation.accept(params, i);
125125
params.setPolling(true);
126126
params.setPollingInterval(10);
127127
}));
128128
}
129129
return parameters;
130130
}
131131

132-
static List<Consumer<MulticastParams>> queuesVariations() {
132+
static List<BiConsumer<MulticastParams, TestInfo>> queuesVariations() {
133133
return asList(
134-
namedConsumer("one server-named queue", empty()),
134+
namedConsumer("one server-named queue", biConsumerAdapter(empty())),
135135
namedConsumer("several queues", severalQueues()),
136136
namedConsumer("queue sequence", queueSequence()),
137-
namedConsumer("one server-named queue, exclusive", exclusive()),
138-
namedConsumer("queue sequence, exclusive", queueSequence().andThen(exclusive())));
137+
namedConsumer("one server-named queue, exclusive", biConsumerAdapter(exclusive())),
138+
namedConsumer("queue sequence, exclusive", queueSequence().andThen(biExclusive())));
139+
}
140+
141+
private static <T, U> BiConsumer<T, U> biConsumerAdapter(Consumer<T> delegate) {
142+
return (t, u) -> delegate.accept(t);
139143
}
140144

141145
static Stream<Arguments> configurationArgumentsForSeveralUris() {
142146
return Stream.of(
143-
namedConsumer("one server-named queue", empty()),
147+
namedConsumer("one server-named queue", biConsumerAdapter(empty())),
144148
namedConsumer("several queues", severalQueues()),
145149
namedConsumer("queue sequence", queueSequence()))
146150
.map(Arguments::of);
@@ -150,23 +154,26 @@ static Consumer<MulticastParams> empty() {
150154
return p -> {};
151155
}
152156

153-
static Consumer<MulticastParams> severalQueues() {
154-
return p -> {
155-
String suffix = valueOf(System.currentTimeMillis());
156-
p.setQueueNames(
157-
range(1, 5).mapToObj(i -> format("perf-test-%s-%d", suffix, i)).collect(toList()));
157+
static BiConsumer<MulticastParams, TestInfo> severalQueues() {
158+
return (p, info) -> {
159+
String suffix = randomName(info);
160+
p.setQueueNames(range(1, 5).mapToObj(i -> format("%s-%d", suffix, i)).collect(toList()));
158161
};
159162
}
160163

161164
static Consumer<MulticastParams> exclusive() {
162165
return p -> p.setExclusive(true);
163166
}
164167

165-
static Consumer<MulticastParams> queueSequence() {
166-
return p -> {
168+
static BiConsumer<MulticastParams, TestInfo> biExclusive() {
169+
return (p, i) -> p.setExclusive(true);
170+
}
171+
172+
static BiConsumer<MulticastParams, TestInfo> queueSequence() {
173+
return (p, i) -> {
167174
p.setProducerCount(4);
168175
p.setConsumerCount(4);
169-
p.setQueuePattern("perf-test-sequence-" + System.currentTimeMillis() + "-%d");
176+
p.setQueuePattern(randomName(i) + "-%d");
170177
p.setQueueSequenceFrom(1);
171178
p.setQueueSequenceTo(4);
172179
};
@@ -187,6 +194,21 @@ public String toString() {
187194
};
188195
}
189196

197+
static <T, U> BiConsumer<T, U> namedConsumer(String name, BiConsumer<T, U> consumer) {
198+
return new BiConsumer<T, U>() {
199+
200+
@Override
201+
public void accept(T t, U u) {
202+
consumer.accept(t, u);
203+
}
204+
205+
@Override
206+
public String toString() {
207+
return name;
208+
}
209+
};
210+
}
211+
190212
@BeforeEach
191213
public void init(TestInfo info) {
192214
executorService = Executors.newCachedThreadPool(threadFactory(info));
@@ -216,10 +238,12 @@ public void tearDown() throws InterruptedException {
216238
@ParameterizedTest
217239
@MethodSource("configurationArguments")
218240
public void shouldStopWhenConnectionRecoveryIsOffAndConnectionsAreKilled(
219-
Consumer<MulticastParams> configurer, Consumer<ConnectionFactory> cfConfigurer, TestInfo info)
241+
BiConsumer<MulticastParams, TestInfo> configurer,
242+
Consumer<ConnectionFactory> cfConfigurer,
243+
TestInfo info)
220244
throws Exception {
221245
cf.setAutomaticRecoveryEnabled(false);
222-
configurer.accept(params);
246+
configurer.accept(params, info);
223247
cfConfigurer.accept(cf);
224248
int producerConsumerCount = params.getProducerCount();
225249
MulticastSet set =
@@ -234,12 +258,12 @@ public void shouldStopWhenConnectionRecoveryIsOffAndConnectionsAreKilled(
234258
@MethodSource("configurationArguments")
235259
public void
236260
shouldStopWhenConnectionRecoveryIsOffAndConnectionsAreKilledAndUsingPublishingInterval(
237-
Consumer<MulticastParams> configurer,
261+
BiConsumer<MulticastParams, TestInfo> configurer,
238262
Consumer<ConnectionFactory> cfConfigurer,
239263
TestInfo info)
240264
throws Exception {
241265
cf.setAutomaticRecoveryEnabled(false);
242-
configurer.accept(params);
266+
configurer.accept(params, info);
243267
cfConfigurer.accept(cf);
244268
params.setPublishingInterval(Duration.ofSeconds(1));
245269
int producerConsumerCount = params.getProducerCount();
@@ -255,9 +279,11 @@ public void shouldStopWhenConnectionRecoveryIsOffAndConnectionsAreKilled(
255279
@ParameterizedTest
256280
@MethodSource("configurationArguments")
257281
public void shouldRecoverWhenConnectionsAreKilled(
258-
Consumer<MulticastParams> configurer, Consumer<ConnectionFactory> cfConfigurer, TestInfo info)
282+
BiConsumer<MulticastParams, TestInfo> configurer,
283+
Consumer<ConnectionFactory> cfConfigurer,
284+
TestInfo info)
259285
throws Exception {
260-
configurer.accept(params);
286+
configurer.accept(params, info);
261287
cfConfigurer.accept(cf);
262288
int producerConsumerCount = params.getProducerCount();
263289
MulticastSet set =
@@ -273,10 +299,12 @@ public void shouldRecoverWhenConnectionsAreKilled(
273299
@ParameterizedTest
274300
@MethodSource("configurationArguments")
275301
public void shouldRecoverWhenConnectionsAreKilledAndUsingPublishingInterval(
276-
Consumer<MulticastParams> configurer, Consumer<ConnectionFactory> cfConfigurer, TestInfo info)
302+
BiConsumer<MulticastParams, TestInfo> configurer,
303+
Consumer<ConnectionFactory> cfConfigurer,
304+
TestInfo info)
277305
throws Exception {
278306
params.setPublishingInterval(Duration.ofSeconds(1));
279-
configurer.accept(params);
307+
configurer.accept(params, info);
280308
cfConfigurer.accept(cf);
281309
int producerConsumerCount = params.getProducerCount();
282310
MulticastSet set =
@@ -297,7 +325,8 @@ public void shouldRecoverWhenConnectionsAreKilledAndUsingPublishingInterval(
297325

298326
@Test
299327
public void shouldRecoverWithNetty(TestInfo info) throws Exception {
300-
params.setQueueNames(Arrays.asList("one", "two", "three"));
328+
String prefix = randomName(info);
329+
params.setQueueNames(Arrays.asList(prefix + "-one", prefix + "-two", prefix + "-three"));
301330
params.setProducerCount(10);
302331
params.setConsumerCount(10);
303332
cf.netty();
@@ -315,8 +344,8 @@ public void shouldRecoverWithNetty(TestInfo info) throws Exception {
315344
@ParameterizedTest
316345
@MethodSource("configurationArgumentsForSeveralUris")
317346
public void shouldRecoverWhenConnectionsAreKilledAndUsingSeveralUris(
318-
Consumer<MulticastParams> configurer, TestInfo info) throws Exception {
319-
configurer.accept(params);
347+
BiConsumer<MulticastParams, TestInfo> configurer, TestInfo info) throws Exception {
348+
configurer.accept(params, info);
320349
int producerConsumerCount = params.getProducerCount();
321350
MulticastSet set =
322351
new MulticastSet(
@@ -338,7 +367,7 @@ public void shouldRecoverWhenConnectionsAreKilledAndUsingSeveralUris(
338367
@ParameterizedTest
339368
public void shouldRecoverWithPreDeclared(boolean polling, TestInfo info) throws Exception {
340369
int queueCount = 5;
341-
String queuePattern = "perf-test-" + info.getTestMethod().get().getName() + "-%d";
370+
String queuePattern = randomName(info) + "-%d";
342371
ConnectionFactory connectionFactory = new ConnectionFactory();
343372
try (Connection c = connectionFactory.newConnection()) {
344373
Channel ch = c.createChannel();
@@ -383,7 +412,7 @@ void durableServerNamedQueueShouldBeReusedIfStillThere(TestInfo info) throws Exc
383412
params.setExclusive(false);
384413
params.setQueueNames(Collections.emptyList());
385414

386-
List<String> queuesBeforeTest = Host.listQueues();
415+
List<String> queuesBeforeTest = Host.listServerNamedQueues();
387416
MulticastSet.CompletionHandler completionHandler = latchCompletionHandler(1, info);
388417
String createdQueue = null;
389418
try {
@@ -392,15 +421,15 @@ void durableServerNamedQueueShouldBeReusedIfStillThere(TestInfo info) throws Exc
392421
new MulticastSet(performanceMetrics, cf, params, "", URIS, completionHandler);
393422
run(set);
394423
waitAtMost(10, () -> msgConsumed.get() >= 3 * producerConsumerCount * RATE);
395-
List<String> queuesDuringTest = Host.listQueues();
424+
List<String> queuesDuringTest = Host.listServerNamedQueues();
396425
assertThat(queuesDuringTest).hasSize(queuesBeforeTest.size() + 1);
397426
queuesDuringTest.removeAll(queuesBeforeTest);
398427
assertThat(queuesDuringTest).hasSize(1);
399428
createdQueue = queuesDuringTest.get(0);
400429
long messageCountBeforeClosing = msgConsumed.get();
401430
closeAllConnections();
402431
waitAtMost(10, () -> msgConsumed.get() >= 2 * messageCountBeforeClosing);
403-
assertThat(Host.listQueues()).hasSize(queuesBeforeTest.size() + 1);
432+
assertThat(Host.listServerNamedQueues()).hasSize(queuesBeforeTest.size() + 1);
404433
completionHandler.countDown("stopped in test");
405434
waitAtMost(10, () -> testIsDone.get());
406435
} finally {
@@ -414,7 +443,7 @@ void durableServerNamedQueueShouldBeReusedIfStillThere(TestInfo info) throws Exc
414443

415444
@Test
416445
void recreateBindingEvenOnPreDeclaredDurableQueue(TestInfo info) throws Exception {
417-
String queue = queueName(info);
446+
String queue = randomName(info);
418447
ConnectionFactory connectionFactory = new ConnectionFactory();
419448
try (Connection c = connectionFactory.newConnection()) {
420449
Channel ch = c.createChannel();
@@ -463,7 +492,7 @@ private void run(MulticastSet multicastSet) {
463492
// one of the tests stops the execution, no need to be noisy
464493
throw new RuntimeException(e);
465494
} catch (Exception e) {
466-
e.printStackTrace();
495+
throw new RuntimeException(e);
467496
}
468497
});
469498
}

src/test/java/com/rabbitmq/perf/it/Host.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ public static List<ConnectionInfo> listConnections() throws IOException {
145145
return result;
146146
}
147147

148+
public static List<String> listServerNamedQueues() throws IOException {
149+
return listQueues().stream().filter(q -> q.startsWith("amq.gen")).collect(Collectors.toList());
150+
}
151+
148152
public static List<String> listQueues() throws IOException {
149153
// "messages" column will help ignore the header line
150154
String output = capture(rabbitmqctl("list_queues -q name messages").getInputStream());

0 commit comments

Comments
 (0)