Skip to content

Commit 849d7fe

Browse files
author
jmhofer
committed
added sum operations
1 parent 9a1832e commit 849d7fe

File tree

2 files changed

+179
-0
lines changed

2 files changed

+179
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import rx.operators.OperationScan;
5151
import rx.operators.OperationSkip;
5252
import rx.operators.OperationSubscribeOn;
53+
import rx.operators.OperationSum;
5354
import rx.operators.OperationSwitch;
5455
import rx.operators.OperationSynchronize;
5556
import rx.operators.OperationTake;
@@ -2058,6 +2059,42 @@ public Integer call(Integer t1, T t2) {
20582059
});
20592060
}
20602061

2062+
/**
2063+
* Returns an Observable that sums up the elements in the source Observable.
2064+
* @param source
2065+
* Source observable to compute the sum of.
2066+
* @return an Observable emitting the sum of all the elements of the source Observable
2067+
* as its single item.
2068+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum%28v=vs.103%29.aspx">MSDN: Observable.Sum</a>
2069+
*/
2070+
public static Observable<Integer> sum(Observable<Integer> source) {
2071+
return OperationSum.sum(source);
2072+
}
2073+
2074+
/**
2075+
* @see #sum(Observable)
2076+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum%28v=vs.103%29.aspx">MSDN: Observable.Sum</a>
2077+
*/
2078+
public static Observable<Long> sumLongs(Observable<Long> source) {
2079+
return OperationSum.sumLongs(source);
2080+
}
2081+
2082+
/**
2083+
* @see #sum(Observable)
2084+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum%28v=vs.103%29.aspx">MSDN: Observable.Sum</a>
2085+
*/
2086+
public static Observable<Float> sumFloats(Observable<Float> source) {
2087+
return OperationSum.sumFloats(source);
2088+
}
2089+
2090+
/**
2091+
* @see #sum(Observable)
2092+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum%28v=vs.103%29.aspx">MSDN: Observable.Sum</a>
2093+
*/
2094+
public static Observable<Double> sumDoubles(Observable<Double> source) {
2095+
return OperationSum.sumDoubles(source);
2096+
}
2097+
20612098
/**
20622099
* Returns a {@link ConnectableObservable} that shares a single subscription to the underlying
20632100
* Observable that will replay all of its items and notifications to any future {@link Observer}.
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Mockito.*;
19+
20+
import org.junit.Test;
21+
22+
import rx.Observable;
23+
import rx.Observer;
24+
import rx.util.functions.Func2;
25+
26+
/**
27+
* A few operators for implementing the sum operation.
28+
*/
29+
public final class OperationSum {
30+
public static Observable<Integer> sum(Observable<Integer> source) {
31+
return source.reduce(0, new Func2<Integer, Integer, Integer>() {
32+
@Override
33+
public Integer call(Integer accu, Integer next) {
34+
return accu + next;
35+
}
36+
});
37+
}
38+
39+
public static Observable<Long> sumLongs(Observable<Long> source) {
40+
return source.reduce(0L, new Func2<Long, Long, Long>() {
41+
@Override
42+
public Long call(Long accu, Long next) {
43+
return accu + next;
44+
}
45+
});
46+
}
47+
48+
public static Observable<Float> sumFloats(Observable<Float> source) {
49+
return source.reduce(0.0f, new Func2<Float, Float, Float>() {
50+
@Override
51+
public Float call(Float accu, Float next) {
52+
return accu + next;
53+
}
54+
});
55+
}
56+
57+
public static Observable<Double> sumDoubles(Observable<Double> source) {
58+
return source.reduce(0.0d, new Func2<Double, Double, Double>() {
59+
@Override
60+
public Double call(Double accu, Double next) {
61+
return accu + next;
62+
}
63+
});
64+
}
65+
66+
public static class UnitTest {
67+
@SuppressWarnings("unchecked")
68+
Observer<Integer> w = mock(Observer.class);
69+
@SuppressWarnings("unchecked")
70+
Observer<Long> wl = mock(Observer.class);
71+
@SuppressWarnings("unchecked")
72+
Observer<Float> wf = mock(Observer.class);
73+
@SuppressWarnings("unchecked")
74+
Observer<Double> wd = mock(Observer.class);
75+
76+
@Test
77+
public void testSumOfAFewInts() throws Throwable {
78+
Observable<Integer> src = Observable.from(1, 2, 3, 4, 5);
79+
sum(src).subscribe(w);
80+
81+
verify(w, times(1)).onNext(anyInt());
82+
verify(w).onNext(15);
83+
verify(w, never()).onError(any(Throwable.class));
84+
verify(w, times(1)).onCompleted();
85+
}
86+
87+
@Test
88+
public void testEmptySum() throws Throwable {
89+
Observable<Integer> src = Observable.from();
90+
sum(src).subscribe(w);
91+
92+
verify(w, times(1)).onNext(anyInt());
93+
verify(w).onNext(0);
94+
verify(w, never()).onError(any(Throwable.class));
95+
verify(w, times(1)).onCompleted();
96+
}
97+
98+
@Test
99+
public void testSumOfAFewFloats() throws Throwable {
100+
Observable<Float> src = Observable.from(1.0f);
101+
sumFloats(src).subscribe(wf);
102+
103+
verify(wf, times(1)).onNext(anyFloat());
104+
verify(wf).onNext(1.0f);
105+
verify(wf, never()).onError(any(Throwable.class));
106+
verify(wf, times(1)).onCompleted();
107+
}
108+
109+
@Test
110+
public void testEmptySumFloats() throws Throwable {
111+
Observable<Float> src = Observable.from();
112+
sumFloats(src).subscribe(wf);
113+
114+
verify(wf, times(1)).onNext(anyFloat());
115+
verify(wf).onNext(0.0f);
116+
verify(wf, never()).onError(any(Throwable.class));
117+
verify(wf, times(1)).onCompleted();
118+
}
119+
120+
@Test
121+
public void testSumOfAFewDoubles() throws Throwable {
122+
Observable<Double> src = Observable.from(0.0d, 1.0d, 0.5d);
123+
sumDoubles(src).subscribe(wd);
124+
125+
verify(wd, times(1)).onNext(anyDouble());
126+
verify(wd).onNext(1.5d);
127+
verify(wd, never()).onError(any(Throwable.class));
128+
verify(wd, times(1)).onCompleted();
129+
}
130+
131+
@Test
132+
public void testEmptySumDoubles() throws Throwable {
133+
Observable<Double> src = Observable.from();
134+
sumDoubles(src).subscribe(wd);
135+
136+
verify(wd, times(1)).onNext(anyDouble());
137+
verify(wd).onNext(0.0d);
138+
verify(wd, never()).onError(any(Throwable.class));
139+
verify(wd, times(1)).onCompleted();
140+
}
141+
}
142+
}

0 commit comments

Comments
 (0)