Skip to content

Commit 53ce8d3

Browse files
more thorough error handling in case Func1 implementation itself fails
- Func1.call may fail (null impl, bad impl, etc) - Func1.call may success but the implementation itself may throw an exception instead of properly catching and sending to onError
1 parent c196715 commit 53ce8d3

File tree

1 file changed

+53
-11
lines changed

1 file changed

+53
-11
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.concurrent.CountDownLatch;
2828
import java.util.concurrent.Future;
2929
import java.util.concurrent.TimeUnit;
30-
import java.util.concurrent.TimeoutException;
3130
import java.util.concurrent.atomic.AtomicReference;
3231

3332
import org.junit.Before;
@@ -156,18 +155,30 @@ public Subscription subscribe(Observer<T> observer) {
156155
throw new IllegalStateException("onSubscribe function can not be null.");
157156
// the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception
158157
}
159-
if (isTrusted) {
160-
Subscription s = onSubscribe.call(observer);
161-
if (s == null) {
162-
// this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens
163-
// we want to gracefully handle it the same as AtomicObservableSubscription does
164-
return Subscriptions.empty();
158+
try {
159+
if (isTrusted) {
160+
Subscription s = onSubscribe.call(observer);
161+
if (s == null) {
162+
// this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens
163+
// we want to gracefully handle it the same as AtomicObservableSubscription does
164+
return Subscriptions.empty();
165+
} else {
166+
return s;
167+
}
165168
} else {
166-
return s;
169+
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
170+
return subscription.wrap(onSubscribe.call(new AtomicObserver<T>(subscription, observer)));
171+
}
172+
} catch (Exception e) {
173+
// if an unhandled error occurs executing the onSubscribe we will propagate it
174+
try {
175+
observer.onError(e);
176+
} catch (Exception e2) {
177+
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
178+
// so we are unable to propagate the error correctly and will just throw
179+
throw new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
167180
}
168-
} else {
169-
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
170-
return subscription.wrap(onSubscribe.call(new AtomicObserver<T>(subscription, observer)));
181+
return Subscriptions.empty();
171182
}
172183
}
173184

@@ -422,6 +433,18 @@ public void onNext(T args) {
422433
}
423434
}
424435

436+
/**
437+
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
438+
* <p>
439+
* NOTE: This will block even if the Observable is asynchronous.
440+
* <p>
441+
* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods.
442+
*
443+
* @param onNext
444+
* {@link Action1}
445+
* @throws RuntimeException
446+
* if error occurs
447+
*/
425448
@SuppressWarnings({ "rawtypes", "unchecked" })
426449
public void forEach(final Object o) {
427450
if (o instanceof Action1) {
@@ -3412,6 +3435,25 @@ public Boolean call(String s) {
34123435
}));
34133436
}
34143437

3438+
@Test
3439+
public void testOnSubscribeFails() {
3440+
@SuppressWarnings("unchecked")
3441+
Observer<String> observer = mock(Observer.class);
3442+
final RuntimeException re = new RuntimeException("bad impl");
3443+
Observable<String> o = Observable.create(new Func1<Observer<String>, Subscription>() {
3444+
3445+
@Override
3446+
public Subscription call(Observer<String> t1) {
3447+
throw re;
3448+
}
3449+
3450+
});
3451+
o.subscribe(observer);
3452+
verify(observer, times(0)).onNext(anyString());
3453+
verify(observer, times(0)).onCompleted();
3454+
verify(observer, times(1)).onError(re);
3455+
}
3456+
34153457
@Test
34163458
public void testLastEmptyObservable() {
34173459
Observable<String> obs = Observable.toObservable();

0 commit comments

Comments
 (0)