Skip to content

Commit f1b4de7

Browse files
Merge pull request #670 from benjchristensen/api-tweaks
API Design Tweaks
2 parents ee0e470 + 985005e commit f1b4de7

File tree

7 files changed

+118
-97
lines changed

7 files changed

+118
-97
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 106 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,7 +1054,7 @@ public static <T> Observable<T> from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
10541054
public static <T> Observable<T> from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10) {
10551055
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10));
10561056
}
1057-
1057+
10581058
/**
10591059
* Generates an Observable that emits a sequence of Integers within a
10601060
* specified range.
@@ -1930,6 +1930,23 @@ public static <T> Observable<T> switchDo(Observable<? extends Observable<? exten
19301930
public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? extends T>> sequenceOfSequences) {
19311931
return create(OperationSwitch.switchDo(sequenceOfSequences));
19321932
}
1933+
1934+
/**
1935+
* Given an Observable that emits Observables, returns an Observable that
1936+
* emits the items emitted by the most recently emitted of those
1937+
* Observables.
1938+
* <p>
1939+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/switchDo.png">
1940+
*
1941+
* @param sequenceOfSequences the source Observable that emits Observables
1942+
* @return an Observable that emits only the items emitted by the Observable
1943+
* most recently emitted by the source Observable
1944+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#switchonnext">RxJava Wiki: switchOnNext()</a>
1945+
* @see {@link #switchOnNext(Observable)}
1946+
*/
1947+
public static <T> Observable<T> switchLatest(Observable<? extends Observable<? extends T>> sequenceOfSequences) {
1948+
return create(OperationSwitch.switchDo(sequenceOfSequences));
1949+
}
19331950

