Skip to content

Commit ffbf2ff

Browse files
onErrorFlatMap + OnErrorThrowable
1 parent 37523c0 commit ffbf2ff

13 files changed

+292
-99
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.TimeUnit;
2626

2727
import rx.exceptions.Exceptions;
28+
import rx.exceptions.OnErrorThrowable;
2829
import rx.exceptions.OnErrorNotImplementedException;
2930
import rx.functions.Action0;
3031
import rx.functions.Action1;
@@ -79,16 +80,13 @@
7980
import rx.operators.OperationMergeMaxConcurrent;
8081
import rx.operators.OperationMinMax;
8182
import rx.operators.OperationMulticast;
82-
import rx.operators.OperatorOnErrorResumeNextViaFunction;
8383
import rx.operators.OperationOnErrorResumeNextViaObservable;
8484
import rx.operators.OperationOnErrorReturn;
8585
import rx.operators.OperationOnExceptionResumeNextViaObservable;
8686
import rx.operators.OperationParallelMerge;
8787
import rx.operators.OperationReplay;
8888
import rx.operators.OperationRetry;
8989
import rx.operators.OperationSample;
90-
import rx.operators.OperatorObserveOnBounded;
91-
import rx.operators.OperatorScan;
9290
import rx.operators.OperationSequenceEqual;
9391
import rx.operators.OperationSingle;
9492
import rx.operators.OperationSkip;
@@ -117,8 +115,11 @@
117115
import rx.operators.OperatorMap;
118116
import rx.operators.OperatorMerge;
119117
import rx.operators.OperatorObserveOn;
118+
import rx.operators.OperatorOnErrorResumeNextViaFunction;
119+
import rx.operators.OperatorOnErrorFlatMap;
120120
import rx.operators.OperatorParallel;
121121
import rx.operators.OperatorRepeat;
122+
import rx.operators.OperatorScan;
122123
import rx.operators.OperatorSubscribeOn;
123124
import rx.operators.OperatorTake;
124125
import rx.operators.OperatorTimeout;
@@ -5209,7 +5210,7 @@ public final Boolean call(T t) {
52095210
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-onerrorresumenext">RxJava Wiki: onErrorResumeNext()</a>
52105211
*/
52115212
public final Observable<T> onErrorResumeNext(final Func1<Throwable, ? extends Observable<? extends T>> resumeFunction) {
5212-
return create(OperatorOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(this, resumeFunction));
5213+
return lift(new OperatorOnErrorResumeNextViaFunction<T>(resumeFunction));
52135214
}
52145215

52155216
/**
@@ -5267,6 +5268,15 @@ public final Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFun
52675268
return create(OperationOnErrorReturn.onErrorReturn(this, resumeFunction));
52685269
}
52695270

5271+
/**
5272+
* Allows inserting onNext events into a stream when onError events are received
5273+
* and continuing the original sequence instead of terminating. Thus it allows a sequence
5274+
* with multiple onError events.
5275+
*/
5276+
public final Observable<T> onErrorFlatMap(final Func1<OnErrorThrowable, ? extends Observable<? extends T>> resumeFunction) {
5277+
return lift(new OperatorOnErrorFlatMap<T>(resumeFunction));
5278+
}
5279+
52705280
/**
52715281
* Instruct an Observable to pass control to another Observable rather than invoking
52725282
* {@link Observer#onError onError} if it encounters an {@link java.lang.Exception}.
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.exceptions;
17+
18+
public class OnErrorThrowable extends RuntimeException {
19+
20+
private static final long serialVersionUID = -569558213262703934L;
21+
22+
private final boolean hasValue;
23+
private final Object value;
24+
25+
public OnErrorThrowable(Throwable exception) {
26+
super(exception);
27+
hasValue = false;
28+
this.value = null;
29+
}
30+
31+
public OnErrorThrowable(Throwable exception, Object value) {
32+
super(exception);
33+
hasValue = true;
34+
this.value = value;
35+
}
36+
37+
public Object getValue() {
38+
return value;
39+
}
40+
41+
public boolean isValueNull() {
42+
return hasValue;
43+
}
44+
45+
public static OnErrorThrowable from(Throwable t) {
46+
if (t instanceof OnErrorThrowable) {
47+
return (OnErrorThrowable) t;
48+
} else {
49+
return new OnErrorThrowable(t);
50+
}
51+
}
52+
}

rxjava-core/src/main/java/rx/operators/OperatorCast.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
20-
20+
import rx.exceptions.OnErrorThrowable;
2121

2222
/**
2323
* Converts the elements of an observable sequence to the specified type.
@@ -46,7 +46,11 @@ public void onError(Throwable e) {
4646

4747
@Override
4848
public void onNext(T t) {
49-
o.onNext(castClass.cast(t));
49+
try {
50+
o.onNext(castClass.cast(t));
51+
} catch (Throwable e) {
52+
onError(new OnErrorThrowable(e, t));
53+
}
5054
}
5155
};
5256
}

rxjava-core/src/main/java/rx/operators/OperatorDoOnEach.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable.Operator;
1919
import rx.Observer;
2020
import rx.Subscriber;
21+
import rx.exceptions.OnErrorThrowable;
2122

2223
/**
2324
* Converts the elements of an observable sequence to the specified type.
@@ -59,7 +60,7 @@ public void onNext(T value) {
5960
try {
6061
doOnEachObserver.onNext(value);
6162
} catch (Throwable e) {
62-
onError(e);
63+
onError(new OnErrorThrowable(e, value));
6364
return;
6465
}
6566
observer.onNext(value);

rxjava-core/src/main/java/rx/operators/OperatorFilter.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
20+
import rx.exceptions.OnErrorThrowable;
2021
import rx.functions.Func1;
21-
import rx.observables.GroupedObservable;
2222

2323
/**
2424
* Filters an Observable by discarding any items it emits that do not meet some test.
@@ -48,13 +48,13 @@ public void onError(Throwable e) {
4848
}
4949

5050
@Override
51-
public void onNext(T value) {
51+
public void onNext(T t) {
5252
try {
53-
if (predicate.call(value)) {
54-
child.onNext(value);
53+
if (predicate.call(t)) {
54+
child.onNext(t);
5555
}
56-
} catch (Throwable ex) {
57-
child.onError(ex);
56+
} catch (Throwable e) {
57+
child.onError(new OnErrorThrowable(e, t));
5858
}
5959
}
6060

rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import rx.Observable.OnSubscribe;
2424
import rx.Observable.Operator;
2525
import rx.Subscriber;
26+
import rx.exceptions.OnErrorThrowable;
2627
import rx.functions.Action0;
2728
import rx.functions.Func1;
2829
import rx.observables.GroupedObservable;
@@ -130,7 +131,7 @@ public void onNext(T t) {
130131
// we have the correct group so send value to it
131132
gps.onNext(t);
132133
} catch (Throwable e) {
133-
onError(e);
134+
onError(new OnErrorThrowable(e, t));
134135
}
135136
}
136137

rxjava-core/src/main/java/rx/operators/OperatorMap.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
20+
import rx.exceptions.OnErrorThrowable;
2021
import rx.functions.Func1;
2122

2223
/**
@@ -52,7 +53,7 @@ public void onNext(T t) {
5253
try {
5354
o.onNext(transformer.call(t));
5455
} catch (Throwable e) {
55-
onError(e);
56+
onError(new OnErrorThrowable(e, t));
5657
}
5758
}
5859

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.Operator;
20+
import rx.Subscriber;
21+
import rx.exceptions.OnErrorThrowable;
22+
import rx.functions.Func1;
23+
24+
/**
25+
* Allows inserting onNext events into a stream when onError events are received
26+
* and continuing the original sequence instead of terminating. Thus it allows a sequence
27+
* with multiple onError events.
28+
*/
29+
public final class OperatorOnErrorFlatMap<T> implements Operator<T, T> {
30+
31+
private final Func1<OnErrorThrowable, ? extends Observable<? extends T>> resumeFunction;
32+
33+
public OperatorOnErrorFlatMap(Func1<OnErrorThrowable, ? extends Observable<? extends T>> f) {
34+
this.resumeFunction = f;
35+
}
36+
37+
@Override
38+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
39+
return new Subscriber<T>(child) {
40+
41+
@Override
42+
public void onCompleted() {
43+
child.onCompleted();
44+
}
45+
46+
@Override
47+
public void onError(Throwable e) {
48+
try {
49+
Observable<? extends T> resume = resumeFunction.call(OnErrorThrowable.from(e));
50+
resume.subscribe(new Subscriber<T>() {
51+
52+
@Override
53+
public void onCompleted() {
54+
// ignore as we will continue the parent Observable
55+
}
56+
57+
@Override
58+
public void onError(Throwable e) {
59+
// if the splice also fails we shut it all down
60+
child.onError(e);
61+
}
62+
63+
@Override
64+
public void onNext(T t) {
65+
child.onNext(t);
66+
}
67+
68+
});
69+
} catch (Throwable e2) {
70+
child.onError(e2);
71+
}
72+
}
73+
74+
@Override
75+
public void onNext(T t) {
76+
child.onNext(t);
77+
}
78+
79+
};
80+
}
81+
82+
}

0 commit comments

Comments
 (0)