Skip to content

Commit 7d3606e

Browse files
committed
Retry until queue is fully purged
The test remains flaky even with publish confirms. Purging the queue repeatedly until all published messages have been purged serves the same purpose of eventual visibility. (cherry picked from commit faeed57)
1 parent 69226ad commit 7d3606e

File tree

1 file changed

+24
-45
lines changed

1 file changed

+24
-45
lines changed

src/test/java/com/rabbitmq/client/test/server/EffectVisibilityCrossNodeTest.java

Lines changed: 24 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,11 @@
1515

1616
package com.rabbitmq.client.test.server;
1717

18-
import static org.assertj.core.api.Assertions.assertThat;
1918
import static org.junit.Assert.assertEquals;
2019

2120
import java.io.IOException;
22-
import java.util.Iterator;
23-
import java.util.Set;
2421
import java.util.concurrent.*;
2522

26-
import java.util.concurrent.atomic.AtomicReference;
27-
import org.junit.Assert;
2823
import org.junit.Test;
2924

3025
import com.rabbitmq.client.test.functional.ClusteredTestBase;
@@ -62,49 +57,33 @@ protected void releaseResources() throws IOException {
6257
private static final byte[] msg = "".getBytes();
6358

6459
@Test public void effectVisibility() throws Exception {
65-
AtomicReference<CountDownLatch> confirmLatch = new AtomicReference<>();
66-
Set<Long> publishIds = ConcurrentHashMap.newKeySet();
67-
channel.addConfirmListener(
68-
(deliveryTag, multiple) -> {
69-
if (multiple) {
70-
Iterator<Long> iterator = publishIds.iterator();
71-
while (iterator.hasNext()) {
72-
long publishId = iterator.next();
73-
if (publishId <= deliveryTag) {
74-
iterator.remove();
75-
}
76-
}
77-
} else {
78-
publishIds.remove(deliveryTag);
79-
}
80-
if (publishIds.isEmpty()) {
81-
confirmLatch.get().countDown();
60+
// the test bulk is asynchronous because this test has a history of hanging
61+
Future<Void> task =
62+
executorService.submit(
63+
() -> {
64+
for (int i = 0; i < BATCHES; i++) {
65+
Thread.sleep(10); // to avoid flow control for the connection
66+
for (int j = 0; j < MESSAGES_PER_BATCH; j++) {
67+
channel.basicPublish("amq.fanout", "", null, msg);
8268
}
83-
},
84-
(deliveryTag, multiple) -> {});
85-
// the test bulk is asynchronous because this test has a history of hanging
86-
Future<Void> task = executorService.submit(() -> {
87-
// we use publish confirm to make sure messages made it to the queues
88-
// before checking their content
89-
channel.confirmSelect();
90-
for (int i = 0; i < BATCHES; i++) {
91-
Thread.sleep(10); // to avoid flow control for the connection
92-
confirmLatch.set(new CountDownLatch(1));
93-
for (int j = 0; j < MESSAGES_PER_BATCH; j++) {
94-
long publishId = channel.getNextPublishSeqNo();
95-
publishIds.add(publishId);
96-
channel.basicPublish("amq.fanout", "", null, msg);
97-
}
98-
boolean confirmed = confirmLatch.get().await(10, TimeUnit.SECONDS);
99-
if (!confirmed) {
100-
Assert.fail("Messages not confirmed in 10 seconds: " + publishIds);
101-
}
102-
publishIds.clear();
103-
for (int j = 0; j < queues.length; j++) {
104-
assertEquals(MESSAGES_PER_BATCH, channel.queuePurge(queues[j]).getMessageCount());
69+
for (int j = 0; j < queues.length; j++) {
70+
String queue = queues[j];
71+
long timeout = 10 * 1000;
72+
long waited = 0;
73+
int purged = 0;
74+
while (waited < timeout) {
75+
purged += channel.queuePurge(queue).getMessageCount();
76+
if (purged == MESSAGES_PER_BATCH) {
77+
break;
10578
}
79+
Thread.sleep(10);
80+
waited += 10;
81+
}
82+
assertEquals("Queue " + queue + " should have been purged after 10 seconds",
83+
MESSAGES_PER_BATCH, purged);
10684
}
107-
return null;
85+
}
86+
return null;
10887
});
10988
task.get(1, TimeUnit.MINUTES);
11089
}

0 commit comments

Comments
 (0)