Skip to content

Commit 5ef3600

Browse files
author
jmhofer
committed
Fixed a typo, added missing error and completion handling
1 parent ce9c6b5 commit 5ef3600

File tree

1 file changed

+17
-6
lines changed

1 file changed

+17
-6
lines changed

rxjava-core/src/main/java/rx/operators/OperationScan.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import rx.Observer;
2727
import rx.Subscription;
2828
import rx.util.AtomicObservableSubscription;
29+
import rx.util.functions.Action0;
2930
import rx.util.functions.Action1;
3031
import rx.util.functions.Func1;
3132
import rx.util.functions.Func2;
@@ -65,12 +66,12 @@ public static <T> Func1<Observer<T>, Subscription> scan(Observable<T> sequence,
6566

6667
private static class AccuWithoutInitialValue<T> implements Func1<Observer<T>, Subscription> {
6768
private final Observable<T> sequence;
68-
private final Func2<T, T, T> accumlatorFunction;
69+
private final Func2<T, T, T> accumulatorFunction;
6970
private T initialValue;
7071

7172
private AccuWithoutInitialValue(Observable<T> sequence, Func2<T, T, T> accumulator) {
7273
this.sequence = sequence;
73-
this.accumlatorFunction = accumulator;
74+
this.accumulatorFunction = accumulator;
7475
}
7576

7677
@Override
@@ -80,22 +81,32 @@ public Subscription call(final Observer<T> observer) {
8081
public void call(T value) {
8182
initialValue = value;
8283
}
84+
}, new Action1<Exception>() {
85+
@Override
86+
public void call(Exception e) {
87+
observer.onError(e);
88+
}
89+
}, new Action0() {
90+
@Override
91+
public void call() {
92+
observer.onCompleted();
93+
}
8394
});
84-
Accumulator<T, T> scan = new Accumulator<T, T>(sequence.skip(1), initialValue, accumlatorFunction);
95+
Accumulator<T, T> scan = new Accumulator<T, T>(sequence.skip(1), initialValue, accumulatorFunction);
8596
return scan.call(observer);
8697
}
8798
}
8899

89100
private static class Accumulator<T, R> implements Func1<Observer<R>, Subscription> {
90101
private final Observable<T> sequence;
91102
private final R initialValue;
92-
private final Func2<R, T, R> accumlatorFunction;
103+
private final Func2<R, T, R> accumulatorFunction;
93104
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
94105

95106
private Accumulator(Observable<T> sequence, R initialValue, Func2<R, T, R> accumulator) {
96107
this.sequence = sequence;
97108
this.initialValue = initialValue;
98-
this.accumlatorFunction = accumulator;
109+
this.accumulatorFunction = accumulator;
99110
}
100111

101112
@Override
@@ -115,7 +126,7 @@ public Subscription call(final Observer<R> observer) {
115126
@Override
116127
public synchronized void onNext(T value) {
117128
try {
118-
acc = accumlatorFunction.call(acc, value);
129+
acc = accumulatorFunction.call(acc, value);
119130
observer.onNext(acc);
120131
} catch (Exception ex) {
121132
observer.onError(ex);

0 commit comments

Comments
 (0)