Skip to content

Commit 17beff8

Browse files
author
jmhofer
committed
Added timestamp method to Observable, too.
1 parent 2eff168 commit 17beff8

File tree

1 file changed

+40
-0
lines changed

1 file changed

+40
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import rx.operators.OperationTakeLast;
6565
import rx.operators.OperationTakeUntil;
6666
import rx.operators.OperationTakeWhile;
67+
import rx.operators.OperationTimestamp;
6768
import rx.operators.OperationToIterator;
6869
import rx.operators.OperationToObservableFuture;
6970
import rx.operators.OperationToObservableIterable;
@@ -80,6 +81,7 @@
8081
import rx.util.AtomicObservableSubscription;
8182
import rx.util.AtomicObserver;
8283
import rx.util.Range;
84+
import rx.util.Timestamped;
8385
import rx.util.functions.Action0;
8486
import rx.util.functions.Action1;
8587
import rx.util.functions.Func0;
@@ -251,13 +253,15 @@ public Subscription subscribe(final Map<String, Object> callbacks) {
251253
*/
252254
return protectivelyWrapAndSubscribe(new Observer() {
253255

256+
@Override
254257
public void onCompleted() {
255258
Object onComplete = callbacks.get("onCompleted");
256259
if (onComplete != null) {
257260
Functions.from(onComplete).call();
258261
}
259262
}
260263

264+
@Override
261265
public void onError(Exception e) {
262266
handleError(e);
263267
Object onError = callbacks.get("onError");
@@ -266,6 +270,7 @@ public void onError(Exception e) {
266270
}
267271
}
268272

273+
@Override
269274
public void onNext(Object args) {
270275
onNext.call(args);
271276
}
@@ -297,15 +302,18 @@ public Subscription subscribe(final Object o) {
297302
*/
298303
return protectivelyWrapAndSubscribe(new Observer() {
299304

305+
@Override
300306
public void onCompleted() {
301307
// do nothing
302308
}
303309

310+
@Override
304311
public void onError(Exception e) {
305312
handleError(e);
306313
// no callback defined
307314
}
308315

316+
@Override
309317
public void onNext(Object args) {
310318
onNext.call(args);
311319
}
@@ -326,15 +334,18 @@ public Subscription subscribe(final Action1<T> onNext) {
326334
*/
327335
return protectivelyWrapAndSubscribe(new Observer<T>() {
328336

337+
@Override
329338
public void onCompleted() {
330339
// do nothing
331340
}
332341

342+
@Override
333343
public void onError(Exception e) {
334344
handleError(e);
335345
// no callback defined
336346
}
337347

348+
@Override
338349
public void onNext(T args) {
339350
if (onNext == null) {
340351
throw new RuntimeException("onNext must be implemented");
@@ -364,17 +375,20 @@ public Subscription subscribe(final Object onNext, final Object onError) {
364375
*/
365376
return protectivelyWrapAndSubscribe(new Observer() {
366377

378+
@Override
367379
public void onCompleted() {
368380
// do nothing
369381
}
370382

383+
@Override
371384
public void onError(Exception e) {
372385
handleError(e);
373386
if (onError != null) {
374387
Functions.from(onError).call(e);
375388
}
376389
}
377390

391+
@Override
378392
public void onNext(Object args) {
379393
onNextFunction.call(args);
380394
}
@@ -395,17 +409,20 @@ public Subscription subscribe(final Action1<T> onNext, final Action1<Exception>
395409
*/
396410
return protectivelyWrapAndSubscribe(new Observer<T>() {
397411

412+
@Override
398413
public void onCompleted() {
399414
// do nothing
400415
}
401416

417+
@Override
402418
public void onError(Exception e) {
403419
handleError(e);
404420
if (onError != null) {
405421
onError.call(e);
406422
}
407423
}
408424

425+
@Override
409426
public void onNext(T args) {
410427
if (onNext == null) {
411428
throw new RuntimeException("onNext must be implemented");
@@ -435,19 +452,22 @@ public Subscription subscribe(final Object onNext, final Object onError, final O
435452
*/
436453
return protectivelyWrapAndSubscribe(new Observer() {
437454

455+
@Override
438456
public void onCompleted() {
439457
if (onComplete != null) {
440458
Functions.from(onComplete).call();
441459
}
442460
}
443461

462+
@Override
444463
public void onError(Exception e) {
445464
handleError(e);
446465
if (onError != null) {
447466
Functions.from(onError).call(e);
448467
}
449468
}
450469

470+
@Override
451471
public void onNext(Object args) {
452472
onNextFunction.call(args);
453473
}
@@ -468,17 +488,20 @@ public Subscription subscribe(final Action1<T> onNext, final Action1<Exception>
468488
*/
469489
return protectivelyWrapAndSubscribe(new Observer<T>() {
470490

491+
@Override
471492
public void onCompleted() {
472493
onComplete.call();
473494
}
474495

496+
@Override
475497
public void onError(Exception e) {
476498
handleError(e);
477499
if (onError != null) {
478500
onError.call(e);
479501
}
480502
}
481503

504+
@Override
482505
public void onNext(T args) {
483506
if (onNext == null) {
484507
throw new RuntimeException("onNext must be implemented");
@@ -515,10 +538,12 @@ public void forEach(final Action1<T> onNext) {
515538
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
516539
*/
517540
protectivelyWrapAndSubscribe(new Observer<T>() {
541+
@Override
518542
public void onCompleted() {
519543
latch.countDown();
520544
}
521545

546+
@Override
522547
public void onError(Exception e) {
523548
/*
524549
* If we receive an onError event we set the reference on the outer thread
@@ -530,6 +555,7 @@ public void onError(Exception e) {
530555
latch.countDown();
531556
}
532557

558+
@Override
533559
public void onNext(T args) {
534560
onNext.call(args);
535561
}
@@ -581,6 +607,7 @@ public void forEach(final Object o) {
581607

582608
forEach(new Action1() {
583609

610+
@Override
584611
public void call(Object args) {
585612
onNext.call(args);
586613
}
@@ -2664,6 +2691,7 @@ public Observable<T> filter(final Object callback) {
26642691
final FuncN _f = Functions.from(callback);
26652692
return filter(this, new Func1<T, Boolean>() {
26662693

2694+
@Override
26672695
public Boolean call(T t1) {
26682696
return (Boolean) _f.call(t1);
26692697
}
@@ -2792,6 +2820,7 @@ public <R> Observable<R> map(final Object callback) {
27922820
final FuncN _f = Functions.from(callback);
27932821
return map(this, new Func1<T, R>() {
27942822

2823+
@Override
27952824
@SuppressWarnings("unchecked")
27962825
public R call(T t1) {
27972826
return (R) _f.call(t1);
@@ -2836,6 +2865,7 @@ public <R> Observable<R> mapMany(final Object callback) {
28362865
final FuncN _f = Functions.from(callback);
28372866
return mapMany(this, new Func1<T, Observable<R>>() {
28382867

2868+
@Override
28392869
@SuppressWarnings("unchecked")
28402870
public Observable<R> call(T t1) {
28412871
return (Observable<R>) _f.call(t1);
@@ -2944,6 +2974,7 @@ public Observable<T> onErrorResumeNext(final Object resumeFunction) {
29442974
final FuncN _f = Functions.from(resumeFunction);
29452975
return onErrorResumeNext(this, new Func1<Exception, Observable<T>>() {
29462976

2977+
@Override
29472978
@SuppressWarnings("unchecked")
29482979
public Observable<T> call(Exception e) {
29492980
return (Observable<T>) _f.call(e);
@@ -3025,6 +3056,7 @@ public Observable<T> onErrorReturn(final Object resumeFunction) {
30253056
final FuncN _f = Functions.from(resumeFunction);
30263057
return onErrorReturn(this, new Func1<Exception, T>() {
30273058

3059+
@Override
30283060
@SuppressWarnings("unchecked")
30293061
public T call(Exception e) {
30303062
return (T) _f.call(e);
@@ -3360,6 +3392,14 @@ public <E> Observable<T> takeUntil(Observable<E> other) {
33603392
return takeUntil(this, other);
33613393
}
33623394

3395+
/**
3396+
* Adds a timestamp to each item emitted by this observable.
3397+
* @return An observable sequence of timestamped items.
3398+
*/
3399+
public Observable<Timestamped<T>> timestamp() {
3400+
return create(OperationTimestamp.timestamp(this));
3401+
}
3402+
33633403
/**
33643404
* Returns an Observable that emits a single item, a list composed of all the items emitted by
33653405
* the source Observable.

0 commit comments

Comments
 (0)