Skip to content

Commit 50ba9de

Browse files
Merge pull request #479 from nullstyle/add_doOnEach
Adds doOnEach operator
2 parents 7475a15 + 3687530 commit 50ba9de

File tree

4 files changed

+366
-0
lines changed

4 files changed

+366
-0
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1807,6 +1807,28 @@ class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T])
18071807
def withFilter(p: T => Boolean): WithFilter[T] = {
18081808
new WithFilter[T](p, asJava)
18091809
}
1810+
1811+
1812+
def doOnEach(observer: Observer[T]): Observable[T] = {
1813+
Observable[T](asJava.doOnEach(observer))
1814+
}
1815+
1816+
def doOnEach(onNext: T => Unit): Observable[T] = {
1817+
Observable[T](asJava.doOnEach(onNext))
1818+
}
1819+
1820+
def doOnEach(onNext: T => Unit, onComplete: () => Unit): Observable[T] = {
1821+
Observable[T](asJava.doOnEach(onNext, onComplete))
1822+
}
1823+
1824+
def doOnEach(onNext: T => Unit, onError: Throwable => Unit): Observable[T] = {
1825+
Observable[T](asJava.doOnEach(onNext, onError))
1826+
}
1827+
1828+
def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Observable[T] = {
1829+
Observable[T](asJava.doOnEach(onNext, onError, onComplete))
1830+
}
1831+
18101832

18111833
}
18121834

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import rx.operators.OperationDematerialize;
4545
import rx.operators.OperationDistinct;
4646
import rx.operators.OperationDistinctUntilChanged;
47+
import rx.operators.OperationDoOnEach;
4748
import rx.operators.OperationElementAt;
4849
import rx.operators.OperationFilter;
4950
import rx.operators.OperationFinally;
@@ -4961,6 +4962,154 @@ public static <T> Observable<T> amb(Iterable<? extends Observable<? extends T>>
49614962
return create(OperationAmb.amb(sources));
49624963
}
49634964

