Skip to content

Commit 78539f0

Browse files
Scan/Reduce with Seed Factory
Adds overload with seed factory as per #1831
1 parent faa270c commit 78539f0

File tree

3 files changed

+114
-4
lines changed

3 files changed

+114
-4
lines changed

src/main/java/rx/Observable.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5429,6 +5429,41 @@ public final Observable<T> reduce(Func2<T, T, T> accumulator) {
54295429
public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
54305430
return scan(initialValue, accumulator).takeLast(1);
54315431
}
5432+
5433+
/**
5434+
* Returns an Observable that applies a function of your choosing to the first item emitted by a source
5435+
* Observable and a specified seed value, then feeds the result of that function along with the second item
5436+
* emitted by an Observable into the same function, and so on until all items have been emitted by the
5437+
* source Observable, emitting the final result from the final call to your function as its sole item.
5438+
* <p>
5439+
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/reduceSeed.png" alt="">
5440+
* <p>
5441+
* This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," "accumulate,"
5442+
* "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method
5443+
* that does a similar operation on lists.
5444+
* <dl>
5445+
* <dt><b>Backpressure Support:</b></dt>
5446+
* <dd>This operator does not support backpressure because by intent it will receive all values and reduce
5447+
* them to a single {@code onNext}.</dd>
5448+
* <dt><b>Scheduler:</b></dt>
5449+
* <dd>{@code reduce} does not operate by default on a particular {@link Scheduler}.</dd>
5450+
* </dl>
5451+
*
5452+
* @param initialValueFactory
5453+
* factory to produce the initial (seed) accumulator item each time the Observable is subscribed to
5454+
* @param accumulator
5455+
* an accumulator function to be invoked on each item emitted by the source Observable, the
5456+
* result of which will be used in the next accumulator call
5457+
* @return an Observable that emits a single item that is the result of accumulating the output from the
5458+
* items emitted by the source Observable
5459+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce">RxJava wiki: reduce</a>
5460+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154.aspx">MSDN: Observable.Aggregate</a>
5461+
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
5462+
*/
5463+
public final <R> Observable<R> reduce(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
5464+
return scan(initialValueFactory, accumulator).takeLast(1);
5465+
}
5466+
54325467

54335468
/**
54345469
* Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
@@ -6491,6 +6526,38 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
64916526
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
64926527
return lift(new OperatorScan<R, T>(initialValue, accumulator));
64936528
}
6529+
6530+
/**
6531+
* Returns an Observable that applies a function of your choosing to the first item emitted by a source
6532+
* Observable and a seed value, then feeds the result of that function along with the second item emitted by
6533+
* the source Observable into the same function, and so on until all items have been emitted by the source
6534+
* Observable, emitting the result of each of these iterations.
6535+
* <p>
6536+
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/scanSeed.png" alt="">
6537+
* <p>
6538+
* This sort of function is sometimes called an accumulator.
6539+
* <p>
6540+
* Note that the Observable that results from this method will emit {@code initialValue} as its first
6541+
* emitted item.
6542+
* <dl>
6543+
* <dt><b>Scheduler:</b></dt>
6544+
* <dd>{@code scan} does not operate by default on a particular {@link Scheduler}.</dd>
6545+
* </dl>
6546+
*
6547+
* @param initialValueFactory
6548+
* factory to produce the initial (seed) accumulator item each time the Observable is subscribed to
6549+
* @param accumulator
6550+
* an accumulator function to be invoked on each item emitted by the source Observable, whose
6551+
* result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the
6552+
* next accumulator call
6553+
* @return an Observable that emits {@code initialValue} followed by the results of each call to the
6554+
* accumulator function
6555+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#scan">RxJava wiki: scan</a>
6556+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665.aspx">MSDN: Observable.Scan</a>
6557+
*/
6558+
public final <R> Observable<R> scan(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
6559+
return lift(new OperatorScan<R, T>(initialValueFactory, accumulator));
6560+
}
64946561

