Skip to content

Commit 13d293f

Browse files
Update OperationScan to OperatorScan
Migrate to use updated conventions of Operator* classes that implement rx.Observable.Operator
1 parent 6c578f7 commit 13d293f

File tree

5 files changed

+130
-153
lines changed

5 files changed

+130
-153
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
import rx.operators.OperationReplay;
7373
import rx.operators.OperationRetry;
7474
import rx.operators.OperationSample;
75-
import rx.operators.OperationScan;
75+
import rx.operators.OperatorScan;
7676
import rx.operators.OperationSequenceEqual;
7777
import rx.operators.OperationSingle;
7878
import rx.operators.OperationSkip;
@@ -6140,7 +6140,7 @@ public final <U> Observable<T> sample(Observable<U> sampler) {
61406140
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665.aspx">MSDN: Observable.Scan</a>
61416141
*/
61426142
public final Observable<T> scan(Func2<T, T, T> accumulator) {
6143-
return lift(OperationScan.scan(accumulator));
6143+
return lift(new OperatorScan<T, T>(accumulator));
61446144
}
61456145

61466146
/**
@@ -6167,7 +6167,7 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
61676167
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665.aspx">MSDN: Observable.Scan</a>
61686168
*/
61696169
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
6170-
return lift(OperationScan.scan(initialValue, accumulator));
6170+
return lift(new OperatorScan<R, T>(initialValue, accumulator));
61716171
}
61726172

61736173
/**

rxjava-core/src/main/java/rx/operators/OperationScan.java

Lines changed: 0 additions & 140 deletions
This file was deleted.
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
20+
import rx.util.functions.Func2;
21+
22+
/**
23+
* Returns an Observable that applies a function to the first item emitted by a source Observable,
24+
* then feeds the result of that function along with the second item emitted by an Observable into
25+
* the same function, and so on until all items have been emitted by the source Observable,
26+
* emitting the result of each of these iterations.
27+
* <p>
28+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/scan.png">
29+
* <p>
30+
* This sort of function is sometimes called an accumulator.
31+
* <p>
32+
* Note that when you pass a seed to <code>scan()</code> the resulting Observable will emit that
33+
* seed as its first emitted item.
34+
*/
35+
public final class OperatorScan<R, T> implements Operator<R, T> {
36+
37+
private final R initialValue;
38+
private final Func2<R, ? super T, R> accumulator;
39+
// sentinel if we don't receive an initial value
40+
private static final Object NO_INITIAL_VALUE = new Object();
41+
42+
/**
43+
* Applies an accumulator function over an observable sequence and returns each intermediate
44+
* result with the specified source and accumulator.
45+
*
46+
* @param sequence
47+
* An observable sequence of elements to project.
48+
* @param initialValue
49+
* The initial (seed) accumulator value.
50+
* @param accumulator
51+
* An accumulator function to be invoked on each element from the sequence.
52+
*
53+
* @return An observable sequence whose elements are the result of accumulating the output from
54+
* the list of Observables.
55+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212007%28v=vs.103%29.aspx">Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource,
56+
* TAccumulate))</a>
57+
*/
58+
public OperatorScan(R initialValue, Func2<R, ? super T, R> accumulator) {
59+
this.initialValue = initialValue;
60+
this.accumulator = accumulator;
61+
}
62+
63+
/**
64+
* Applies an accumulator function over an observable sequence and returns each intermediate
65+
* result with the specified source and accumulator.
66+
*
67+
* @param sequence
68+
* An observable sequence of elements to project.
69+
* @param accumulator
70+
* An accumulator function to be invoked on each element from the sequence.
71+
*
72+
* @return An observable sequence whose elements are the result of accumulating the output from
73+
* the list of Observables.
74+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx">Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource))</a>
75+
*/
76+
@SuppressWarnings("unchecked")
77+
public OperatorScan(final Func2<R, ? super T, R> accumulator) {
78+
this((R) NO_INITIAL_VALUE, accumulator);
79+
}
80+
81+
@Override
82+
public Subscriber<? super T> call(final Subscriber<? super R> observer) {
83+
if (initialValue != NO_INITIAL_VALUE) {
84+
observer.onNext(initialValue);
85+
}
86+
return new Subscriber<T>(observer) {
87+
private R value = initialValue;
88+
89+
@SuppressWarnings("unchecked")
90+
@Override
91+
public void onNext(T value) {
92+
if (this.value == NO_INITIAL_VALUE) {
93+
// if there is NO_INITIAL_VALUE then we know it is type T for both so cast T to R
94+
this.value = (R) value;
95+
} else {
96+
try {
97+
this.value = accumulator.call(this.value, value);
98+
} catch (Throwable e) {
99+
observer.onError(e);
100+
observer.unsubscribe();
101+
}
102+
}
103+
observer.onNext(this.value);
104+
}
105+
106+
@Override
107+
public void onError(Throwable e) {
108+
observer.onError(e);
109+
}
110+
111+
@Override
112+
public void onCompleted() {
113+
observer.onCompleted();
114+
}
115+
};
116+
}
117+
}

