@@ -309,16 +309,17 @@ public void call(Integer t1) {
309309 @ Test
310310 public void testNonBlockingOuterWhileBlockingOnNext () throws InterruptedException {
311311
312- final CountDownLatch latch = new CountDownLatch (1 );
312+ final CountDownLatch completedLatch = new CountDownLatch (1 );
313+ final CountDownLatch nextLatch = new CountDownLatch (1 );
313314 final AtomicLong completeTime = new AtomicLong ();
314315 // use subscribeOn to make async, observeOn to move
315- Observable .range (1 , 1000 ).subscribeOn (Schedulers .newThread ()).observeOn (Schedulers .newThread ()).subscribe (new Observer <Integer >() {
316+ Observable .range (1 , 2 ).subscribeOn (Schedulers .newThread ()).observeOn (Schedulers .newThread ()).subscribe (new Observer <Integer >() {
316317
317318 @ Override
318319 public void onCompleted () {
319320 System .out .println ("onCompleted" );
320321 completeTime .set (System .nanoTime ());
321- latch .countDown ();
322+ completedLatch .countDown ();
322323 }
323324
324325 @ Override
@@ -328,20 +329,27 @@ public void onError(Throwable e) {
328329
329330 @ Override
330331 public void onNext (Integer t ) {
331-
332+ // don't let this thing finish yet
333+ try {
334+ if (!nextLatch .await (1000 , TimeUnit .MILLISECONDS )) {
335+ throw new RuntimeException ("it shouldn't have timed out" );
336+ }
337+ } catch (InterruptedException e ) {
338+ throw new RuntimeException ("it shouldn't have failed" );
339+ }
332340 }
333341
334342 });
335343
336344 long afterSubscribeTime = System .nanoTime ();
337- System .out .println ("After subscribe: " + latch .getCount ());
338- assertEquals (1 , latch .getCount ());
339- latch .await ();
345+ System .out .println ("After subscribe: " + completedLatch .getCount ());
346+ assertEquals (1 , completedLatch .getCount ());
347+ nextLatch .countDown ();
348+ completedLatch .await (1000 , TimeUnit .MILLISECONDS );
340349 assertTrue (completeTime .get () > afterSubscribeTime );
341350 System .out .println ("onComplete nanos after subscribe: " + (completeTime .get () - afterSubscribeTime ));
342351 }
343352
344-
345353 private static int randomIntFrom0to100 () {
346354 // XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml
347355 long x = System .nanoTime ();
0 commit comments