File tree Expand file tree Collapse file tree 3 files changed +57
-1
lines changed Expand file tree Collapse file tree 3 files changed +57
-1
lines changed Original file line number Diff line number Diff line change @@ -6231,7 +6231,15 @@ public void onNext(T t) {
62316231 * @return Subscription which is the Subscriber passed in
62326232 */
62336233 public final Subscription unsafeSubscribe (Subscriber <? super T > subscriber ) {
6234- onSubscribe .call (subscriber );
6234+ try {
6235+ onSubscribe .call (subscriber );
6236+ } catch (Throwable e ) {
6237+ if (e instanceof OnErrorNotImplementedException ) {
6238+ throw (OnErrorNotImplementedException ) e ;
6239+ }
6240+ // handle broken contracts: https://github.com/Netflix/RxJava/issues/1090
6241+ subscriber .onError (e );
6242+ }
62356243 return subscriber ;
62366244 }
62376245
Original file line number Diff line number Diff line change @@ -323,6 +323,24 @@ public void testError2() {
323323 verify (stringObserver , times (0 )).onNext ("eight" );
324324 verify (stringObserver , times (0 )).onNext ("nine" );
325325 }
326+
327+ @ Test
328+ public void testThrownErrorHandling () {
329+ TestSubscriber <String > ts = new TestSubscriber <String >();
330+ Observable <String > o1 = Observable .create (new OnSubscribe <String >() {
331+
332+ @ Override
333+ public void call (Subscriber <? super String > s ) {
334+ throw new RuntimeException ("fail" );
335+ }
336+
337+ });
338+
339+ Observable .merge (o1 , o1 ).subscribe (ts );
340+ ts .awaitTerminalEvent (1000 , TimeUnit .MILLISECONDS );
341+ ts .assertTerminalEvent ();
342+ System .out .println ("Error: " + ts .getOnErrorEvents ());
343+ }
326344
327345 private static class TestSynchronousObservable implements Observable .OnSubscribeFunc <String > {
328346
Original file line number Diff line number Diff line change @@ -77,6 +77,36 @@ public void call(
7777 assertEquals (0 , observer .getOnErrorEvents ().size ());
7878 assertEquals (1 , observer .getOnCompletedEvents ().size ());
7979 }
80+
81+ @ Test
82+ public void testThrownErrorHandling () {
83+ TestSubscriber <String > ts = new TestSubscriber <String >();
84+ Observable .create (new OnSubscribe <String >() {
85+
86+ @ Override
87+ public void call (Subscriber <? super String > s ) {
88+ throw new RuntimeException ("fail" );
89+ }
90+
91+ }).subscribeOn (Schedulers .computation ()).subscribe (ts );
92+ ts .awaitTerminalEvent (1000 , TimeUnit .MILLISECONDS );
93+ ts .assertTerminalEvent ();
94+ }
95+
96+ @ Test
97+ public void testOnError () {
98+ TestSubscriber <String > ts = new TestSubscriber <String >();
99+ Observable .create (new OnSubscribe <String >() {
100+
101+ @ Override
102+ public void call (Subscriber <? super String > s ) {
103+ s .onError (new RuntimeException ("fail" ));
104+ }
105+
106+ }).subscribeOn (Schedulers .computation ()).subscribe (ts );
107+ ts .awaitTerminalEvent (1000 , TimeUnit .MILLISECONDS );
108+ ts .assertTerminalEvent ();
109+ }
80110
81111 public static class SlowScheduler extends Scheduler {
82112 final Scheduler actual ;
You can’t perform that action at this time.
0 commit comments