rxjava-core/src/test/java/rx/ReduceTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import rx.CovarianceTest.HorrorMovie;
2323
import rx.CovarianceTest.Movie;
24-
import rx.operators.OperationScan;
24+
import rx.operators.OperatorScan;
2525
import rx.util.functions.Func2;
2626

2727
public class ReduceTests {
@@ -52,7 +52,7 @@ public Movie call(Movie t1, Movie t2) {
5252
}
5353
};
5454

55-
Observable<Movie> reduceResult = horrorMovies.lift(OperationScan.scan(chooseSecondMovie)).takeLast(1);
55+
Observable<Movie> reduceResult = horrorMovies.scan(chooseSecondMovie).takeLast(1);
5656

5757
Observable<Movie> reduceResult2 = horrorMovies.reduce(chooseSecondMovie);
5858
}

rxjava-core/src/test/java/rx/operators/OperationScanTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorScanTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import static org.mockito.Matchers.*;
1919
import static org.mockito.Mockito.*;
20-
import static rx.operators.OperationScan.*;
20+
import static rx.operators.OperatorScan.*;
2121

2222
import org.junit.Before;
2323
import org.junit.Test;
@@ -27,7 +27,7 @@
2727
import rx.Observer;
2828
import rx.util.functions.Func2;
2929

30-
public class OperationScanTest {
30+
public class OperatorScanTest {
3131

3232
@Before
3333
public void before() {
@@ -41,14 +41,14 @@ public void testScanIntegersWithInitialValue() {
4141

4242
Observable<Integer> observable = Observable.from(1, 2, 3);
4343

44-
Observable<String> m = observable.lift(scan("", new Func2<String, Integer, String>() {
44+
Observable<String> m = observable.scan("", new Func2<String, Integer, String>() {
4545

4646
@Override
4747
public String call(String s, Integer n) {
4848
return s + n.toString();
4949
}
5050

51-
}));
51+
});
5252
m.subscribe(observer);
5353

5454
verify(observer, never()).onError(any(Throwable.class));
@@ -68,14 +68,14 @@ public void testScanIntegersWithoutInitialValue() {
6868

6969
Observable<Integer> observable = Observable.from(1, 2, 3);
7070

71-
Observable<Integer> m = observable.lift(scan(new Func2<Integer, Integer, Integer>() {
71+
Observable<Integer> m = observable.scan(new Func2<Integer, Integer, Integer>() {
7272

7373
@Override
7474
public Integer call(Integer t1, Integer t2) {
7575
return t1 + t2;
7676
}
7777

78-
}));
78+
});
7979
m.subscribe(observer);
8080

8181
verify(observer, never()).onError(any(Throwable.class));
@@ -95,14 +95,14 @@ public void testScanIntegersWithoutInitialValueAndOnlyOneValue() {
9595

9696
Observable<Integer> observable = Observable.from(1);
9797

98-
Observable<Integer> m = observable.lift(scan(new Func2<Integer, Integer, Integer>() {
98+
Observable<Integer> m = observable.scan(new Func2<Integer, Integer, Integer>() {
9999

100100
@Override
101101
public Integer call(Integer t1, Integer t2) {
102102
return t1 + t2;
103103
}
104104

105-
}));
105+
});
106106
m.subscribe(observer);
107107

108108
verify(observer, never()).onError(any(Throwable.class));

0 commit comments

Comments
 (0)