File tree Expand file tree Collapse file tree 2 files changed +44
-5
lines changed
main/java/rx/internal/operators
test/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +44
-5
lines changed Original file line number Diff line number Diff line change @@ -49,22 +49,36 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
4949 final Worker worker = scheduler .createWorker ();
5050 child .add (worker );
5151 return new Subscriber <T >(child ) {
52-
52+ // indicates an error cut ahead
53+ // accessed from the worker thread only
54+ boolean done ;
5355 @ Override
5456 public void onCompleted () {
5557 worker .schedule (new Action0 () {
5658
5759 @ Override
5860 public void call () {
59- child .onCompleted ();
61+ if (!done ) {
62+ done = true ;
63+ child .onCompleted ();
64+ }
6065 }
6166
6267 }, delay , unit );
6368 }
6469
6570 @ Override
66- public void onError (Throwable e ) {
67- child .onError (e );
71+ public void onError (final Throwable e ) {
72+ worker .schedule (new Action0 () {
73+ @ Override
74+ public void call () {
75+ if (!done ) {
76+ done = true ;
77+ child .onError (e );
78+ worker .unsubscribe ();
79+ }
80+ }
81+ });
6882 }
6983
7084 @ Override
@@ -73,7 +87,9 @@ public void onNext(final T t) {
7387
7488 @ Override
7589 public void call () {
76- child .onNext (t );
90+ if (!done ) {
91+ child .onNext (t );
92+ }
7793 }
7894
7995 }, delay , unit );
Original file line number Diff line number Diff line change @@ -798,4 +798,27 @@ public Integer call(Integer t) {
798798 ts .assertNoErrors ();
799799 assertEquals (RxRingBuffer .SIZE * 2 , ts .getOnNextEvents ().size ());
800800 }
801+
802+ @ Test
803+ public void testErrorRunsBeforeOnNext () {
804+ TestScheduler test = Schedulers .test ();
805+
806+ PublishSubject <Integer > ps = PublishSubject .create ();
807+
808+ TestSubscriber <Integer > ts = TestSubscriber .create ();
809+
810+ ps .delay (1 , TimeUnit .SECONDS , test ).subscribe (ts );
811+
812+ ps .onNext (1 );
813+
814+ test .advanceTimeBy (500 , TimeUnit .MILLISECONDS );
815+
816+ ps .onError (new TestException ());
817+
818+ test .advanceTimeBy (1 , TimeUnit .SECONDS );
819+
820+ ts .assertNoValues ();
821+ ts .assertError (TestException .class );
822+ ts .assertNotCompleted ();
823+ }
801824}
You can’t perform that action at this time.
0 commit comments