2020import org .mockito .InOrder ;
2121import org .mockito .Mock ;
2222import org .mockito .MockitoAnnotations ;
23+
2324import rx .Observable ;
2425import rx .Observable .OnSubscribe ;
2526import rx .Observer ;
2627import rx .Subscriber ;
2728import rx .exceptions .CompositeException ;
2829import rx .exceptions .TestException ;
30+ import rx .observers .TestSubscriber ;
2931
3032import java .util .ArrayList ;
3133import java .util .List ;
3234import java .util .concurrent .CountDownLatch ;
35+ import java .util .concurrent .TimeUnit ;
3336
3437import static org .junit .Assert .*;
3538import static org .mockito .Matchers .any ;
@@ -475,4 +478,51 @@ public void onCompleted() {
475478 inOrder .verify (o ).onError (any (TestException .class ));
476479 verify (o , never ()).onCompleted ();
477480 }
478- }
481+
482+ @ Test
483+ public void testErrorInParentObservableDelayed () throws Exception {
484+ final TestASynchronous1sDelayedObservable o1 = new TestASynchronous1sDelayedObservable ();
485+ final TestASynchronous1sDelayedObservable o2 = new TestASynchronous1sDelayedObservable ();
486+ Observable <Observable <String >> parentObservable = Observable .create (new Observable .OnSubscribe <Observable <String >>() {
487+ @ Override
488+ public void call (Subscriber <? super Observable <String >> op ) {
489+ op .onNext (Observable .create (o1 ));
490+ op .onNext (Observable .create (o2 ));
491+ op .onError (new NullPointerException ("throwing exception in parent" ));
492+ }
493+ });
494+
495+ TestSubscriber <String > ts = new TestSubscriber <String >(stringObserver );
496+ Observable <String > m = Observable .mergeDelayError (parentObservable );
497+ m .subscribe (ts );
498+ ts .awaitTerminalEvent (2000 , TimeUnit .MILLISECONDS );
499+ ts .assertTerminalEvent ();
500+
501+ verify (stringObserver , times (2 )).onNext ("hello" );
502+ verify (stringObserver , times (1 )).onError (any (NullPointerException .class ));
503+ verify (stringObserver , never ()).onCompleted ();
504+ }
505+
506+ private static class TestASynchronous1sDelayedObservable implements Observable .OnSubscribe <String > {
507+ Thread t ;
508+
509+ @ Override
510+ public void call (final Subscriber <? super String > observer ) {
511+ t = new Thread (new Runnable () {
512+
513+ @ Override
514+ public void run () {
515+ try {
516+ Thread .sleep (100 );
517+ } catch (InterruptedException e ) {
518+ observer .onError (e );
519+ }
520+ observer .onNext ("hello" );
521+ observer .onCompleted ();
522+ }
523+
524+ });
525+ t .start ();
526+ }
527+ }
528+ }
0 commit comments