1515 */
1616package rx .operators ;
1717
18- import rx .Observable ;
19- import rx .Observable .OnSubscribeFunc ;
20- import rx .Observer ;
21- import rx .Subscription ;
18+ import rx .Observable .Operator ;
19+ import rx .Subscriber ;
2220import rx .util .functions .Func2 ;
2321
2422/**
3634 */
3735public final class OperationScan {
3836 /**
39- * Applies an accumulator function over an observable sequence and returns each intermediate result with the specified source and accumulator.
37+ * Applies an accumulator function over an observable sequence and returns each intermediate
38+ * result with the specified source and accumulator.
4039 *
4140 * @param sequence
4241 * An observable sequence of elements to project.
@@ -45,124 +44,97 @@ public final class OperationScan {
4544 * @param accumulator
4645 * An accumulator function to be invoked on each element from the sequence.
4746 *
48- * @return An observable sequence whose elements are the result of accumulating the output from the list of Observables.
49- * @see <a href="http://msdn.microsoft.com/en-us/library/hh212007%28v=vs.103%29.aspx">Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource,
47+ * @return An observable sequence whose elements are the result of accumulating the output from
48+ * the list of Observables.
49+ * @see <a
50+ * href="http://msdn.microsoft.com/en-us/library/hh212007%28v=vs.103%29.aspx">Observable.Scan(TSource,
51+ * TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource,
5052 * TAccumulate))</a>
5153 */
52- public static <T , R > OnSubscribeFunc <R > scan (Observable <? extends T > sequence , R initialValue , Func2 <R , ? super T , R > accumulator ) {
53- return new Accumulator <T , R >(sequence , initialValue , accumulator );
54+ public static <T , R > Operator <R , T > scan (final R initialValue , final Func2 <R , ? super T , R > accumulator ) {
55+ return new Operator <R , T >() {
56+ @ Override
57+ public Subscriber <T > call (final Subscriber <? super R > observer ) {
58+ observer .onNext (initialValue );
59+ return new Subscriber <T >(observer ) {
60+ private R value = initialValue ;
61+
62+ @ Override
63+ public void onNext (T value ) {
64+ try {
65+ this .value = accumulator .call (this .value , value );
66+ } catch (Throwable e ) {
67+ observer .onError (e );
68+ observer .unsubscribe ();
69+ }
70+ observer .onNext (this .value );
71+ }
72+
73+ @ Override
74+ public void onError (Throwable e ) {
75+ observer .onError (e );
76+ }
77+
78+ @ Override
79+ public void onCompleted () {
80+ observer .onCompleted ();
81+ }
82+ };
83+ }
84+ };
5485 }
5586
5687 /**
57- * Applies an accumulator function over an observable sequence and returns each intermediate result with the specified source and accumulator.
88+ * Applies an accumulator function over an observable sequence and returns each intermediate
89+ * result with the specified source and accumulator.
5890 *
5991 * @param sequence
6092 * An observable sequence of elements to project.
6193 * @param accumulator
6294 * An accumulator function to be invoked on each element from the sequence.
6395 *
64- * @return An observable sequence whose elements are the result of accumulating the output from the list of Observables.
65- * @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx">Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource))</a>
96+ * @return An observable sequence whose elements are the result of accumulating the output from
97+ * the list of Observables.
98+ * @see <a
99+ * href="http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx">Observable.Scan(TSource)
100+ * Method (IObservable(TSource), Func(TSource, TSource, TSource))</a>
66101 */
67- public static <T > OnSubscribeFunc <T > scan (Observable <? extends T > sequence , Func2 <T , T , T > accumulator ) {
68- return new AccuWithoutInitialValue <T >(sequence , accumulator );
69- }
70-
71- private static class AccuWithoutInitialValue <T > implements OnSubscribeFunc <T > {
72- private final Observable <? extends T > sequence ;
73- private final Func2 <T , T , T > accumulatorFunction ;
74-
75- private AccumulatingObserver <T , T > accumulatingObserver ;
76-
77- private AccuWithoutInitialValue (Observable <? extends T > sequence , Func2 <T , T , T > accumulator ) {
78- this .sequence = sequence ;
79- this .accumulatorFunction = accumulator ;
80- }
81-
82- @ Override
83- public Subscription onSubscribe (final Observer <? super T > observer ) {
84- return sequence .subscribe (new Observer <T >() {
85-
86- // has to be synchronized so that the initial value is always sent only once.
87- @ Override
88- public synchronized void onNext (T value ) {
89- if (accumulatingObserver == null ) {
90- observer .onNext (value );
91- accumulatingObserver = new AccumulatingObserver <T , T >(observer , value , accumulatorFunction );
92- } else {
93- accumulatingObserver .onNext (value );
102+ public static <T > Operator <T , T > scan (final Func2 <T , T , T > accumulator ) {
103+ return new Operator <T , T >() {
104+ @ Override
105+ public Subscriber <T > call (final Subscriber <? super T > observer ) {
106+ return new Subscriber <T >(observer ) {
107+ private boolean first = true ;
108+ private T value ;
109+
110+ @ Override
111+ public void onNext (T value ) {
112+ if (first ) {
113+ this .value = value ;
114+ first = false ;
115+ }
116+ else {
117+ try {
118+ this .value = accumulator .call (this .value , value );
119+ } catch (Throwable e ) {
120+ observer .onError (e );
121+ observer .unsubscribe ();
122+ }
123+ }
124+ observer .onNext (this .value );
94125 }
95- }
96-
97- @ Override
98- public void onError (Throwable e ) {
99- observer .onError (e );
100- }
101126
102- @ Override
103- public void onCompleted () {
104- observer .onCompleted ();
105- }
106- });
107- }
108- }
109-
110- private static class Accumulator <T , R > implements OnSubscribeFunc <R > {
111- private final Observable <? extends T > sequence ;
112- private final R initialValue ;
113- private final Func2 <R , ? super T , R > accumulatorFunction ;
114-
115- private Accumulator (Observable <? extends T > sequence , R initialValue , Func2 <R , ? super T , R > accumulator ) {
116- this .sequence = sequence ;
117- this .initialValue = initialValue ;
118- this .accumulatorFunction = accumulator ;
119- }
120-
121- @ Override
122- public Subscription onSubscribe (final Observer <? super R > observer ) {
123- observer .onNext (initialValue );
124- return sequence .subscribe (new AccumulatingObserver <T , R >(observer , initialValue , accumulatorFunction ));
125- }
126- }
127-
128- private static class AccumulatingObserver <T , R > implements Observer <T > {
129- private final Observer <? super R > observer ;
130- private final Func2 <R , ? super T , R > accumulatorFunction ;
131-
132- private R acc ;
133-
134- private AccumulatingObserver (Observer <? super R > observer , R initialValue , Func2 <R , ? super T , R > accumulator ) {
135- this .observer = observer ;
136- this .accumulatorFunction = accumulator ;
137-
138- this .acc = initialValue ;
139- }
127+ @ Override
128+ public void onError (Throwable e ) {
129+ observer .onError (e );
130+ }
140131
141- /**
142- * We must synchronize this because we can't allow
143- * multiple threads to execute the 'accumulatorFunction' at the same time because
144- * the accumulator code very often will be doing mutation of the 'acc' object such as a non-threadsafe HashMap
145- *
146- * Because it's synchronized it's using non-atomic variables since everything in this method is single-threaded
147- */
148- @ Override
149- public synchronized void onNext (T value ) {
150- try {
151- acc = accumulatorFunction .call (acc , value );
152- observer .onNext (acc );
153- } catch (Throwable ex ) {
154- observer .onError (ex );
132+ @ Override
133+ public void onCompleted () {
134+ observer .onCompleted ();
135+ }
136+ };
155137 }
156- }
157-
158- @ Override
159- public void onError (Throwable e ) {
160- observer .onError (e );
161- }
162-
163- @ Override
164- public void onCompleted () {
165- observer .onCompleted ();
166- }
138+ };
167139 }
168140}
0 commit comments