64956562
/**
64966563
* Forces an Observable's emissions and notifications to be serialized and for it to obey the Rx contract

src/main/java/rx/internal/operators/OperatorScan.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import rx.Producer;
2222
import rx.Subscriber;
2323
import rx.exceptions.OnErrorThrowable;
24+
import rx.functions.Func0;
2425
import rx.functions.Func2;
26+
import rx.internal.util.UtilityFunctions;
2527

2628
/**
2729
* Returns an Observable that applies a function to the first item emitted by a source Observable, then feeds
@@ -38,7 +40,7 @@
3840
*/
3941
public final class OperatorScan<R, T> implements Operator<R, T> {
4042

41-
private final R initialValue;
43+
private final Func0<R> initialValueFactory;
4244
private final Func2<R, ? super T, R> accumulator;
4345
// sentinel if we don't receive an initial value
4446
private static final Object NO_INITIAL_VALUE = new Object();
@@ -54,8 +56,19 @@ public final class OperatorScan<R, T> implements Operator<R, T> {
5456
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212007.aspx">Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource,
5557
* TAccumulate))</a>
5658
*/
57-
public OperatorScan(R initialValue, Func2<R, ? super T, R> accumulator) {
58-
this.initialValue = initialValue;
59+
public OperatorScan(final R initialValue, Func2<R, ? super T, R> accumulator) {
60+
this(new Func0<R>() {
61+
62+
@Override
63+
public R call() {
64+
return initialValue;
65+
}
66+
67+
}, accumulator);
68+
}
69+
70+
public OperatorScan(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
71+
this.initialValueFactory = initialValueFactory;
5972
this.accumulator = accumulator;
6073
}
6174

@@ -75,6 +88,7 @@ public OperatorScan(final Func2<R, ? super T, R> accumulator) {
7588
@Override
7689
public Subscriber<? super T> call(final Subscriber<? super R> child) {
7790
return new Subscriber<T>(child) {
91+
private final R initialValue = initialValueFactory.call();
7892
private R value = initialValue;
7993
boolean initialized = false;
8094

src/test/java/rx/internal/operators/OperatorScanTest.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.fail;
1920
import static org.mockito.Matchers.any;
2021
import static org.mockito.Matchers.anyInt;
2122
import static org.mockito.Matchers.anyString;
@@ -24,6 +25,9 @@
2425
import static org.mockito.Mockito.times;
2526
import static org.mockito.Mockito.verify;
2627

28+
import java.util.ArrayList;
29+
import java.util.Arrays;
30+
import java.util.List;
2731
import java.util.concurrent.atomic.AtomicInteger;
2832

2933
import org.junit.Before;
@@ -33,6 +37,7 @@
3337
import rx.Observable;
3438
import rx.Observer;
3539
import rx.Subscriber;
40+
import rx.functions.Func0;
3641
import rx.functions.Func1;
3742
import rx.functions.Func2;
3843
import rx.observers.TestSubscriber;
@@ -263,4 +268,28 @@ public void onNext(Integer t) {
263268
// we only expect to receive 101 as we'll receive all 100 + the initial value
264269
assertEquals(101, count.get());
265270
}
271+
272+
@Test
273+
public void testSeedFactory() {
274+
Observable<List<Integer>> o = Observable.range(1, 10)
275+
.scan(new Func0<List<Integer>>() {
276+
277+
@Override
278+
public List<Integer> call() {
279+
return new ArrayList<Integer>();
280+
}
281+
282+
}, new Func2<List<Integer>, Integer, List<Integer>>() {
283+
284+
@Override
285+
public List<Integer> call(List<Integer> list, Integer t2) {
286+
list.add(t2);
287+
return list;
288+
}
289+
290+
}).takeLast(1);
291+
292+
assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single());
293+
assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single());
294+
}
266295
}

0 commit comments

Comments
 (0)