3535import java .util .concurrent .Executors ;
3636import java .util .concurrent .Future ;
3737import java .util .concurrent .TimeUnit ;
38+ import java .util .concurrent .atomic .AtomicBoolean ;
3839import java .util .concurrent .atomic .AtomicInteger ;
3940import java .util .stream .IntStream ;
4041import java .util .stream .Stream ;
@@ -78,7 +79,7 @@ public class MessageCountTimeLimitTest {
7879
7980 MulticastSet .ThreadingHandler th ;
8081
81- volatile Future <?> runningTest ;
82+ AtomicBoolean testIsDone ;
8283
8384 volatile long testDurationInMs ;
8485
@@ -107,6 +108,7 @@ public void init() throws Exception {
107108 when (cf .newConnection (anyString ())).thenReturn (c );
108109 when (c .createChannel ()).thenReturn (ch );
109110
111+ testIsDone = new AtomicBoolean (false );
110112 executorService = Executors .newCachedThreadPool ();
111113 th = new MulticastSet .DefaultThreadingHandler ();
112114 testDurationInMs = -1 ;
@@ -138,7 +140,7 @@ public void noLimit() throws Exception {
138140 assertThat ("1000 messages should have been published by now" ,
139141 publishedLatch .await (5 , TimeUnit .SECONDS ), is (true ));
140142
141- assertThat (runningTest . isDone (), is (false ));
143+ assertThat (testIsDone . get (), is (false ));
142144 // only the configuration connection has been closed
143145 // so the test is still running in the background
144146 verify (c , times (1 )).close ();
@@ -152,7 +154,7 @@ public void timeLimit() {
152154
153155 run (multicastSet );
154156
155- waitAtMost (10 , TimeUnit .SECONDS ).until (() -> runningTest . isDone (), is (true ));
157+ waitAtMost (10 , TimeUnit .SECONDS ).until (() -> testIsDone . get (), is (true ));
156158 assertThat (testDurationInMs , greaterThanOrEqualTo (5000L ));
157159 }
158160
@@ -180,7 +182,7 @@ public void producerCount(int producersCount, int channelsCount) throws Exceptio
180182
181183 assertThat (messagesTotal + " messages should have been published by now" ,
182184 publishedLatch .await (10 , TimeUnit .SECONDS ), is (true ));
183- waitAtMost (5 , TimeUnit .SECONDS ).until (() -> runningTest . isDone (), is (true ));
185+ waitAtMost (5 , TimeUnit .SECONDS ).until (() -> testIsDone . get (), is (true ));
184186 verify (ch , times (messagesTotal ))
185187 .basicPublish (anyString (), anyString (),
186188 anyBoolean (), anyBoolean (),
@@ -218,7 +220,7 @@ public void consumerCount(int consumersCount, int channelsCount) throws Exceptio
218220 sendMessagesToConsumer (messagesCount , consumer );
219221 }
220222
221- waitAtMost (5 , TimeUnit .SECONDS ).until (() -> runningTest . isDone (), is (true ));
223+ waitAtMost (5 , TimeUnit .SECONDS ).until (() -> testIsDone . get (), is (true ));
222224 }
223225
224226 // --time 5 -x 1 --pmessages 10 -y 1 --cmessages 10
@@ -255,10 +257,10 @@ public void timeLimitTakesPrecedenceOverCounts() throws Exception {
255257 assertThat (nbMessages + " messages should have been published by now" ,
256258 publishedLatch .await (5 , TimeUnit .SECONDS ), is (true ));
257259
258- assertThat (runningTest . isDone (), is (false ));
260+ assertThat (testIsDone . get (), is (false ));
259261
260- waitAtMost (10 , TimeUnit .SECONDS ).until (() -> runningTest . isDone (), is (true ));
261- assertThat (testDurationInMs , greaterThan (5000L ));
262+ waitAtMost (10 , TimeUnit .SECONDS ).until (() -> testIsDone . get (), is (true ));
263+ assertThat (testDurationInMs , greaterThanOrEqualTo (5000L ));
262264 }
263265
264266 // -x 0 -y 1
@@ -285,7 +287,7 @@ public void consumerOnlyDoesNotStop() throws Exception {
285287 consumersLatch .await (5 , TimeUnit .SECONDS ), is (true ));
286288 assertThat (consumerArgumentCaptor .getValue (), notNullValue ());
287289
288- assertThat (runningTest . isDone (), is (false ));
290+ assertThat (testIsDone . get (), is (false ));
289291 // only the configuration connection has been closed
290292 // so the test is still running in the background
291293 verify (c , times (1 )).close ();
@@ -312,7 +314,7 @@ public void producerOnlyDoesNotStop() throws Exception {
312314
313315 assertThat ("1000 messages should have been published by now" ,
314316 publishedLatch .await (5 , TimeUnit .SECONDS ), is (true ));
315- assertThat (runningTest . isDone (), is (false ));
317+ assertThat (testIsDone . get (), is (false ));
316318 // only the configuration connection has been closed
317319 // so the test is still running in the background
318320 verify (c , times (1 )).close ();
@@ -353,11 +355,12 @@ stats, cf, params, singletonList("amqp://localhost"),
353355 }
354356
355357 private void run (MulticastSet multicastSet ) {
356- this . runningTest = executorService .submit (() -> {
358+ executorService .submit (() -> {
357359 try {
358360 long start = System .nanoTime ();
359361 multicastSet .run ();
360362 testDurationInMs = (System .nanoTime () - start ) / 1_000_000 ;
363+ testIsDone .set (true );
361364 } catch (Exception e ) {
362365 throw new RuntimeException (e );
363366 }
0 commit comments