4965+
4966+
/**
4967+
* Invokes an action for each element in the observable sequence.
4968+
*
4969+
* @param observer
4970+
* The action to invoke for each element in the source sequence.
4971+
*
4972+
* @return
4973+
* The source sequence with the side-effecting behavior applied.
4974+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229307(v=vs.103).aspx">MSDN: Observable.Do</a>
4975+
*/
4976+
public Observable<T> doOnEach(Observer<? super T> observer) {
4977+
return create(OperationDoOnEach.doOnEach(this, observer));
4978+
}
4979+
4980+
/**
4981+
* Invokes an action for each element in the observable sequence.
4982+
*
4983+
* @param onNext
4984+
* The action to invoke for each element in the source sequence.
4985+
*
4986+
* @return
4987+
* The source sequence with the side-effecting behavior applied.
4988+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229804(v=vs.103).aspx">MSDN: Observable.Do</a>
4989+
*/
4990+
public Observable<T> doOnEach(final Action1<T> onNext) {
4991+
Observer<T> observer = new Observer<T>() {
4992+
@Override
4993+
public void onCompleted() {}
4994+
4995+
@Override
4996+
public void onError(Throwable e) {}
4997+
4998+
@Override
4999+
public void onNext(T args) {
5000+
onNext.call(args);
5001+
}
5002+
5003+
};
5004+
5005+
5006+
return create(OperationDoOnEach.doOnEach(this, observer));
5007+
}
5008+
5009+
/**
5010+
* Invokes an action for each element in the observable sequence.
5011+
*
5012+
* @param onNext
5013+
* The action to invoke for each element in the source sequence.
5014+
* @param onCompleted
5015+
* The action to invoke when the source sequence is completed.
5016+
*
5017+
* @return
5018+
* The source sequence with the side-effecting behavior applied.
5019+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229659(v=vs.103).aspx">MSDN: Observable.Do</a>
5020+
*/
5021+
public Observable<T> doOnEach(final Action1<T> onNext, final Action0 onCompleted) {
5022+
Observer<T> observer = new Observer<T>() {
5023+
@Override
5024+
public void onCompleted() {
5025+
onCompleted.call();
5026+
}
5027+
5028+
@Override
5029+
public void onError(Throwable e) {}
5030+
5031+
@Override
5032+
public void onNext(T args) {
5033+
onNext.call(args);
5034+
}
5035+
5036+
};
5037+
5038+
5039+
return create(OperationDoOnEach.doOnEach(this, observer));
5040+
}
5041+
5042+
/**
5043+
* Invokes an action for each element in the observable sequence.
5044+
*
5045+
* @param onNext
5046+
* The action to invoke for each element in the source sequence.
5047+
* @param onError
5048+
* The action to invoke when the source sequence calls onError.
5049+
*
5050+
* @return
5051+
* The source sequence with the side-effecting behavior applied.
5052+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229539(v=vs.103).aspx">MSDN: Observable.Do</a>
5053+
*/
5054+
public Observable<T> doOnEach(final Action1<T> onNext, final Action1<Throwable> onError) {
5055+
Observer<T> observer = new Observer<T>() {
5056+
@Override
5057+
public void onCompleted() {}
5058+
5059+
@Override
5060+
public void onError(Throwable e) {
5061+
onError.call(e);
5062+
}
5063+
5064+
@Override
5065+
public void onNext(T args) {
5066+
onNext.call(args);
5067+
}
5068+
5069+
};
5070+
5071+
5072+
return create(OperationDoOnEach.doOnEach(this, observer));
5073+
}
5074+
5075+
5076+
/**
5077+
* Invokes an action for each element in the observable sequence.
5078+
*
5079+
* @param onNext
5080+
* The action to invoke for each element in the source sequence.
5081+
* @param onError
5082+
* The action to invoke when the source sequence calls onError.
5083+
* @param onCompleted
5084+
* The action to invoke when the source sequence is completed.
5085+
*
5086+
* @return
5087+
* The source sequence with the side-effecting behavior applied.
5088+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229830(v=vs.103).aspx">MSDN: Observable.Do</a>
5089+
*/
5090+
public Observable<T> doOnEach(final Action1<T> onNext, final Action1<Throwable> onError, final Action0 onCompleted) {
5091+
Observer<T> observer = new Observer<T>() {
5092+
@Override
5093+
public void onCompleted() {
5094+
onCompleted.call();
5095+
}
5096+
5097+
@Override
5098+
public void onError(Throwable e) {
5099+
onError.call(e);
5100+
}
5101+
5102+
@Override
5103+
public void onNext(T args) {
5104+
onNext.call(args);
5105+
}
5106+
5107+
};
5108+
5109+
5110+
return create(OperationDoOnEach.doOnEach(this, observer));
5111+
}
5112+
49645113
/**
49655114
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
49665115
* <p>
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observer;
20+
import rx.Observable.OnSubscribeFunc;
21+
import rx.Subscription;
22+
23+
/**
24+
* Converts the elements of an observable sequence to the specified type.
25+
*/
26+
public class OperationDoOnEach {
27+
public static <T> OnSubscribeFunc<T> doOnEach(Observable<? extends T> sequence, Observer<? super T> observer) {
28+
return new DoOnEachObservable<T>(sequence, observer);
29+
}
30+
31+
private static class DoOnEachObservable<T> implements OnSubscribeFunc<T> {
32+
33+
private final Observable<? extends T> sequence;
34+
private final Observer<? super T> doOnEachObserver;
35+
36+
public DoOnEachObservable(Observable<? extends T> sequence, Observer<? super T> doOnEachObserver) {
37+
this.sequence = sequence;
38+
this.doOnEachObserver = doOnEachObserver;
39+
}
40+
41+
@Override
42+
public Subscription onSubscribe(final Observer<? super T> observer) {
43+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
44+
return subscription.wrap(sequence.subscribe(new SafeObserver<T>(subscription, new Observer<T>() {
45+
@Override
46+
public void onCompleted() {
47+
doOnEachObserver.onCompleted();
48+
observer.onCompleted();
49+
}
50+
51+
@Override
52+
public void onError(Throwable e) {
53+
doOnEachObserver.onError(e);
54+
observer.onError(e);
55+
}
56+
57+
@Override
58+
public void onNext(T value) {
59+
doOnEachObserver.onNext(value);
60+
observer.onNext(value);
61+
}
62+
})));
63+
}
64+
65+
}
66+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
import static org.mockito.Matchers.*;
20+
import static org.mockito.Mockito.*;
21+
import static rx.operators.OperationMap.*;
22+
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import org.mockito.InOrder;
31+
import org.mockito.Mock;
32+
import org.mockito.MockitoAnnotations;
33+
34+
import rx.Observable;
35+
import rx.Observer;
36+
import rx.concurrency.Schedulers;
37+
import rx.util.functions.Func1;
38+
import rx.util.functions.Func2;
39+
import rx.util.functions.Action1;
40+
41+
public class OperationDoOnEachTest {
42+
43+
@Mock
44+
Observer<String> subscribedObserver;
45+
@Mock
46+
Observer<String> sideEffectObserver;
47+
48+
@Before
49+
public void before() {
50+
MockitoAnnotations.initMocks(this);
51+
}
52+
53+
@Test
54+
public void testDoOnEach() {
55+
Observable<String> base = Observable.from("a", "b", "c");
56+
Observable<String> doOnEach = base.doOnEach(sideEffectObserver);
57+
58+
doOnEach.subscribe(subscribedObserver);
59+
60+
// ensure the leaf observer is still getting called
61+
verify(subscribedObserver, never()).onError(any(Throwable.class));
62+
verify(subscribedObserver, times(1)).onNext("a");
63+
verify(subscribedObserver, times(1)).onNext("b");
64+
verify(subscribedObserver, times(1)).onNext("c");
65+
verify(subscribedObserver, times(1)).onCompleted();
66+
67+
// ensure our injected observer is getting called
68+
verify(sideEffectObserver, never()).onError(any(Throwable.class));
69+
verify(sideEffectObserver, times(1)).onNext("a");
70+
verify(sideEffectObserver, times(1)).onNext("b");
71+
verify(sideEffectObserver, times(1)).onNext("c");
72+
verify(sideEffectObserver, times(1)).onCompleted();
73+
}
74+
75+
76+
77+
@Test
78+
public void testDoOnEachWithError() {
79+
Observable<String> base = Observable.from("one", "fail", "two", "three", "fail");
80+
Observable<String> errs = base.map(new Func1<String, String>() {
81+
@Override
82+
public String call(String s) {
83+
if ("fail".equals(s)) {
84+
throw new RuntimeException("Forced Failure");
85+
}
86+
return s;
87+
}
88+
});
89+
90+
Observable<String> doOnEach = errs.doOnEach(sideEffectObserver);
91+
92+
93+
doOnEach.subscribe(subscribedObserver);
94+
verify(subscribedObserver, times(1)).onNext("one");
95+
verify(subscribedObserver, never()).onNext("two");
96+
verify(subscribedObserver, never()).onNext("three");
97+
verify(subscribedObserver, never()).onCompleted();
98+
verify(subscribedObserver, times(1)).onError(any(Throwable.class));
99+
100+
101+
verify(sideEffectObserver, times(1)).onNext("one");
102+
verify(sideEffectObserver, never()).onNext("two");
103+
verify(sideEffectObserver, never()).onNext("three");
104+
verify(sideEffectObserver, never()).onCompleted();
105+
verify(sideEffectObserver, times(1)).onError(any(Throwable.class));
106+
}
107+
108+
@Test
109+
public void testDoOnEachWithErrorInCallback() {
110+
Observable<String> base = Observable.from("one", "two", "fail", "three");
111+
Observable<String> doOnEach = base.doOnEach(new Action1<String>() {
112+
@Override
113+
public void call(String s) {
114+
if ("fail".equals(s)) {
115+
throw new RuntimeException("Forced Failure");
116+
}
117+
}
118+
});
119+
120+
doOnEach.subscribe(subscribedObserver);
121+
verify(subscribedObserver, times(1)).onNext("one");
122+
verify(subscribedObserver, times(1)).onNext("two");
123+
verify(subscribedObserver, never()).onNext("three");
124+
verify(subscribedObserver, never()).onCompleted();
125+
verify(subscribedObserver, times(1)).onError(any(Throwable.class));
126+
127+
}
128+
129+
}

0 commit comments

Comments
 (0)