19341951
/**
19351952
* Return a particular one of several possible Observables based on a case
@@ -3681,7 +3698,7 @@ public Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit
36813698
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#zip">RxJava Wiki: zip()</a>
36823699
*/
36833700
public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
3684-
return ws.toList().mapMany(new Func1<List<? extends Observable<?>>, Observable<? extends R>>() {
3701+
return ws.toList().mergeMap(new Func1<List<? extends Observable<?>>, Observable<? extends R>>() {
36853702
@Override
36863703
public Observable<R> call(List<? extends Observable<?>> wsList) {
36873704
return create(OperationZip.zip(wsList, zipFunction));
@@ -3919,7 +3936,64 @@ public Observable<T> finallyDo(Action0 action) {
39193936
* @see #mapMany(Func1)
39203937
*/
39213938
public <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
3922-
return mapMany(func);
3939+
return mergeMap(func);
3940+
}
3941+
3942+
/**
3943+
* Creates a new Observable by applying a function that you supply to each
3944+
* item emitted by the source Observable, where that function returns an
3945+
* Observable, and then merging those resulting Observables and emitting the
3946+
* results of this merger.
3947+
* <p>
3948+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/flatMap.png">
3949+
* <p>
3950+
* Note: {@code mapMany} and {@code flatMap} are equivalent.
3951+
*
3952+
* @param func a function that, when applied to an item emitted by the
3953+
* source Observable, returns an Observable
3954+
* @return an Observable that emits the result of applying the
3955+
* transformation function to each item emitted by the source
3956+
* Observable and merging the results of the Observables obtained
3957+
* from this transformation.
3958+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mapmany-or-flatmap-and-mapmanydelayerror">RxJava Wiki: flatMap()</a>
3959+
* @see #flatMap(Func1)
3960+
*/
3961+
public <R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extends R>> func) {
3962+
return merge(map(func));
3963+
}
3964+
3965+
/**
3966+
* Creates a new Observable by applying a function that you supply to each
3967+
* item emitted by the source Observable, where that function returns an
3968+
* Observable, and then concatting those resulting Observables and emitting the
3969+
* results of this concat.
3970+
* <p>
3971+
*
3972+
* @param func a function that, when applied to an item emitted by the
3973+
* source Observable, returns an Observable
3974+
* @return an Observable that emits the result of applying the
3975+
* transformation function to each item emitted by the source
3976+
* Observable and concatting the results of the Observables obtained
3977+
* from this transformation.
3978+
*/
3979+
public <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
3980+
return concat(map(func));
3981+
}
3982+
3983+
/**
3984+
* Creates a new Observable by applying a function that you supply to each
3985+
* item emitted by the source Observable resulting in an Observable of Observables.
3986+
* <p>
3987+
* Then a {@link #switchLatest(Observable)} / {@link #switchOnNext(Observable)} is applied.
3988+
*
3989+
* @param func a function that, when applied to an item emitted by the
3990+
* source Observable, returns an Observable
3991+
* @return an Observable that emits the result of applying the
3992+
* transformation function to each item emitted by the source
3993+
* Observable and then switch
3994+
*/
3995+
public <R> Observable<R> switchMap(Func1<? super T, ? extends Observable<? extends R>> func) {
3996+
return switchOnNext(map(func));
39233997
}
39243998

39253999
/**
@@ -3970,6 +4044,7 @@ public <R> Observable<R> map(Func1<? super T, ? extends R> func) {
39704044
* transformed by the given function
39714045
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mapwithindex">RxJava Wiki: mapWithIndex()</a>
39724046
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244311.aspx">MSDN: Observable.Select</a>
4047+
* @deprecate just use zip with {@link Observable#range(int)}
39734048
*/
39744049
public <R> Observable<R> mapWithIndex(Func2<? super T, Integer, ? extends R> func) {
39754050
return create(OperationMap.mapWithIndex(this, func));
@@ -3993,9 +4068,10 @@ public <R> Observable<R> mapWithIndex(Func2<? super T, Integer, ? extends R> fun
39934068
* from this transformation.
39944069
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mapmany-or-flatmap-and-mapmanydelayerror">RxJava Wiki: mapMany()</a>
39954070
* @see #flatMap(Func1)
4071+
* @deprecated
39964072
*/
39974073
public <R> Observable<R> mapMany(Func1<? super T, ? extends Observable<? extends R>> func) {
3998-
return create(OperationMap.mapMany(this, func));
4074+
return mergeMap(func);
39994075
}
40004076

40014077
/**
@@ -5108,6 +5184,7 @@ public ConnectableObservable<T> publishLast() {
51085184
*
51095185
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#reduce-or-aggregate">RxJava Wiki: aggregate()</a>
51105186
* @see #reduce(Func2)
5187+
* @deprecated
51115188
*/
51125189
public Observable<T> aggregate(Func2<T, T, T> accumulator) {
51135190
return reduce(accumulator);
@@ -5150,6 +5227,7 @@ public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulat
51505227
*
51515228
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#reduce-or-aggregate">RxJava Wiki: aggregate()</a>
51525229
* @see #reduce(Object, Func2)
5230+
* @deprecated
51535231
*/
51545232
public <R> Observable<R> aggregate(R initialValue, Func2<R, ? super T, R> accumulator) {
51555233
return reduce(initialValue, accumulator);
@@ -6641,36 +6719,6 @@ public Observable<T> doOnEach(Observer<? super T> observer) {
66416719
return create(OperationDoOnEach.doOnEach(this, observer));
66426720
}
66436721

6644-
/**
6645-
* Invokes an action for each item emitted by an Observable.
6646-
* <p>
6647-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/doOnEach.png">
6648-
*
6649-
* @param onNext the action to invoke for each item emitted by the source
6650-
* Observable
6651-
* @return the source Observable with the side-effecting behavior applied
6652-
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#dooneach">RxJava Wiki: doOnEach()</a>
6653-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229804.aspx">MSDN: Observable.Do</a>
6654-
*/
6655-
public Observable<T> doOnEach(final Action1<? super T> onNext) {
6656-
Observer<T> observer = new Observer<T>() {
6657-
@Override
6658-
public void onCompleted() {}
6659-
6660-
@Override
6661-
public void onError(Throwable e) {}
6662-
6663-
@Override
6664-
public void onNext(T args) {
6665-
onNext.call(args);
6666-
}
6667-
6668-
};
6669-
6670-
6671-
return create(OperationDoOnEach.doOnEach(this, observer));
6672-
}
6673-
66746722
/**
66756723
* Invokes an action if the source Observable calls <code>onError</code>.
66766724
* <p>
@@ -6731,71 +6779,64 @@ public void onNext(T args) { }
67316779

67326780
return create(OperationDoOnEach.doOnEach(this, observer));
67336781
}
6734-
6782+
6783+
67356784
/**
6736-
* Invokes an action for each item emitted by an Observable.
6785+
* Invokes an action when the source Observable calls
6786+
* <code>onNext</code>.
67376787
* <p>
6738-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/doOnEach.e.png">
6788+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/doOnCompleted.png">
67396789
*
6740-
* @param onNext the action to invoke for each item emitted by the
6741-
* Observable
6742-
* @param onError the action to invoke when the source Observable calls
6743-
* <code>onError</code>
6790+
* @param onCompleted the action to invoke when the source Observable calls
6791+
* <code>onCompleted</code>
67446792
* @return the source Observable with the side-effecting behavior applied
6745-
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#dooneach">RxJava Wiki: doOnEach()</a>
6746-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229539.aspx">MSDN: Observable.Do</a>
6793+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#dooneach">RxJava Wiki: doOnNext()</a>
6794+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229804.aspx">MSDN: Observable.Do</a>
67476795
*/
6748-
public Observable<T> doOnEach(final Action1<? super T> onNext, final Action1<Throwable> onError) {
6796+
public Observable<T> doOnNext(final Action1<T> onNext) {
67496797
Observer<T> observer = new Observer<T>() {
67506798
@Override
6751-
public void onCompleted() {}
6799+
public void onCompleted() { }
67526800

67536801
@Override
6754-
public void onError(Throwable e) {
6755-
onError.call(e);
6756-
}
6802+
public void onError(Throwable e) { }
67576803

67586804
@Override
6759-
public void onNext(T args) {
6805+
public void onNext(T args) {
67606806
onNext.call(args);
67616807
}
67626808

67636809
};
67646810

6765-
67666811
return create(OperationDoOnEach.doOnEach(this, observer));
67676812
}
6768-
6813+
67696814
/**
6770-
* Invokes an action for each item emitted by an Observable.
6815+
* Invokes an action for each item emitted by the Observable.
67716816
* <p>
6772-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/doOnEach.ce.png">
6817+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/doOnEach.png">
67736818
*
6774-
* @param onNext the action to invoke for each item emitted by the
6775-
* Observable
6776-
* @param onError the action to invoke when the source Observable calls
6777-
* <code>onError</code>
6778-
* @param onCompleted the action to invoke when the source Observable calls
6779-
* <code>onCompleted</code>
6819+
* @param observer the action to invoke for each item emitted by the source
6820+
* Observable
67806821
* @return the source Observable with the side-effecting behavior applied
67816822
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#dooneach">RxJava Wiki: doOnEach()</a>
6782-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229830.aspx">MSDN: Observable.Do</a>
6823+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229307.aspx">MSDN: Observable.Do</a>
67836824
*/
6784-
public Observable<T> doOnEach(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted) {
6825+
public Observable<T> doOnEach(final Action1<Notification<T>> onNotification) {
67856826
Observer<T> observer = new Observer<T>() {
67866827
@Override
67876828
public void onCompleted() {
6788-
onCompleted.call();
6829+
onNotification.call(new Notification<T>());
67896830
}
67906831

67916832
@Override
67926833
public void onError(Throwable e) {
6793-
onError.call(e);
6834+
onNotification.call(new Notification<T>(e));
67946835
}
67956836

67966837
@Override
6797-
public void onNext(T args) {
6798-
onNext.call(args);
6838+
public void onNext(T v) {
6839+
onNotification.call(new Notification<T>(v));
67996840
}
68006841

68016842
};

rxjava-core/src/main/java/rx/operators/OperationMap.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -77,26 +77,6 @@ public Subscription onSubscribe(Observer<? super R> observer) {
7777
};
7878
}
7979

80-
/**
81-
* Accepts a sequence of observable sequences and a transformation function. Returns a flattened sequence that is the result of
82-
* applying the transformation function to each item in the sequence of each observable sequence.
83-
* <p>
84-
* The closure should return an Observable which will then be merged.
85-
*
86-
* @param sequence
87-
* the input sequence.
88-
* @param func
89-
* a function to apply to each item in the sequence.
90-
* @param <T>
91-
* the type of the input sequence.
92-
* @param <R>
93-
* the type of the output sequence.
94-
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
95-
*/
96-
public static <T, R> OnSubscribeFunc<R> mapMany(Observable<? extends T> sequence, Func1<? super T, ? extends Observable<? extends R>> func) {
97-
return OperationMerge.merge(Observable.create(map(sequence, func)));
98-
}
99-
10080
/**
10181
* An observable sequence that is the result of applying a transformation to each item in an input sequence.
10282
*

rxjava-core/src/main/java/rx/operators/OperationTakeLast.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import rx.Observer;
2626
import rx.Scheduler;
2727
import rx.Subscription;
28-
import rx.subscriptions.SingleAssignmentSubscription;
28+
import rx.subscriptions.BooleanSubscription;
2929
import rx.util.Timestamped;
3030

3131
/**
@@ -156,9 +156,9 @@ public TakeLastTimed(Observable<? extends T> source, int count, long time, TimeU
156156

157157
@Override
158158
public Subscription onSubscribe(Observer<? super T> t1) {
159-
SingleAssignmentSubscription sas = new SingleAssignmentSubscription();
160-
sas.set(source.subscribe(new TakeLastTimedObserver<T>(t1, sas, count, ageMillis, scheduler)));
161-
return sas;
159+
SafeObservableSubscription s = new SafeObservableSubscription();
160+
source.subscribe(new TakeLastTimedObserver<T>(t1, s, count, ageMillis, scheduler));
161+
return s;
162162
}
163163
}
164164
/** Observes source values and keeps the most recent items. */

rxjava-core/src/test/java/rx/ObservableDoOnTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class ObservableDoOnTest {
3030
@Test
3131
public void testDoOnEach() {
3232
final AtomicReference<String> r = new AtomicReference<String>();
33-
String output = Observable.from("one").doOnEach(new Action1<String>() {
33+
String output = Observable.from("one").doOnNext(new Action1<String>() {
3434

3535
@Override
3636
public void call(String v) {

rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public String call(String s) {
108108
@Test
109109
public void testDoOnEachWithErrorInCallback() {
110110
Observable<String> base = Observable.from("one", "two", "fail", "three");
111-
Observable<String> doOnEach = base.doOnEach(new Action1<String>() {
111+
Observable<String> doOnEach = base.doOnNext(new Action1<String>() {
112112
@Override
113113
public void call(String s) {
114114
if ("fail".equals(s)) {

rxjava-core/src/test/java/rx/operators/OperationMapTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public void testMapMany() {
118118
Observable<Integer> ids = Observable.from(1, 2);
119119

120120
/* now simulate the behavior to take those IDs and perform nested async calls based on them */
121-
Observable<String> m = Observable.create(mapMany(ids, new Func1<Integer, Observable<String>>() {
121+
Observable<String> m = ids.flatMap(new Func1<Integer, Observable<String>>() {
122122

123123
@Override
124124
public Observable<String> call(Integer id) {
@@ -143,7 +143,7 @@ public String call(Map<String, String> map) {
143143
}));
144144
}
145145

146-
}));
146+
});
147147
m.subscribe(stringObserver);
148148

149149
verify(stringObserver, never()).onError(any(Throwable.class));
@@ -166,7 +166,7 @@ public void testMapMany2() {
166166

167167
Observable<Observable<Map<String, String>>> observable = Observable.from(observable1, observable2);
168168

169-
Observable<String> m = Observable.create(mapMany(observable, new Func1<Observable<Map<String, String>>, Observable<String>>() {
169+
Observable<String> m = observable.flatMap(new Func1<Observable<Map<String, String>>, Observable<String>>() {
170170

171171
@Override
172172
public Observable<String> call(Observable<Map<String, String>> o) {
@@ -179,7 +179,7 @@ public String call(Map<String, String> map) {
179179
}));
180180
}
181181

182-
}));
182+
});
183183
m.subscribe(stringObserver);
184184

185185
verify(stringObserver, never()).onError(any(Throwable.class));

rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void testThreadName() throws InterruptedException {
9898
final CountDownLatch completedLatch = new CountDownLatch(1);
9999

100100
// assert subscribe is on main thread
101-
obs = obs.doOnEach(new Action1<String>() {
101+
obs = obs.doOnNext(new Action1<String>() {
102102

103103
@Override
104104
public void call(String s) {
@@ -110,7 +110,7 @@ public void call(String s) {
110110
});
111111

112112
// assert observe is on new thread
113-
obs.observeOn(Schedulers.newThread()).doOnEach(new Action1<String>() {
113+
obs.observeOn(Schedulers.newThread()).doOnNext(new Action1<String>() {
114114

115115
@Override
116116
public void call(String t1) {

0 commit comments

Comments
 (0)