Skip to content

Commit c9585d1

Browse files
Merge pull request #1558 from benjchristensen/issue-1550
mergeMap generics
2 parents 6a402da + d8edc16 commit c9585d1

File tree

3 files changed

+83
-152
lines changed

3 files changed

+83
-152
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5182,6 +5182,10 @@ public final Long call(Long t1, T t2) {
51825182
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
51835183
return lift(new OperatorMap<T, R>(func));
51845184
}
5185+
5186+
private final <R> Observable<R> mapNotification(Func1<? super T, ? extends R> onNext, Func1<? super Throwable, ? extends R> onError, Func0<? extends R> onCompleted) {
5187+
return lift(new OperatorMapNotification<T, R>(onNext, onError, onCompleted));
5188+
}
51855189

51865190
/**
51875191
* Returns an Observable that represents all of the emissions <em>and</em> notifications from the source
@@ -5254,7 +5258,7 @@ public final <R> Observable<R> mergeMap(
52545258
Func1<? super T, ? extends Observable<? extends R>> onNext,
52555259
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
52565260
Func0<? extends Observable<? extends R>> onCompleted) {
5257-
return lift(new OperatorMergeMapTransform<T, R>(onNext, onError, onCompleted));
5261+
return merge(mapNotification(onNext, onError, onCompleted));
52585262
}
52595263

52605264
/**
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
20+
import rx.exceptions.OnErrorThrowable;
21+
import rx.functions.Func0;
22+
import rx.functions.Func1;
23+
24+
/**
25+
* Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of
26+
* this transformation as a new {@code Observable}.
27+
* <p>
28+
* <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/Netflix/RxJava/images/rx-operators/map.png" alt="">
29+
*/
30+
public final class OperatorMapNotification<T, R> implements Operator<R, T> {
31+
32+
private final Func1<? super T, ? extends R> onNext;
33+
private final Func1<? super Throwable, ? extends R> onError;
34+
private final Func0<? extends R> onCompleted;
35+
36+
public OperatorMapNotification(Func1<? super T, ? extends R> onNext, Func1<? super Throwable, ? extends R> onError, Func0<? extends R> onCompleted) {
37+
this.onNext = onNext;
38+
this.onError = onError;
39+
this.onCompleted = onCompleted;
40+
}
41+
42+
@Override
43+
public Subscriber<? super T> call(final Subscriber<? super R> o) {
44+
return new Subscriber<T>(o) {
45+
46+
@Override
47+
public void onCompleted() {
48+
try {
49+
o.onNext(onCompleted.call());
50+
o.onCompleted();
51+
} catch (Throwable e) {
52+
o.onError(e);
53+
}
54+
}
55+
56+
@Override
57+
public void onError(Throwable e) {
58+
try {
59+
o.onNext(onError.call(e));
60+
o.onCompleted();
61+
} catch (Throwable e2) {
62+
o.onError(e);
63+
}
64+
}
65+
66+
@Override
67+
public void onNext(T t) {
68+
try {
69+
o.onNext(onNext.call(t));
70+
} catch (Throwable e) {
71+
o.onError(OnErrorThrowable.addValueAsLastCause(e, t));
72+
}
73+
}
74+
75+
};
76+
}
77+
78+
}

rxjava-core/src/main/java/rx/internal/operators/OperatorMergeMapTransform.java

Lines changed: 0 additions & 151 deletions
This file was deleted.

0 commit comments

Comments
 (0)