Skip to content

Commit 225eafa

Browse files
Merge branch 'AggregatorsWithSelector' of github.com:akarnokd/RxJava into pull-657-merge
2 parents d39d9cf + 14b701d commit 225eafa

File tree

7 files changed

+1304
-1
lines changed

7 files changed

+1304
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import rx.observables.BlockingObservable;
3333
import rx.observables.ConnectableObservable;
3434
import rx.observables.GroupedObservable;
35+
import rx.operators.OperationAggregate;
3536
import rx.operators.OperationAll;
3637
import rx.operators.OperationAmb;
3738
import rx.operators.OperationAny;
@@ -4405,6 +4406,54 @@ public static Observable<Double> sumDoubles(Observable<Double> source) {
44054406
return OperationSum.sumDoubles(source);
44064407
}
44074408

4409+
/**
4410+
* Create an Observable that extracts integer values from this Observable via
4411+
* the provided function and computes the integer sum of the value sequence.
4412+
*
4413+
* @param valueExtractor the function to extract an integer from this Observable
4414+
* @return an Observable that extracts integer values from this Observable via
4415+
* the provided function and computes the integer sum of the value sequence.
4416+
*/
4417+
public Observable<Integer> sumInteger(Func1<? super T, Integer> valueExtractor) {
4418+
return create(new OperationSum.SumIntegerExtractor<T>(this, valueExtractor));
4419+
}
4420+
4421+
/**
4422+
* Create an Observable that extracts long values from this Observable via
4423+
* the provided function and computes the long sum of the value sequence.
4424+
*
4425+
* @param valueExtractor the function to extract an long from this Observable
4426+
* @return an Observable that extracts long values from this Observable via
4427+
* the provided function and computes the long sum of the value sequence.
4428+
*/
4429+
public Observable<Long> sumLong(Func1<? super T, Long> valueExtractor) {
4430+
return create(new OperationSum.SumLongExtractor<T>(this, valueExtractor));
4431+
}
4432+
4433+
/**
4434+
* Create an Observable that extracts float values from this Observable via
4435+
* the provided function and computes the float sum of the value sequence.
4436+
*
4437+
* @param valueExtractor the function to extract an float from this Observable
4438+
* @return an Observable that extracts float values from this Observable via
4439+
* the provided function and computes the float sum of the value sequence.
4440+
*/
4441+
public Observable<Float> sumFloat(Func1<? super T, Float> valueExtractor) {
4442+
return create(new OperationSum.SumFloatExtractor<T>(this, valueExtractor));
4443+
}
4444+
4445+
/**
4446+
* Create an Observable that extracts double values from this Observable via
4447+
* the provided function and computes the double sum of the value sequence.
4448+
*
4449+
* @param valueExtractor the function to extract an double from this Observable
4450+
* @return an Observable that extracts double values from this Observable via
4451+
* the provided function and computes the double sum of the value sequence.
4452+
*/
4453+
public Observable<Double> sumDouble(Func1<? super T, Double> valueExtractor) {
4454+
return create(new OperationSum.SumDoubleExtractor<T>(this, valueExtractor));
4455+
}
4456+
44084457
/**
44094458
* Returns an Observable that computes the average of the Integers emitted
44104459
* by the source Observable.
@@ -4470,6 +4519,54 @@ public static Observable<Double> averageDoubles(Observable<Double> source) {
44704519
return OperationAverage.averageDoubles(source);
44714520
}
44724521

4522+
/**
4523+
* Create an Observable that extracts integer values from this Observable via
4524+
* the provided function and computes the integer average of the value sequence.
4525+
*
4526+
* @param valueExtractor the function to extract an integer from this Observable
4527+
* @return an Observable that extracts integer values from this Observable via
4528+
* the provided function and computes the integer average of the value sequence.
4529+
*/
4530+
public Observable<Integer> averageInteger(Func1<? super T, Integer> valueExtractor) {
4531+
return create(new OperationAverage.AverageIntegerExtractor<T>(this, valueExtractor));
4532+
}
4533+
4534+
/**
4535+
* Create an Observable that extracts long values from this Observable via
4536+
* the provided function and computes the long average of the value sequence.
4537+
*
4538+
* @param valueExtractor the function to extract an long from this Observable
4539+
* @return an Observable that extracts long values from this Observable via
4540+
* the provided function and computes the long average of the value sequence.
4541+
*/
4542+
public Observable<Long> averageLong(Func1<? super T, Long> valueExtractor) {
4543+
return create(new OperationAverage.AverageLongExtractor<T>(this, valueExtractor));
4544+
}
4545+
4546+
/**
4547+
* Create an Observable that extracts float values from this Observable via
4548+
* the provided function and computes the float average of the value sequence.
4549+
*
4550+
* @param valueExtractor the function to extract an float from this Observable
4551+
* @return an Observable that extracts float values from this Observable via
4552+
* the provided function and computes the float average of the value sequence.
4553+
*/
4554+
public Observable<Float> averageFloat(Func1<? super T, Float> valueExtractor) {
4555+
return create(new OperationAverage.AverageFloatExtractor<T>(this, valueExtractor));
4556+
}
4557+
4558+
/**
4559+
* Create an Observable that extracts double values from this Observable via
4560+
* the provided function and computes the double average of the value sequence.
4561+
*
4562+
* @param valueExtractor the function to extract an double from this Observable
4563+
* @return an Observable that extracts double values from this Observable via
4564+
* the provided function and computes the double average of the value sequence.
4565+
*/
4566+
public Observable<Double> averageDouble(Func1<? super T, Double> valueExtractor) {
4567+
return create(new OperationAverage.AverageDoubleExtractor<T>(this, valueExtractor));
4568+
}
4569+
44734570
/**
44744571
* Returns an Observable that emits the minimum item emitted by the source
44754572
* Observable. If there is more than one such item, it returns the
@@ -5243,6 +5340,49 @@ public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulat
52435340
public <R> Observable<R> aggregate(R initialValue, Func2<R, ? super T, R> accumulator) {
52445341
return reduce(initialValue, accumulator);
52455342
}
5343+
5344+
/**
5345+
* Create an Observable that aggregates the source values with the given accumulator
5346+
* function and projects the final result via the resultselector.
5347+
* <p>
5348+
* Works like the {@link #aggregate(java.lang.Object, rx.util.functions.Func2)} projected
5349+
* with {@link #map(rx.util.functions.Func1)} without the overhead of some helper
5350+
* operators.
5351+
* @param <U> the intermediate (accumulator) type
5352+
* @param <V> the result type
5353+
* @param seed the initial value of the accumulator
5354+
* @param accumulator the function that takes the current accumulator value,
5355+
* the current emitted value and returns a (new) accumulated value.
5356+
* @param resultSelector the selector to project the final value of the accumulator
5357+
* @return an Observable that aggregates the source values with the given accumulator
5358+
* function and projects the final result via the resultselector
5359+
*/
5360+
public <U, V> Observable<V> aggregate(
5361+
U seed, Func2<U, ? super T, U> accumulator,
5362+
Func1<? super U, ? extends V> resultSelector) {
5363+
return create(new OperationAggregate.AggregateSelector<T, U, V>(this, seed, accumulator, resultSelector));
5364+
}
5365+
5366+
/**
5367+
* Create an Observable that aggregates the source values with the given indexed accumulator
5368+
* function and projects the final result via the indexed resultselector.
5369+
*
5370+
* @param <U> the intermediate (accumulator) type
5371+
* @param <V> the result type
5372+
* @param seed the initial value of the accumulator
5373+
* @param accumulator the function that takes the current accumulator value,
5374+
* the current emitted value and returns a (new) accumulated value.
5375+
* @param resultSelector the selector to project the final value of the accumulator, where
5376+
* the second argument is the total number of elements accumulated
5377+
* @return an Observable that aggregates the source values with the given indexed accumulator
5378+
* function and projects the final result via the indexed resultselector.
5379+
*/
5380+
public <U, V> Observable<V> aggregateIndexed(
5381+
U seed, Func3<U, ? super T, ? super Integer, U> accumulator,
5382+
Func2<? super U, ? super Integer, ? extends V> resultSelector
5383+
) {
5384+
return create(new OperationAggregate.AggregateIndexedSelector<T, U, V>(this, seed, accumulator, resultSelector));
5385+
}
52465386

