Skip to content

Commit 750c627

Browse files
committed
Adding the hooks unsafeSubscribe by making the execution same as safe version without the safety checks.
1 parent 76f5cc4 commit 750c627

File tree

1 file changed

+34
-19
lines changed

1 file changed

+34
-19
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6834,15 +6834,29 @@ public void onNext(T t) {
68346834
*/
68356835
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
68366836
try {
6837-
onSubscribe.call(subscriber);
6837+
// allow the hook to intercept and/or decorate
6838+
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
6839+
return hook.onSubscribeReturn(subscriber);
68386840
} catch (Throwable e) {
6839-
if (e instanceof OnErrorNotImplementedException) {
6840-
throw (OnErrorNotImplementedException) e;
6841+
// special handling for certain Throwable/Error/Exception types
6842+
Exceptions.throwIfFatal(e);
6843+
// if an unhandled error occurs executing the onSubscribe we will propagate it
6844+
try {
6845+
subscriber.onError(hook.onSubscribeError(e));
6846+
} catch (OnErrorNotImplementedException e2) {
6847+
// special handling when onError is not implemented ... we just rethrow
6848+
throw e2;
6849+
} catch (Throwable e2) {
6850+
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
6851+
// so we are unable to propagate the error correctly and will just throw
6852+
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
6853+
// TODO could the hook be the cause of the error in the on error handling.
6854+
hook.onSubscribeError(r);
6855+
// TODO why aren't we throwing the hook's return value.
6856+
throw r;
68416857
}
6842-
// handle broken contracts: https://github.com/Netflix/RxJava/issues/1090
6843-
subscriber.onError(e);
6858+
return Subscriptions.empty();
68446859
}
6845-
return subscriber;
68466860
}
68476861

68486862
/**
@@ -6880,30 +6894,31 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
68806894
* if the {@link Subscriber}'s {@code onError} method itself threw a {@code Throwable}
68816895
*/
68826896
public final Subscription subscribe(Subscriber<? super T> subscriber) {
6883-
// allow the hook to intercept and/or decorate
6884-
OnSubscribe<T> onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe);
68856897
// validate and proceed
68866898
if (subscriber == null) {
68876899
throw new IllegalArgumentException("observer can not be null");
68886900
}
6889-
if (onSubscribeFunction == null) {
6901+
if (onSubscribe == null) {
68906902
throw new IllegalStateException("onSubscribe function can not be null.");
68916903
/*
68926904
* the subscribe function can also be overridden but generally that's not the appropriate approach
68936905
* so I won't mention that in the exception
68946906
*/
68956907
}
6908+
/*
6909+
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
6910+
* to user code from within an Observer"
6911+
*/
6912+
// if not already wrapped
6913+
if (!(subscriber instanceof SafeSubscriber)) {
6914+
// assign to `observer` so we return the protected version
6915+
subscriber = new SafeSubscriber<T>(subscriber);
6916+
}
6917+
6918+
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
68966919
try {
6897-
/*
6898-
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
6899-
* to user code from within an Observer"
6900-
*/
6901-
// if not already wrapped
6902-
if (!(subscriber instanceof SafeSubscriber)) {
6903-
// assign to `observer` so we return the protected version
6904-
subscriber = new SafeSubscriber<T>(subscriber);
6905-
}
6906-
onSubscribeFunction.call(subscriber);
6920+
// allow the hook to intercept and/or decorate
6921+
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
69076922
return hook.onSubscribeReturn(subscriber);
69086923
} catch (Throwable e) {
69096924
// special handling for certain Throwable/Error/Exception types

0 commit comments

Comments
 (0)