Skip to content

Commit 4746752

Browse files
author
jmhofer
committed
trying to generalize scan, however drop is still missing...
1 parent b9466bb commit 4746752

File tree

1 file changed

+39
-33
lines changed

1 file changed

+39
-33
lines changed

rxjava-core/src/main/java/rx/operators/OperationScan.java

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import rx.Observer;
2727
import rx.Subscription;
2828
import rx.util.AtomicObservableSubscription;
29+
import rx.util.functions.Action1;
2930
import rx.util.functions.Func1;
3031
import rx.util.functions.Func2;
3132

@@ -43,8 +44,8 @@ public final class OperationScan {
4344
* @return An observable sequence whose elements are the result of accumulating the output from the list of Observables.
4445
* @see http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx
4546
*/
46-
public static <T> Func1<Observer<T>, Subscription> scan(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
47-
return new Accumulator<T>(sequence, initialValue, accumulator);
47+
public static <T, R> Func1<Observer<R>, Subscription> scan(Observable<T> sequence, R initialValue, Func2<R, T, R> accumulator) {
48+
return new Accumulator<T, R>(sequence, initialValue, accumulator);
4849
}
4950

5051
/**
@@ -59,26 +60,49 @@ public static <T> Func1<Observer<T>, Subscription> scan(Observable<T> sequence,
5960
* @see http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx
6061
*/
6162
public static <T> Func1<Observer<T>, Subscription> scan(Observable<T> sequence, Func2<T, T, T> accumulator) {
62-
return new Accumulator<T>(sequence, null, accumulator);
63+
return new AccuWithoutInitialValue<T>(sequence, accumulator);
6364
}
6465

65-
private static class Accumulator<T> implements Func1<Observer<T>, Subscription> {
66+
private static class AccuWithoutInitialValue<T> implements Func1<Observer<T>, Subscription> {
6667
private final Observable<T> sequence;
67-
private final T initialValue;
68-
private Func2<T, T, T> accumlatorFunction;
68+
private final Func2<T, T, T> accumlatorFunction;
69+
private T initialValue;
70+
71+
private AccuWithoutInitialValue(Observable<T> sequence, Func2<T, T, T> accumulator) {
72+
this.sequence = sequence;
73+
this.accumlatorFunction = accumulator;
74+
}
75+
76+
@Override
77+
public Subscription call(final Observer<T> observer) {
78+
sequence.take(1).subscribe(new Action1<T>() {
79+
@Override
80+
public void call(T value) {
81+
initialValue = value;
82+
observer.onNext(value);
83+
}
84+
});
85+
Accumulator<T, T> scan = new Accumulator<T, T>(sequence /* FIXME .drop(1) */, initialValue, accumlatorFunction);
86+
return scan.call(observer);
87+
}
88+
}
89+
90+
private static class Accumulator<T, R> implements Func1<Observer<R>, Subscription> {
91+
private final Observable<T> sequence;
92+
private final R initialValue;
93+
private final Func2<R, T, R> accumlatorFunction;
6994
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
7095

71-
private Accumulator(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
96+
private Accumulator(Observable<T> sequence, R initialValue, Func2<R, T, R> accumulator) {
7297
this.sequence = sequence;
7398
this.initialValue = initialValue;
7499
this.accumlatorFunction = accumulator;
75100
}
76101

77-
public Subscription call(final Observer<T> observer) {
78-
102+
@Override
103+
public Subscription call(final Observer<R> observer) {
79104
return subscription.wrap(sequence.subscribe(new Observer<T>() {
80-
private T acc = initialValue;
81-
private boolean hasSentInitialValue = false;
105+
private R acc = initialValue;
82106

83107
/**
84108
* We must synchronize this because we can't allow
@@ -87,26 +111,11 @@ public Subscription call(final Observer<T> observer) {
87111
*
88112
* Because it's synchronized it's using non-atomic variables since everything in this method is single-threaded
89113
*/
114+
@Override
90115
public synchronized void onNext(T value) {
91-
if (acc == null) {
92-
// we assume that acc is not allowed to be returned from accumulatorValue
93-
// so it's okay to check null as being the state we initialize on
94-
acc = value;
95-
// this is all we do for this first value if we didn't have an initialValue
96-
return;
97-
}
98-
if (!hasSentInitialValue) {
99-
hasSentInitialValue = true;
100-
observer.onNext(acc);
101-
}
102-
103116
try {
104117

105118
acc = accumlatorFunction.call(acc, value);
106-
if (acc == null) {
107-
onError(new IllegalArgumentException("Null is an unsupported return value for an accumulator."));
108-
return;
109-
}
110119
observer.onNext(acc);
111120
} catch (Exception ex) {
112121
observer.onError(ex);
@@ -115,16 +124,13 @@ public synchronized void onNext(T value) {
115124
}
116125
}
117126

127+
@Override
118128
public void onError(Exception ex) {
119129
observer.onError(ex);
120130
}
121131

122-
// synchronized because we access 'hasSentInitialValue'
123-
public synchronized void onCompleted() {
124-
// if only one sequence value existed, we send it without any accumulation
125-
if (!hasSentInitialValue) {
126-
observer.onNext(acc);
127-
}
132+
@Override
133+
public void onCompleted() {
128134
observer.onCompleted();
129135
}
130136
}));

0 commit comments

Comments
 (0)