52475387
/**
52485388
* Returns an Observable that applies a function of your choosing to the
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.operators;
18+
19+
import rx.Observable;
20+
import rx.Observable.OnSubscribeFunc;
21+
import rx.Observer;
22+
import rx.Subscription;
23+
import rx.util.functions.Func1;
24+
import rx.util.functions.Func2;
25+
import rx.util.functions.Func3;
26+
27+
/**
28+
* Aggregate overloads with index and selector functions.
29+
*/
30+
public final class OperationAggregate {
31+
/** Utility class. */
32+
private OperationAggregate() { throw new IllegalStateException("No instances!"); }
33+
34+
/**
35+
* Aggregate and emit a value after running it through a selector.
36+
* @param <T> the input value type
37+
* @param <U> the intermediate value type
38+
* @param <V> the result value type
39+
*/
40+
public static final class AggregateSelector<T, U, V> implements OnSubscribeFunc<V> {
41+
final Observable<? extends T> source;
42+
final U seed;
43+
final Func2<U, ? super T, U> aggregator;
44+
final Func1<? super U, ? extends V> resultSelector;
45+
46+
public AggregateSelector(
47+
Observable<? extends T> source, U seed,
48+
Func2<U, ? super T, U> aggregator,
49+
Func1<? super U, ? extends V> resultSelector) {
50+
this.source = source;
51+
this.seed = seed;
52+
this.aggregator = aggregator;
53+
this.resultSelector = resultSelector;
54+
}
55+
56+
@Override
57+
public Subscription onSubscribe(Observer<? super V> t1) {
58+
return source.subscribe(new AggregatorObserver(t1, seed));
59+
}
60+
/** The aggregator observer of the source. */
61+
private final class AggregatorObserver implements Observer<T> {
62+
final Observer<? super V> observer;
63+
U accumulator;
64+
public AggregatorObserver(Observer<? super V> observer, U seed) {
65+
this.observer = observer;
66+
this.accumulator = seed;
67+
}
68+
69+
@Override
70+
public void onNext(T args) {
71+
accumulator = aggregator.call(accumulator, args);
72+
}
73+
74+
@Override
75+
public void onError(Throwable e) {
76+
accumulator = null;
77+
observer.onError(e);
78+
}
79+
80+
@Override
81+
public void onCompleted() {
82+
U a = accumulator;
83+
accumulator = null;
84+
try {
85+
observer.onNext(resultSelector.call(a));
86+
} catch (Throwable t) {
87+
observer.onError(t);
88+
return;
89+
}
90+
observer.onCompleted();
91+
}
92+
}
93+
}
94+
/**
95+
* Indexed aggregate and emit a value after running it through an indexed selector.
96+
* @param <T> the input value type
97+
* @param <U> the intermediate value type
98+
* @param <V> the result value type
99+
*/
100+
public static final class AggregateIndexedSelector<T, U, V> implements OnSubscribeFunc<V> {
101+
final Observable<? extends T> source;
102+
final U seed;
103+
final Func3<U, ? super T, ? super Integer, U> aggregator;
104+
final Func2<? super U, ? super Integer, ? extends V> resultSelector;
105+
106+
public AggregateIndexedSelector(
107+
Observable<? extends T> source,
108+
U seed,
109+
Func3<U, ? super T, ? super Integer, U> aggregator,
110+
Func2<? super U, ? super Integer, ? extends V> resultSelector) {
111+
this.source = source;
112+
this.seed = seed;
113+
this.aggregator = aggregator;
114+
this.resultSelector = resultSelector;
115+
}
116+
117+
118+
119+
@Override
120+
public Subscription onSubscribe(Observer<? super V> t1) {
121+
return source.subscribe(new AggregatorObserver(t1, seed));
122+
}
123+
/** The aggregator observer of the source. */
124+
private final class AggregatorObserver implements Observer<T> {
125+
final Observer<? super V> observer;
126+
U accumulator;
127+
int index;
128+
public AggregatorObserver(Observer<? super V> observer, U seed) {
129+
this.observer = observer;
130+
this.accumulator = seed;
131+
}
132+
133+
@Override
134+
public void onNext(T args) {
135+
accumulator = aggregator.call(accumulator, args, index++);
136+
}
137+
138+
@Override
139+
public void onError(Throwable e) {
140+
accumulator = null;
141+
observer.onError(e);
142+
}
143+
144+
@Override
145+
public void onCompleted() {
146+
U a = accumulator;
147+
accumulator = null;
148+
try {
149+
observer.onNext(resultSelector.call(a, index));
150+
} catch (Throwable t) {
151+
observer.onError(t);
152+
return;
153+
}
154+
observer.onCompleted();
155+
}
156+
}
157+
}
158+
}

0 commit comments

Comments
 (0)