Skip to content

Commit 6737419

Browse files
committed
Convert to scan to use lift
1 parent 4bb59a5 commit 6737419

File tree

6 files changed

+132
-385
lines changed

6 files changed

+132
-385
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2529,14 +2529,6 @@ public final static <T> Observable<Boolean> sequenceEqual(Observable<? extends T
25292529
return OperationSequenceEqual.sequenceEqual(first, second, equality);
25302530
}
25312531

2532-
/**
2533-
* @deprecated use {@link #sumInteger}
2534-
*/
2535-
@Deprecated
2536-
public final static Observable<Integer> sum(Observable<Integer> source) {
2537-
return OperationSum.sum(source);
2538-
}
2539-
25402532
/**
25412533
* Returns an Observable that emits the sum of all the Doubles emitted by the source Observable.
25422534
* <p>
@@ -2583,7 +2575,7 @@ public final static Observable<Float> sumFloat(Observable<Float> source) {
25832575
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
25842576
*/
25852577
public final static Observable<Integer> sumInteger(Observable<Integer> source) {
2586-
return OperationSum.sum(source);
2578+
return OperationSum.sumIntegers(source);
25872579
}
25882580

25892581
/**
@@ -5519,7 +5511,7 @@ public final Observable<T> reduce(Func2<T, T, T> accumulator) {
55195511
* It should use last() not takeLast(1) since it needs to emit an error if the sequence is
55205512
* empty.
55215513
*/
5522-
return create(OperationScan.scan(this, accumulator)).last();
5514+
return scan(accumulator).last();
55235515
}
55245516

55255517
/**
@@ -5547,7 +5539,7 @@ public final Observable<T> reduce(Func2<T, T, T> accumulator) {
55475539
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
55485540
*/
55495541
public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
5550-
return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1);
5542+
return scan(initialValue, accumulator).takeLast(1);
55515543
}
55525544

55535545
/**
@@ -6148,7 +6140,7 @@ public final <U> Observable<T> sample(Observable<U> sampler) {
61486140
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665.aspx">MSDN: Observable.Scan</a>
61496141
*/
61506142
public final Observable<T> scan(Func2<T, T, T> accumulator) {
6151-
return create(OperationScan.scan(this, accumulator));
6143+
return lift(OperationScan.scan(accumulator));
61526144
}
61536145

61546146
/**
@@ -6175,7 +6167,7 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
61756167
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665.aspx">MSDN: Observable.Scan</a>
61766168
*/
61776169
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
6178-
return create(OperationScan.scan(this, initialValue, accumulator));
6170+
return lift(OperationScan.scan(initialValue, accumulator));
61796171
}
61806172

61816173
/**
@@ -7079,7 +7071,7 @@ public final Observable<T> subscribeOn(Scheduler scheduler) {
70797071
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
70807072
*/
70817073
public final Observable<Double> sumDouble(Func1<? super T, Double> valueExtractor) {
7082-
return create(new OperationSum.SumDoubleExtractor<T>(this, valueExtractor));
7074+
return OperationSum.sumAtLeastOneDoubles(map(valueExtractor));
70837075
}
70847076

70857077
/**
@@ -7096,7 +7088,7 @@ public final Observable<Double> sumDouble(Func1<? super T, Double> valueExtracto
70967088
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
70977089
*/
70987090
public final Observable<Float> sumFloat(Func1<? super T, Float> valueExtractor) {
7099-
return create(new OperationSum.SumFloatExtractor<T>(this, valueExtractor));
7091+
return OperationSum.sumAtLeastOneFloats(map(valueExtractor));
71007092
}
71017093

71027094
/**
@@ -7113,7 +7105,7 @@ public final Observable<Float> sumFloat(Func1<? super T, Float> valueExtractor)
71137105
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
71147106
*/
71157107
public final Observable<Integer> sumInteger(Func1<? super T, Integer> valueExtractor) {
7116-
return create(new OperationSum.SumIntegerExtractor<T>(this, valueExtractor));
7108+
return OperationSum.sumAtLeastOneIntegers(map(valueExtractor));
71177109
}
71187110

