Skip to content

Commit 12c73c7

Browse files
author
jmhofer
committed
found drop (it's called skip here...) - finished with generalizing scan
1 parent 4746752 commit 12c73c7

File tree

2 files changed

+58
-25
lines changed

2 files changed

+58
-25
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -252,13 +252,15 @@ public Subscription subscribe(final Map<String, Object> callbacks) {
252252
*/
253253
return protectivelyWrapAndSubscribe(new Observer() {
254254

255+
@Override
255256
public void onCompleted() {
256257
Object onComplete = callbacks.get("onCompleted");
257258
if (onComplete != null) {
258259
Functions.from(onComplete).call();
259260
}
260261
}
261262

263+
@Override
262264
public void onError(Exception e) {
263265
handleError(e);
264266
Object onError = callbacks.get("onError");
@@ -267,6 +269,7 @@ public void onError(Exception e) {
267269
}
268270
}
269271

272+
@Override
270273
public void onNext(Object args) {
271274
onNext.call(args);
272275
}
@@ -298,15 +301,18 @@ public Subscription subscribe(final Object o) {
298301
*/
299302
return protectivelyWrapAndSubscribe(new Observer() {
300303

304+
@Override
301305
public void onCompleted() {
302306
// do nothing
303307
}
304308

309+
@Override
305310
public void onError(Exception e) {
306311
handleError(e);
307312
// no callback defined
308313
}
309314

315+
@Override
310316
public void onNext(Object args) {
311317
onNext.call(args);
312318
}
@@ -327,15 +333,18 @@ public Subscription subscribe(final Action1<T> onNext) {
327333
*/
328334
return protectivelyWrapAndSubscribe(new Observer<T>() {
329335

336+
@Override
330337
public void onCompleted() {
331338
// do nothing
332339
}
333340

341+
@Override
334342
public void onError(Exception e) {
335343
handleError(e);
336344
// no callback defined
337345
}
338346

347+
@Override
339348
public void onNext(T args) {
340349
if (onNext == null) {
341350
throw new RuntimeException("onNext must be implemented");
@@ -365,17 +374,20 @@ public Subscription subscribe(final Object onNext, final Object onError) {
365374
*/
366375
return protectivelyWrapAndSubscribe(new Observer() {
367376

377+
@Override
368378
public void onCompleted() {
369379
// do nothing
370380
}
371381

382+
@Override
372383
public void onError(Exception e) {
373384
handleError(e);
374385
if (onError != null) {
375386
Functions.from(onError).call(e);
376387
}
377388
}
378389

390+
@Override
379391
public void onNext(Object args) {
380392
onNextFunction.call(args);
381393
}
@@ -396,17 +408,20 @@ public Subscription subscribe(final Action1<T> onNext, final Action1<Exception>
396408
*/
397409
return protectivelyWrapAndSubscribe(new Observer<T>() {
398410

411+
@Override
399412
public void onCompleted() {
400413
// do nothing
401414
}
402415

416+
@Override
403417
public void onError(Exception e) {
404418
handleError(e);
405419
if (onError != null) {
406420
onError.call(e);
407421
}
408422
}
409423

424+
@Override
410425
public void onNext(T args) {
411426
if (onNext == null) {
412427
throw new RuntimeException("onNext must be implemented");
@@ -436,19 +451,22 @@ public Subscription subscribe(final Object onNext, final Object onError, final O
436451
*/
437452
return protectivelyWrapAndSubscribe(new Observer() {
438453

454+
@Override
439455
public void onCompleted() {
440456
if (onComplete != null) {
441457
Functions.from(onComplete).call();
442458
}
443459
}
444460

461+
@Override
445462
public void onError(Exception e) {
446463
handleError(e);
447464
if (onError != null) {
448465
Functions.from(onError).call(e);
449466
}
450467
}
451468

469+
@Override
452470
public void onNext(Object args) {
453471
onNextFunction.call(args);
454472
}
@@ -469,17 +487,20 @@ public Subscription subscribe(final Action1<T> onNext, final Action1<Exception>
469487
*/
470488
return protectivelyWrapAndSubscribe(new Observer<T>() {
471489

490+
@Override
472491
public void onCompleted() {
473492
onComplete.call();
474493
}
475494

495+
@Override
476496
public void onError(Exception e) {
477497
handleError(e);
478498
if (onError != null) {
479499
onError.call(e);
480500
}
481501
}
482502

503+
@Override
483504
public void onNext(T args) {
484505
if (onNext == null) {
485506
throw new RuntimeException("onNext must be implemented");
@@ -516,10 +537,12 @@ public void forEach(final Action1<T> onNext) {
516537
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
517538
*/
518539
protectivelyWrapAndSubscribe(new Observer<T>() {
540+
@Override
519541
public void onCompleted() {
520542
latch.countDown();
521543
}
522544

545+
@Override
523546
public void onError(Exception e) {
524547
/*
525548
* If we receive an onError event we set the reference on the outer thread
@@ -531,6 +554,7 @@ public void onError(Exception e) {
531554
latch.countDown();
532555
}
533556

557+
@Override
534558
public void onNext(T args) {
535559
onNext.call(args);
536560
}
@@ -582,6 +606,7 @@ public void forEach(final Object o) {
582606

583607
forEach(new Action1() {
584608

609+
@Override
585610
public void call(Object args) {
586611
onNext.call(args);
587612
}
@@ -1846,6 +1871,8 @@ public T call(T t1, T t2) {
18461871
*
18471872
* @param <T>
18481873
* the type item emitted by the source Observable
1874+
* @param <R>
1875+
* the type returned for each item of the target observable
18491876
* @param sequence
18501877
* the source Observable
18511878
* @param initialValue
@@ -1857,7 +1884,7 @@ public T call(T t1, T t2) {
18571884
* output from the sequence emitted by the source Observable
18581885
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
18591886
*/
1860-
public static <T> Observable<T> scan(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
1887+
public static <T, R> Observable<R> scan(Observable<T> sequence, R initialValue, Func2<R, T, R> accumulator) {
18611888
return create(OperationScan.scan(sequence, initialValue, accumulator));
18621889
}
18631890

@@ -1871,6 +1898,8 @@ public static <T> Observable<T> scan(Observable<T> sequence, T initialValue, Fun
18711898
*
18721899
* @param <T>
18731900
* the type item emitted by the source Observable
1901+
* @param <R>
1902+
* the type returned for each item of the target observable
18741903
* @param sequence
18751904
* the source Observable
18761905
* @param initialValue
@@ -1882,17 +1911,16 @@ public static <T> Observable<T> scan(Observable<T> sequence, T initialValue, Fun
18821911
* output from the sequence emitted by the source Observable
18831912
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
18841913
*/
1885-
public static <T> Observable<T> scan(final Observable<T> sequence, final T initialValue, final Object accumulator) {
1914+
public static <T, R> Observable<R> scan(final Observable<T> sequence, final R initialValue, final Object accumulator) {
18861915
@SuppressWarnings("rawtypes")
18871916
final FuncN _f = Functions.from(accumulator);
1888-
return scan(sequence, initialValue, new Func2<T, T, T>() {
1917+
return scan(sequence, initialValue, new Func2<R, T, R>() {
18891918

18901919
@SuppressWarnings("unchecked")
18911920
@Override
1892-
public T call(T t1, T t2) {
1893-
return (T) _f.call(t1, t2);
1921+
public R call(R r, T t) {
1922+
return (R) _f.call(r, t);
18941923
}
1895-
18961924
});
18971925
}
18981926

@@ -2743,6 +2771,7 @@ public Observable<T> filter(final Object callback) {
27432771
final FuncN _f = Functions.from(callback);
27442772
return filter(this, new Func1<T, Boolean>() {
27452773

2774+
@Override
27462775
public Boolean call(T t1) {
27472776
return (Boolean) _f.call(t1);
27482777
}
@@ -2913,6 +2942,7 @@ public <R> Observable<R> map(final Object callback) {
29132942
final FuncN _f = Functions.from(callback);
29142943
return map(this, new Func1<T, R>() {
29152944

2945+
@Override
29162946
@SuppressWarnings("unchecked")
29172947
public R call(T t1) {
29182948
return (R) _f.call(t1);
@@ -2963,6 +2993,7 @@ public <R> Observable<R> mapMany(final Object callback) {
29632993
final FuncN _f = Functions.from(callback);
29642994
return mapMany(this, new Func1<T, Observable<R>>() {
29652995

2996+
@Override
29662997
@SuppressWarnings("unchecked")
29672998
public Observable<R> call(T t1) {
29682999
return (Observable<R>) _f.call(t1);
@@ -3071,6 +3102,7 @@ public Observable<T> onErrorResumeNext(final Object resumeFunction) {
30713102
final FuncN _f = Functions.from(resumeFunction);
30723103
return onErrorResumeNext(this, new Func1<Exception, Observable<T>>() {
30733104

3105+
@Override
30743106
@SuppressWarnings("unchecked")
30753107
public Observable<T> call(Exception e) {
30763108
return (Observable<T>) _f.call(e);
@@ -3152,6 +3184,7 @@ public Observable<T> onErrorReturn(final Object resumeFunction) {
31523184
final FuncN _f = Functions.from(resumeFunction);
31533185
return onErrorReturn(this, new Func1<Exception, T>() {
31543186

3187+
@Override
31553188
@SuppressWarnings("unchecked")
31563189
public T call(Exception e) {
31573190
return (T) _f.call(e);
@@ -3330,7 +3363,7 @@ public Observable<T> scan(final Object accumulator) {
33303363
* the list of Observables.
33313364
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
33323365
*/
3333-
public Observable<T> scan(T initialValue, Func2<T, T, T> accumulator) {
3366+
public <R> Observable<R> scan(R initialValue, Func2<R, T, R> accumulator) {
33343367
return scan(this, initialValue, accumulator);
33353368
}
33363369

@@ -3353,7 +3386,7 @@ public Observable<T> scan(T initialValue, Func2<T, T, T> accumulator) {
33533386
* the list of Observables.
33543387
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
33553388
*/
3356-
public Observable<T> scan(final T initialValue, final Object accumulator) {
3389+
public <R> Observable<R> scan(final R initialValue, final Object accumulator) {
33573390
return scan(this, initialValue, accumulator);
33583391
}
33593392

rxjava-core/src/main/java/rx/operators/OperationScan.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,9 @@ public Subscription call(final Observer<T> observer) {
7979
@Override
8080
public void call(T value) {
8181
initialValue = value;
82-
observer.onNext(value);
8382
}
8483
});
85-
Accumulator<T, T> scan = new Accumulator<T, T>(sequence /* FIXME .drop(1) */, initialValue, accumlatorFunction);
84+
Accumulator<T, T> scan = new Accumulator<T, T>(sequence.skip(1), initialValue, accumlatorFunction);
8685
return scan.call(observer);
8786
}
8887
}
@@ -101,6 +100,8 @@ private Accumulator(Observable<T> sequence, R initialValue, Func2<R, T, R> accum
101100

102101
@Override
103102
public Subscription call(final Observer<R> observer) {
103+
observer.onNext(initialValue);
104+
104105
return subscription.wrap(sequence.subscribe(new Observer<T>() {
105106
private R acc = initialValue;
106107

@@ -114,7 +115,6 @@ public Subscription call(final Observer<R> observer) {
114115
@Override
115116
public synchronized void onNext(T value) {
116117
try {
117-
118118
acc = accumlatorFunction.call(acc, value);
119119
observer.onNext(acc);
120120
} catch (Exception ex) {
@@ -147,28 +147,28 @@ public void before() {
147147
@Test
148148
public void testScanIntegersWithInitialValue() {
149149
@SuppressWarnings("unchecked")
150-
Observer<Integer> Observer = mock(Observer.class);
150+
Observer<String> observer = mock(Observer.class);
151151

152152
Observable<Integer> observable = Observable.toObservable(1, 2, 3);
153153

154-
Observable<Integer> m = Observable.create(scan(observable, 0, new Func2<Integer, Integer, Integer>() {
154+
Observable<String> m = Observable.create(scan(observable, "", new Func2<String, Integer, String>() {
155155

156156
@Override
157-
public Integer call(Integer t1, Integer t2) {
158-
return t1 + t2;
157+
public String call(String s, Integer n) {
158+
return s + n.toString();
159159
}
160160

161161
}));
162-
m.subscribe(Observer);
163-
164-
verify(Observer, never()).onError(any(Exception.class));
165-
verify(Observer, times(1)).onNext(0);
166-
verify(Observer, times(1)).onNext(1);
167-
verify(Observer, times(1)).onNext(3);
168-
verify(Observer, times(1)).onNext(6);
169-
verify(Observer, times(4)).onNext(anyInt());
170-
verify(Observer, times(1)).onCompleted();
171-
verify(Observer, never()).onError(any(Exception.class));
162+
m.subscribe(observer);
163+
164+
verify(observer, never()).onError(any(Exception.class));
165+
verify(observer, times(1)).onNext("");
166+
verify(observer, times(1)).onNext("1");
167+
verify(observer, times(1)).onNext("12");
168+
verify(observer, times(1)).onNext("123");
169+
verify(observer, times(4)).onNext(anyString());
170+
verify(observer, times(1)).onCompleted();
171+
verify(observer, never()).onError(any(Exception.class));
172172
}
173173

174174
@Test

0 commit comments

Comments
 (0)