Skip to content

Commit cd04714

Browse files
committed
Operator OnExceptionResumeNextViaObservable
1 parent 95e0636 commit cd04714

File tree

4 files changed

+94
-131
lines changed

4 files changed

+94
-131
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
import rx.operators.OperationMulticast;
6060
import rx.operators.OperationOnErrorResumeNextViaObservable;
6161
import rx.operators.OperationOnErrorReturn;
62-
import rx.operators.OperationOnExceptionResumeNextViaObservable;
6362
import rx.operators.OperationParallelMerge;
6463
import rx.operators.OperationReplay;
6564
import rx.operators.OperationSample;
@@ -108,6 +107,7 @@
108107
import rx.operators.OperatorObserveOn;
109108
import rx.operators.OperatorOnErrorFlatMap;
110109
import rx.operators.OperatorOnErrorResumeNextViaFunction;
110+
import rx.operators.OperatorOnExceptionResumeNextViaObservable;
111111
import rx.operators.OperatorParallel;
112112
import rx.operators.OperatorPivot;
113113
import rx.operators.OperatorRepeat;
@@ -4592,7 +4592,7 @@ public final Observable<T> onErrorFlatMap(final Func1<OnErrorThrowable, ? extend
45924592
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-onexceptionresumenext">RxJava Wiki: onExceptionResumeNext()</a>
45934593
*/
45944594
public final Observable<T> onExceptionResumeNext(final Observable<? extends T> resumeSequence) {
4595-
return create(OperationOnExceptionResumeNextViaObservable.onExceptionResumeNextViaObservable(this, resumeSequence));
4595+
return lift(new OperatorOnExceptionResumeNextViaObservable<T>(resumeSequence));
45964596
}
45974597

45984598
/**

rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java

Lines changed: 0 additions & 122 deletions
This file was deleted.
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.Operator;
20+
import rx.Subscriber;
21+
22+
/**
23+
* Instruct an Observable to pass control to another Observable rather than invoking
24+
* <code>onError</code> if it encounters an error of type {@link java.lang.Exception}.
25+
* <p>
26+
* This differs from {@link Observable#onErrorResumeNext} in that this one does not handle
27+
* {@link java.lang.Throwable} or {@link java.lang.Error} but lets those continue through.
28+
* <p>
29+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/onErrorResumeNext.png">
30+
* <p>
31+
* By default, when an Observable encounters an error that prevents it from emitting the expected
32+
* item to its Observer, the Observable invokes its Observer's <code>onError</code> method, and
33+
* then quits without invoking any more of its Observer's methods. The onErrorResumeNext operation
34+
* changes this behavior. If you pass an Observable (resumeSequence) to onErrorResumeNext, if the
35+
* source Observable encounters an error, instead of invoking its Observer's <code>onError</code>
36+
* method, it will instead relinquish control to this new Observable, which will invoke the
37+
* Observer's <code>onNext</code> method if it is able to do so. In such a case, because no
38+
* Observable necessarily invokes <code>onError</code>, the Observer may never know that an error
39+
* happened.
40+
* <p>
41+
* You can use this to prevent errors from propagating or to supply fallback data should errors be
42+
* encountered.
43+
*
44+
* @param <T> the value type
45+
*/
46+
public final class OperatorOnExceptionResumeNextViaObservable<T> implements Operator<T, T> {
47+
final Observable<? extends T> resumeSequence;
48+
49+
public OperatorOnExceptionResumeNextViaObservable(Observable<? extends T> resumeSequence) {
50+
this.resumeSequence = resumeSequence;
51+
}
52+
53+
@Override
54+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
55+
// needs to independently unsubscribe so child can continue with the resume
56+
Subscriber<T> s = new Subscriber<T>() {
57+
58+
@Override
59+
public void onNext(T t) {
60+
child.onNext(t);
61+
}
62+
63+
@Override
64+
public void onError(Throwable e) {
65+
if (e instanceof Exception) {
66+
unsubscribe();
67+
resumeSequence.unsafeSubscribe(child);
68+
} else {
69+
child.onError(e);
70+
}
71+
}
72+
73+
@Override
74+
public void onCompleted() {
75+
child.onCompleted();
76+
}
77+
78+
};
79+
child.add(s);
80+
81+
return s;
82+
}
83+
84+
85+
}