71197111
/**
@@ -7130,7 +7122,7 @@ public final Observable<Integer> sumInteger(Func1<? super T, Integer> valueExtra
71307122
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
71317123
*/
71327124
public final Observable<Long> sumLong(Func1<? super T, Long> valueExtractor) {
7133-
return create(new OperationSum.SumLongExtractor<T>(this, valueExtractor));
7125+
return OperationSum.sumAtLeastOneLongs(map(valueExtractor));
71347126
}
71357127

71367128
/**

rxjava-core/src/main/java/rx/operators/OperationScan.java

Lines changed: 80 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
*/
1616
package rx.operators;
1717

18-
import rx.Observable;
19-
import rx.Observable.OnSubscribeFunc;
20-
import rx.Observer;
21-
import rx.Subscription;
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
2220
import rx.util.functions.Func2;
2321

2422
/**
@@ -36,7 +34,8 @@
3634
*/
3735
public final class OperationScan {
3836
/**
39-
* Applies an accumulator function over an observable sequence and returns each intermediate result with the specified source and accumulator.
37+
* Applies an accumulator function over an observable sequence and returns each intermediate
38+
* result with the specified source and accumulator.
4039
*
4140
* @param sequence
4241
* An observable sequence of elements to project.
@@ -45,124 +44,97 @@ public final class OperationScan {
4544
* @param accumulator
4645
* An accumulator function to be invoked on each element from the sequence.
4746
*
48-
* @return An observable sequence whose elements are the result of accumulating the output from the list of Observables.
49-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212007%28v=vs.103%29.aspx">Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource,
47+
* @return An observable sequence whose elements are the result of accumulating the output from
48+
* the list of Observables.
49+
* @see <a
50+
* href="http://msdn.microsoft.com/en-us/library/hh212007%28v=vs.103%29.aspx">Observable.Scan(TSource,
51+
* TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource,
5052
* TAccumulate))</a>
5153
*/
52-
public static <T, R> OnSubscribeFunc<R> scan(Observable<? extends T> sequence, R initialValue, Func2<R, ? super T, R> accumulator) {
53-
return new Accumulator<T, R>(sequence, initialValue, accumulator);
54+
public static <T, R> Operator<R, T> scan(final R initialValue, final Func2<R, ? super T, R> accumulator) {
55+
return new Operator<R, T>() {
56+
@Override
57+
public Subscriber<T> call(final Subscriber<? super R> observer) {
58+
observer.onNext(initialValue);
59+
return new Subscriber<T>(observer) {
60+
private R value = initialValue;
61+
62+
@Override
63+
public void onNext(T value) {
64+
try {
65+
this.value = accumulator.call(this.value, value);
66+
} catch (Throwable e) {
67+
observer.onError(e);
68+
observer.unsubscribe();
69+
}
70+
observer.onNext(this.value);
71+
}
72+
73+
@Override
74+
public void onError(Throwable e) {
75+
observer.onError(e);
76+
}
77+
78+
@Override
79+
public void onCompleted() {
80+
observer.onCompleted();
81+
}
82+
};
83+
}
84+
};
5485
}
5586

5687
/**
57-
* Applies an accumulator function over an observable sequence and returns each intermediate result with the specified source and accumulator.
88+
* Applies an accumulator function over an observable sequence and returns each intermediate
89+
* result with the specified source and accumulator.
5890
*
5991
* @param sequence
6092
* An observable sequence of elements to project.
6193
* @param accumulator
6294
* An accumulator function to be invoked on each element from the sequence.
6395
*
64-
* @return An observable sequence whose elements are the result of accumulating the output from the list of Observables.
65-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx">Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource))</a>
96+
* @return An observable sequence whose elements are the result of accumulating the output from
97+
* the list of Observables.
98+
* @see <a
99+
* href="http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx">Observable.Scan(TSource)
100+
* Method (IObservable(TSource), Func(TSource, TSource, TSource))</a>
66101
*/
67-
public static <T> OnSubscribeFunc<T> scan(Observable<? extends T> sequence, Func2<T, T, T> accumulator) {
68-
return new AccuWithoutInitialValue<T>(sequence, accumulator);
69-
}
70-
71-
private static class AccuWithoutInitialValue<T> implements OnSubscribeFunc<T> {
72-
private final Observable<? extends T> sequence;
73-
private final Func2<T, T, T> accumulatorFunction;
74-
75-
private AccumulatingObserver<T, T> accumulatingObserver;
76-
77-
private AccuWithoutInitialValue(Observable<? extends T> sequence, Func2<T, T, T> accumulator) {
78-
this.sequence = sequence;
79-
this.accumulatorFunction = accumulator;
80-
}
81-
82-
@Override
83-
public Subscription onSubscribe(final Observer<? super T> observer) {
84-
return sequence.subscribe(new Observer<T>() {
85-
86-
// has to be synchronized so that the initial value is always sent only once.
87-
@Override
88-
public synchronized void onNext(T value) {
89-
if (accumulatingObserver == null) {
90-
observer.onNext(value);
91-
accumulatingObserver = new AccumulatingObserver<T, T>(observer, value, accumulatorFunction);
92-
} else {
93-
accumulatingObserver.onNext(value);
102+
public static <T> Operator<T, T> scan(final Func2<T, T, T> accumulator) {
103+
return new Operator<T, T>() {
104+
@Override
105+
public Subscriber<T> call(final Subscriber<? super T> observer) {
106+
return new Subscriber<T>(observer) {
107+
private boolean first = true;
108+
private T value;
109+
110+
@Override
111+
public void onNext(T value) {
112+
if (first) {
113+
this.value = value;
114+
first = false;
115+
}
116+
else {
117+
try {
118+
this.value = accumulator.call(this.value, value);
119+
} catch (Throwable e) {
120+
observer.onError(e);
121+
observer.unsubscribe();
122+
}
123+
}
124+
observer.onNext(this.value);
94125
}
95-
}
96-
97-
@Override
98-
public void onError(Throwable e) {
99-
observer.onError(e);
100-
}
101126

102-
@Override
103-
public void onCompleted() {
104-
observer.onCompleted();
105-
}
106-
});
107-
}
108-
}
109-
110-
private static class Accumulator<T, R> implements OnSubscribeFunc<R> {
111-
private final Observable<? extends T> sequence;
112-
private final R initialValue;
113-
private final Func2<R, ? super T, R> accumulatorFunction;
114-
115-
private Accumulator(Observable<? extends T> sequence, R initialValue, Func2<R, ? super T, R> accumulator) {
116-
this.sequence = sequence;
117-
this.initialValue = initialValue;
118-
this.accumulatorFunction = accumulator;
119-
}
120-
121-
@Override
122-
public Subscription onSubscribe(final Observer<? super R> observer) {
123-
observer.onNext(initialValue);
124-
return sequence.subscribe(new AccumulatingObserver<T, R>(observer, initialValue, accumulatorFunction));
125-
}
126-
}
127-
128-
private static class AccumulatingObserver<T, R> implements Observer<T> {
129-
private final Observer<? super R> observer;
130-
private final Func2<R, ? super T, R> accumulatorFunction;
131-
132-
private R acc;
133-
134-
private AccumulatingObserver(Observer<? super R> observer, R initialValue, Func2<R, ? super T, R> accumulator) {
135-
this.observer = observer;
136-
this.accumulatorFunction = accumulator;
137-
138-
this.acc = initialValue;
139-
}
127+
@Override
128+
public void onError(Throwable e) {
129+
observer.onError(e);
130+
}
140131

141-
/**
142-
* We must synchronize this because we can't allow
143-
* multiple threads to execute the 'accumulatorFunction' at the same time because
144-
* the accumulator code very often will be doing mutation of the 'acc' object such as a non-threadsafe HashMap
145-
*
146-
* Because it's synchronized it's using non-atomic variables since everything in this method is single-threaded
147-
*/
148-
@Override
149-
public synchronized void onNext(T value) {
150-
try {
151-
acc = accumulatorFunction.call(acc, value);
152-
observer.onNext(acc);
153-
} catch (Throwable ex) {
154-
observer.onError(ex);
132+
@Override
133+
public void onCompleted() {
134+
observer.onCompleted();
135+
}
136+
};
155137
}
156-
}
157-
158-
@Override
159-
public void onError(Throwable e) {
160-
observer.onError(e);
161-
}
162-
163-
@Override
164-
public void onCompleted() {
165-
observer.onCompleted();
166-
}
138+
};
167139
}
168140
}

0 commit comments

Comments
 (0)