|
18 | 18 | import static org.junit.Assert.assertEquals; |
19 | 19 |
|
20 | 20 | import java.io.IOException; |
| 21 | +import java.util.concurrent.*; |
21 | 22 |
|
22 | 23 | import org.junit.Test; |
23 | 24 |
|
@@ -46,20 +47,30 @@ protected void releaseResources() throws IOException { |
46 | 47 | } |
47 | 48 |
|
48 | 49 | private static final int QUEUES = 5; |
49 | | - private static final int BATCHES = 500; |
50 | | - private static final int MESSAGES_PER_BATCH = 10; |
| 50 | + private static final int BATCHES = 100; |
| 51 | + private static final int MESSAGES_PER_BATCH = 5; |
51 | 52 |
|
52 | 53 | private static final byte[] msg = "".getBytes(); |
53 | 54 |
|
54 | 55 | @Test public void effectVisibility() throws Exception { |
55 | | - |
56 | | - for (int i = 0; i < BATCHES; i++) { |
57 | | - for (int j = 0; j < MESSAGES_PER_BATCH; j++) { |
58 | | - channel.basicPublish("amq.fanout", "", null, msg); |
59 | | - } |
60 | | - for (int j = 0; j < queues.length ; j++) { |
61 | | - assertEquals(MESSAGES_PER_BATCH, channel.queuePurge(queues[j]).getMessageCount()); |
62 | | - } |
| 56 | + ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| 57 | + try { |
| 58 | + Future<Void> task = executorService.submit(() -> { |
| 59 | + for (int i = 0; i < BATCHES; i++) { |
| 60 | + Thread.sleep(10); // to avoid flow control for the connection |
| 61 | + for (int j = 0; j < MESSAGES_PER_BATCH; j++) { |
| 62 | + channel.basicPublish("amq.fanout", "", null, msg); |
| 63 | + } |
| 64 | + for (int j = 0; j < queues.length; j++) { |
| 65 | + assertEquals(MESSAGES_PER_BATCH, channel.queuePurge(queues[j]).getMessageCount()); |
| 66 | + } |
| 67 | + } |
| 68 | + return null; |
| 69 | + }); |
| 70 | + task.get(1, TimeUnit.MINUTES); |
| 71 | + } finally { |
| 72 | + executorService.shutdownNow(); |
| 73 | + executorService.awaitTermination(1, TimeUnit.SECONDS); |
63 | 74 | } |
64 | 75 | } |
65 | 76 | } |
0 commit comments