Skip to content

Commit 034cf47

Browse files
Merge pull request #609 from akarnokd/OperationTimer3
Operation Timer 3.0
2 parents ccf921b + 2bb345b commit 034cf47

File tree

4 files changed

+176
-60
lines changed

4 files changed

+176
-60
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2001,12 +2001,12 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
20012001
* <p>
20022002
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timer.png">
20032003
*
2004-
* @param interval interval size in time units
2004+
* @param delay the initial delay before emitting a single 0L
20052005
* @param unit time units to use for the interval size
20062006
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#timer">RxJava wiki: timer()</a>
20072007
*/
2008-
public static Observable<Void> timer(long interval, TimeUnit unit) {
2009-
return create(OperationTimer.timer(interval, unit));
2008+
public static Observable<Long> timer(long delay, TimeUnit unit) {
2009+
return timer(delay, unit, Schedulers.threadPoolForComputation());
20102010
}
20112011

20122012
/**
@@ -2015,13 +2015,44 @@ public static Observable<Void> timer(long interval, TimeUnit unit) {
20152015
* <p>
20162016
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timer.s.png">
20172017
*
2018-
* @param interval interval size in time units
2018+
* @param delay the initial delay before emitting a single 0L
20192019
* @param unit time units to use for the interval size
20202020
* @param scheduler the scheduler to use for scheduling the item
20212021
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#timer">RxJava wiki: timer()</a>
20222022
*/
2023-
public static Observable<Void> timer(long interval, TimeUnit unit, Scheduler scheduler) {
2024-
return create(OperationTimer.timer(interval, unit, scheduler));
2023+
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) {
2024+
return create(new OperationTimer.TimerOnce(delay, unit, scheduler));
2025+
}
2026+
2027+
/**
2028+
* Return an Observable which emits a 0L after the initialDelay and ever increasing
2029+
* numbers after each period.
2030+
*
2031+
* @param initialDelay the initial delay time to wait before emitting the first value of 0L
2032+
* @param period the time period after emitting the subsequent numbers
2033+
* @param unit the time unit for both <code>initialDelay</code> and <code>period</code>
2034+
* @return an Observable which emits a 0L after the initialDelay and ever increasing
2035+
* numbers after each period
2036+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229435.aspx'>MSDN: Observable.Timer</a>
2037+
*/
2038+
public static Observable<Long> timer(long initialDelay, long period, TimeUnit unit) {
2039+
return timer(initialDelay, period, unit, Schedulers.threadPoolForComputation());
2040+
}
2041+
2042+
/**
2043+
* Return an Observable which emits a 0L after the initialDelay and ever increasing
2044+
* numbers after each period while running on the given scheduler.
2045+
*
2046+
* @param initialDelay the initial delay time to wait before emitting the first value of 0L
2047+
* @param period the time period after emitting the subsequent numbers
2048+
* @param unit the time unit for both <code>initialDelay</code> and <code>period</code>
2049+
* @param scheduler the scheduler where the waiting happens and value emissions run.
2050+
* @return an Observable which emits a 0L after the initialDelay and ever increasing
2051+
* numbers after each period while running on the given scheduler
2052+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229652.aspx'>MSDN: Observable.Timer</a>
2053+
*/
2054+
public static Observable<Long> timer(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
2055+
return create(new OperationTimer.TimerPeriodically(initialDelay, period, unit, scheduler));
20252056
}
20262057

20272058
/**

rxjava-core/src/main/java/rx/operators/OperationDelay.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ public static <T> Observable<T> delay(Observable<T> observable, final long delay
2828
// observable.map(x => Observable.timer(t).map(_ => x).startItAlreadyNow()).concat()
2929
Observable<Observable<T>> seqs = observable.map(new Func1<T, Observable<T>>() {
3030
public Observable<T> call(final T x) {
31-
ConnectableObservable<T> co = Observable.timer(delay, unit, scheduler).map(new Func1<Void, T>() {
32-
public T call(Void ignored) {
31+
ConnectableObservable<T> co = Observable.timer(delay, unit, scheduler).map(new Func1<Long, T>() {
32+
@Override
33+
public T call(Long ignored) {
3334
return x;
3435
}
3536
}).replay();
Lines changed: 64 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,86 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
1616
package rx.operators;
1717

1818
import java.util.concurrent.TimeUnit;
19-
2019
import rx.Observable.OnSubscribeFunc;
2120
import rx.Observer;
2221
import rx.Scheduler;
2322
import rx.Subscription;
24-
import rx.schedulers.Schedulers;
25-
import rx.subscriptions.Subscriptions;
2623
import rx.util.functions.Action0;
2724

25+
/**
26+
* Operation Timer with several overloads.
27+
*
28+
* @see <a href='http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.timer.aspx'>MSDN Observable.Timer</a>
29+
*/
2830
public final class OperationTimer {
29-
30-
public static OnSubscribeFunc<Void> timer(long interval, TimeUnit unit) {
31-
return timer(interval, unit, Schedulers.threadPoolForComputation());
32-
}
33-
34-
public static OnSubscribeFunc<Void> timer(final long delay, final TimeUnit unit, final Scheduler scheduler) {
35-
return new OnSubscribeFunc<Void>() {
36-
@Override
37-
public Subscription onSubscribe(Observer<? super Void> observer) {
38-
return new Timer(delay, unit, scheduler, observer).start();
39-
}
40-
};
41-
}
42-
43-
private static class Timer {
44-
private final long period;
45-
private final TimeUnit unit;
31+
private OperationTimer() { throw new IllegalStateException("No instances!"); }
32+
33+
/**
34+
* Emit a single 0L after the specified time elapses.
35+
*/
36+
public static class TimerOnce implements OnSubscribeFunc<Long> {
4637
private final Scheduler scheduler;
47-
private final Observer<? super Void> observer;
48-
49-
private Timer(long period, TimeUnit unit, Scheduler scheduler, Observer<? super Void> observer) {
50-
this.period = period;
51-
this.unit = unit;
38+
private final long dueTime;
39+
private final TimeUnit dueUnit;
40+
public TimerOnce(long dueTime, TimeUnit unit, Scheduler scheduler) {
5241
this.scheduler = scheduler;
53-
this.observer = observer;
42+
this.dueTime = dueTime;
43+
this.dueUnit = unit;
5444
}
55-
56-
public Subscription start() {
57-
final Subscription s = scheduler.schedule(new Action0() {
45+
46+
@Override
47+
public Subscription onSubscribe(final Observer<? super Long> t1) {
48+
return scheduler.schedule(new Action0() {
5849
@Override
5950
public void call() {
60-
observer.onNext(null);
61-
observer.onCompleted();
51+
t1.onNext(0L);
52+
t1.onCompleted();
6253
}
63-
}, period, unit);
64-
65-
return Subscriptions.create(new Action0() {
54+
55+
}, dueTime, dueUnit);
56+
}
57+
}
58+
/**
59+
* Emit 0L after the initial period and ever increasing number after each period.
60+
*/
61+
public static class TimerPeriodically implements OnSubscribeFunc<Long> {
62+
private final Scheduler scheduler;
63+
private final long initialDelay;
64+
private final long period;
65+
private final TimeUnit unit;
66+
public TimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
67+
this.scheduler = scheduler;
68+
this.initialDelay = initialDelay;
69+
this.period = period;
70+
this.unit = unit;
71+
}
72+
73+
@Override
74+
public Subscription onSubscribe(final Observer<? super Long> t1) {
75+
return scheduler.schedulePeriodically(new Action0() {
76+
long count;
6677
@Override
6778
public void call() {
68-
s.unsubscribe();
79+
t1.onNext(count++);
6980
}
70-
});
81+
},
82+
initialDelay, period, unit
83+
);
7184
}
7285
}
73-
7486
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.TimeUnit;
19+
import org.junit.Before;
20+
import org.junit.Test;
21+
import org.mockito.InOrder;
22+
import org.mockito.Mock;
23+
import static org.mockito.Mockito.*;
24+
import org.mockito.MockitoAnnotations;
25+
import rx.Observable;
26+
import rx.Observer;
27+
import rx.Subscription;
28+
import rx.schedulers.TestScheduler;
29+
30+
public class OperationTimerTest {
31+
@Mock
32+
Observer<Object> observer;
33+
TestScheduler s;
34+
@Before
35+
public void before() {
36+
MockitoAnnotations.initMocks(this);
37+
s = new TestScheduler();
38+
}
39+
@Test
40+
public void testTimerOnce() {
41+
Observable.timer(100, TimeUnit.MILLISECONDS, s).subscribe(observer);
42+
s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
43+
44+
verify(observer, times(1)).onNext(0L);
45+
verify(observer, times(1)).onCompleted();
46+
verify(observer, never()).onError(any(Throwable.class));
47+
}
48+
@Test
49+
public void testTimerPeriodically() {
50+
Subscription c = Observable.timer(100, 100, TimeUnit.MILLISECONDS, s).subscribe(observer);
51+
s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
52+
53+
InOrder inOrder = inOrder(observer);
54+
inOrder.verify(observer, times(1)).onNext(0L);
55+
56+
s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
57+
inOrder.verify(observer, times(1)).onNext(1L);
58+
59+
s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
60+
inOrder.verify(observer, times(1)).onNext(2L);
61+
62+
s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
63+
inOrder.verify(observer, times(1)).onNext(3L);
64+
65+
c.unsubscribe();
66+
s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
67+
inOrder.verify(observer, never()).onNext(any());
68+
69+
verify(observer, never()).onCompleted();
70+
verify(observer, never()).onError(any(Throwable.class));
71+
}
72+
}

0 commit comments

Comments
 (0)