rxjava-core/src/test/java/rx/operators/OperationOnExceptionResumeNextViaObservableTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorOnExceptionResumeNextViaObservableTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static org.mockito.Mockito.times;
2323
import static org.mockito.Mockito.verify;
2424
import static org.mockito.Mockito.verifyNoMoreInteractions;
25-
import static rx.operators.OperationOnExceptionResumeNextViaObservable.onExceptionResumeNextViaObservable;
2625

2726
import org.junit.Test;
2827
import org.mockito.Mockito;
@@ -32,7 +31,7 @@
3231
import rx.Subscription;
3332
import rx.functions.Func1;
3433

35-
public class OperationOnExceptionResumeNextViaObservableTest {
34+
public class OperatorOnExceptionResumeNextViaObservableTest {
3635

3736
@Test
3837
public void testResumeNextWithException() {
@@ -41,7 +40,7 @@ public void testResumeNextWithException() {
4140
TestObservable f = new TestObservable(s, "one", "EXCEPTION", "two", "three");
4241
Observable<String> w = Observable.create(f);
4342
Observable<String> resume = Observable.from("twoResume", "threeResume");
44-
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
43+
Observable<String> observable = w.onExceptionResumeNext(resume);
4544

4645
@SuppressWarnings("unchecked")
4746
Observer<String> observer = mock(Observer.class);
@@ -70,7 +69,7 @@ public void testResumeNextWithRuntimeException() {
7069
TestObservable f = new TestObservable(s, "one", "RUNTIMEEXCEPTION", "two", "three");
7170
Observable<String> w = Observable.create(f);
7271
Observable<String> resume = Observable.from("twoResume", "threeResume");
73-
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
72+
Observable<String> observable = w.onExceptionResumeNext(resume);
7473

7574
@SuppressWarnings("unchecked")
7675
Observer<String> observer = mock(Observer.class);
@@ -99,7 +98,7 @@ public void testThrowablePassesThru() {
9998
TestObservable f = new TestObservable(s, "one", "THROWABLE", "two", "three");
10099
Observable<String> w = Observable.create(f);
101100
Observable<String> resume = Observable.from("twoResume", "threeResume");
102-
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
101+
Observable<String> observable = w.onExceptionResumeNext(resume);
103102

104103
@SuppressWarnings("unchecked")
105104
Observer<String> observer = mock(Observer.class);
@@ -128,7 +127,7 @@ public void testErrorPassesThru() {
128127
TestObservable f = new TestObservable(s, "one", "ERROR", "two", "three");
129128
Observable<String> w = Observable.create(f);
130129
Observable<String> resume = Observable.from("twoResume", "threeResume");
131-
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
130+
Observable<String> observable = w.onExceptionResumeNext(resume);
132131

133132
@SuppressWarnings("unchecked")
134133
Observer<String> observer = mock(Observer.class);
@@ -162,6 +161,7 @@ public void testMapResumeAsyncNext() {
162161
// Introduce map function that fails intermittently (Map does not prevent this when the observer is a
163162
// rx.operator incl onErrorResumeNextViaObservable)
164163
w = w.map(new Func1<String, String>() {
164+
@Override
165165
public String call(String s) {
166166
if ("fail".equals(s))
167167
throw new RuntimeException("Forced Failure");
@@ -170,7 +170,7 @@ public String call(String s) {
170170
}
171171
});
172172

173-
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
173+
Observable<String> observable = w.onExceptionResumeNext(resume);
174174

175175
@SuppressWarnings("unchecked")
176176
Observer<String> observer = mock(Observer.class);

0 commit comments

Comments
 (0)