Skip to content

Commit c7ff93b

Browse files
Merge branch 'OperatorOnErrorReturn' of github.com:akarnokd/RxJava into merge-prs
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents ff14fcc + 7484fc9 commit c7ff93b

File tree

4 files changed

+91
-137
lines changed

4 files changed

+91
-137
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import rx.operators.OnSubscribeFromIterable;
5050
import rx.operators.OnSubscribeRange;
5151
import rx.operators.OperationDelay;
52-
import rx.operators.OperationOnErrorReturn;
5352
import rx.operators.OperationOnExceptionResumeNextViaObservable;
5453
import rx.operators.OperationParallelMerge;
5554
import rx.operators.OperationSequenceEqual;
@@ -103,6 +102,7 @@
103102
import rx.operators.OperatorOnErrorFlatMap;
104103
import rx.operators.OperatorOnErrorResumeNextViaFunction;
105104
import rx.operators.OperatorOnErrorResumeNextViaObservable;
105+
import rx.operators.OperatorOnErrorReturn;
106106
import rx.operators.OperatorParallel;
107107
import rx.operators.OperatorPivot;
108108
import rx.operators.OperatorRepeat;
@@ -4539,7 +4539,7 @@ public final Observable<T> onErrorResumeNext(final Observable<? extends T> resum
45394539
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-onerrorreturn">RxJava Wiki: onErrorReturn()</a>
45404540
*/
45414541
public final Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunction) {
4542-
return create(OperationOnErrorReturn.onErrorReturn(this, resumeFunction));
4542+
return lift(new OperatorOnErrorReturn<T>(resumeFunction));
45434543
}
45444544

45454545
/**

rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java

Lines changed: 0 additions & 124 deletions
This file was deleted.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.Arrays;
19+
import rx.Observable.Operator;
20+
import rx.Subscriber;
21+
import rx.exceptions.CompositeException;
22+
import rx.functions.Func1;
23+
24+
/**
25+
* Instruct an Observable to emit a particular item to its Observer's <code>onNext</code> method
26+
* rather than invoking <code>onError</code> if it encounters an error.
27+
* <p>
28+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/onErrorReturn.png">
29+
* <p>
30+
* By default, when an Observable encounters an error that prevents it from emitting the expected
31+
* item to its Observer, the Observable invokes its Observer's <code>onError</code> method, and then
32+
* quits without invoking any more of its Observer's methods. The onErrorReturn operation changes
33+
* this behavior. If you pass a function (resumeFunction) to onErrorReturn, if the original
34+
* Observable encounters an error, instead of invoking its Observer's <code>onError</code> method,
35+
* it will instead pass the return value of resumeFunction to the Observer's <code>onNext</code>
36+
* method.
37+
* <p>
38+
* You can use this to prevent errors from propagating or to supply fallback data should errors be
39+
* encountered.
40+
*
41+
* @param <T> the value type
42+
*/
43+
public final class OperatorOnErrorReturn<T> implements Operator<T, T> {
44+
final Func1<Throwable, ? extends T> resultFunction;
45+
46+
public OperatorOnErrorReturn(Func1<Throwable, ? extends T> resultFunction) {
47+
this.resultFunction = resultFunction;
48+
}
49+
50+
@Override
51+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
52+
return new Subscriber<T>(child) {
53+
54+
@Override
55+
public void onNext(T t) {
56+
child.onNext(t);
57+
}
58+
59+
@Override
60+
public void onError(Throwable e) {
61+
try {
62+
T result = resultFunction.call(e);
63+
64+
child.onNext(result);
65+
} catch (Throwable x) {
66+
child.onError(new CompositeException(Arrays.asList(e, x)));
67+
return;
68+
}
69+
child.onCompleted();
70+
}
71+
72+
@Override
73+
public void onCompleted() {
74+
child.onCompleted();
75+
}
76+
77+
};
78+
}
79+
}

rxjava-core/src/test/java/rx/operators/OperationOnErrorReturnTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorOnErrorReturnTest.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import static org.mockito.Mockito.mock;
2222
import static org.mockito.Mockito.times;
2323
import static org.mockito.Mockito.verify;
24-
import static rx.operators.OperationOnErrorReturn.onErrorReturn;
2524

2625
import java.util.concurrent.atomic.AtomicReference;
2726

@@ -30,10 +29,11 @@
3029

3130
import rx.Observable;
3231
import rx.Observer;
32+
import rx.Subscriber;
3333
import rx.Subscription;
3434
import rx.functions.Func1;
3535

36-
public class OperationOnErrorReturnTest {
36+
public class OperatorOnErrorReturnTest {
3737

3838
@Test
3939
public void testResumeNext() {
@@ -42,15 +42,15 @@ public void testResumeNext() {
4242
Observable<String> w = Observable.create(f);
4343
final AtomicReference<Throwable> capturedException = new AtomicReference<Throwable>();
4444

45-
Observable<String> observable = Observable.create(onErrorReturn(w, new Func1<Throwable, String>() {
45+
Observable<String> observable = w.onErrorReturn(new Func1<Throwable, String>() {
4646

4747
@Override
4848
public String call(Throwable e) {
4949
capturedException.set(e);
5050
return "failure";
5151
}
5252

53-
}));
53+
});
5454

5555
@SuppressWarnings("unchecked")
5656
Observer<String> observer = mock(Observer.class);
@@ -79,15 +79,15 @@ public void testFunctionThrowsError() {
7979
Observable<String> w = Observable.create(f);
8080
final AtomicReference<Throwable> capturedException = new AtomicReference<Throwable>();
8181

82-
Observable<String> observable = Observable.create(onErrorReturn(w, new Func1<Throwable, String>() {
82+
Observable<String> observable = w.onErrorReturn(new Func1<Throwable, String>() {
8383

8484
@Override
8585
public String call(Throwable e) {
8686
capturedException.set(e);
8787
throw new RuntimeException("exception from function");
8888
}
8989

90-
}));
90+
});
9191

9292
@SuppressWarnings("unchecked")
9393
Observer<String> observer = mock(Observer.class);
@@ -108,7 +108,7 @@ public String call(Throwable e) {
108108
assertNotNull(capturedException.get());
109109
}
110110

111-
private static class TestObservable implements Observable.OnSubscribeFunc<String> {
111+
private static class TestObservable implements Observable.OnSubscribe<String> {
112112

113113
final Subscription s;
114114
final String[] values;
@@ -120,7 +120,7 @@ public TestObservable(Subscription s, String... values) {
120120
}
121121

122122
@Override
123-
public Subscription onSubscribe(final Observer<? super String> observer) {
123+
public void call(final Subscriber<? super String> subscriber) {
124124
System.out.println("TestObservable subscribed to ...");
125125
t = new Thread(new Runnable() {
126126

@@ -130,19 +130,18 @@ public void run() {
130130
System.out.println("running TestObservable thread");
131131
for (String s : values) {
132132
System.out.println("TestObservable onNext: " + s);
133-
observer.onNext(s);
133+
subscriber.onNext(s);
134134
}
135135
throw new RuntimeException("Forced Failure");
136136
} catch (Throwable e) {
137-
observer.onError(e);
137+
subscriber.onError(e);
138138
}
139139
}
140140

141141
});
142142
System.out.println("starting TestObservable thread");
143143
t.start();
144144
System.out.println("done starting TestObservable thread");
145-
return s;
146145
}
147146
}
148147
}

0 commit comments

Comments
 (0)