25
25
import java .util .Optional ;
26
26
import java .util .concurrent .CompletableFuture ;
27
27
import java .util .concurrent .CountDownLatch ;
28
- import java .util .concurrent .ExecutionException ;
29
28
import java .util .concurrent .ExecutorService ;
30
29
import java .util .concurrent .Executors ;
31
30
import java .util .concurrent .Future ;
@@ -400,13 +399,13 @@ private void addIfFailed(List<Pair<String, Throwable>> failures, String callName
400
399
}
401
400
}
402
401
403
- private void seemsThreadSafeWithProducerCount (int producerCount ) throws InterruptedException , ExecutionException {
402
+ private void seemsThreadSafeWithProducerCount (int producerCount ) {
404
403
assertTimeoutPreemptively (STOCHASTIC_TEST_DURATION .plusSeconds (5 ), () -> {
405
404
AtomicBoolean runProducers = new AtomicBoolean (true );
406
405
AtomicBoolean runConsumers = new AtomicBoolean (true );
407
406
AtomicInteger completesReceived = new AtomicInteger (0 );
408
407
409
- AtomicLong messageCount = new AtomicLong (0 );
408
+ AtomicLong messageSendCount = new AtomicLong (0 );
410
409
AtomicLong messageReceiveCount = new AtomicLong (0 );
411
410
412
411
Semaphore productionLimiter = new Semaphore (101 );
@@ -418,14 +417,15 @@ private void seemsThreadSafeWithProducerCount(int producerCount) throws Interrup
418
417
publisher .subscribe (subscriber );
419
418
420
419
// Producer tasks
420
+ CompletableFuture <?> completed = new CompletableFuture <>();
421
421
List <Future <?>> producers = new ArrayList <>();
422
422
for (int i = 0 ; i < producerCount ; i ++) {
423
423
producers .add (executor .submit (() -> {
424
424
while (runProducers .get ()) {
425
425
productionLimiter .acquire ();
426
- publisher .send (messageCount .getAndIncrement ());
426
+ publisher .send (messageSendCount .getAndIncrement ());
427
427
}
428
- publisher .complete (); // All but one producer sending this will fail.
428
+ publisher .complete (). thenRun (() -> completed . complete ( null )) ; // All but one producer sending this will fail.
429
429
return null ;
430
430
}));
431
431
}
@@ -476,17 +476,21 @@ private void seemsThreadSafeWithProducerCount(int producerCount) throws Interrup
476
476
producer .get ();
477
477
}
478
478
479
+ // Make sure to flush out everything left in the queue.
480
+ completed .get ();
481
+ subscriber .subscription .request (Long .MAX_VALUE );
482
+
479
483
// Shut down consumers
480
484
runConsumers .set (false );
481
485
requestLimiter .release ();
482
486
requester .get ();
483
487
consumer .get ();
484
488
489
+ assertThat (messageReceiveCount .get ()).isEqualTo (messageSendCount .get ());
485
490
assertThat (completesReceived .get ()).isEqualTo (1 );
486
- assertThat (messageReceiveCount .get ()).isEqualTo (messageCount .get ());
487
491
488
492
// Make sure we actually tested something
489
- assertThat (messageCount .get ()).isGreaterThan (10 );
493
+ assertThat (messageSendCount .get ()).isGreaterThan (10 );
490
494
});
491
495
}
492
496
0 commit comments