Skip to content

Commit b7cb10b

Browse files
author
jmhofer
committed
taken over default recursive implementation
1 parent 504bd05 commit b7cb10b

File tree

1 file changed

+35
-1
lines changed

1 file changed

+35
-1
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.Date;
1919
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicBoolean;
2021

2122
import rx.subscriptions.Subscriptions;
2223
import rx.util.functions.Action0;
@@ -73,6 +74,8 @@ public abstract class Scheduler {
7374

7475
/**
7576
* Schedules a cancelable action to be executed periodically.
77+
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
78+
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
7679
*
7780
* @param state State to pass into the action.
7881
* @param action The action to execute periodically.
@@ -81,7 +84,38 @@ public abstract class Scheduler {
8184
* @param unit The time unit the interval above is given in.
8285
* @return A subscription to be able to unsubscribe from action.
8386
*/
84-
public abstract <T> Subscription schedulePeriodically(T state, Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit);
87+
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
88+
final long periodInNanos = unit.toNanos(period);
89+
final AtomicBoolean complete = new AtomicBoolean();
90+
91+
final Func2<Scheduler, T, Subscription> recursiveAction = new Func2<Scheduler, T, Subscription>() {
92+
@Override
93+
public Subscription call(Scheduler scheduler, T state0) {
94+
if (! complete.get()) {
95+
long startedAt = System.nanoTime();
96+
final Subscription sub1 = action.call(scheduler, state0);
97+
long timeTakenByActionInNanos = System.nanoTime() - startedAt;
98+
final Subscription sub2 = schedule(state0, this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
99+
return Subscriptions.create(new Action0() {
100+
@Override
101+
public void call() {
102+
sub1.unsubscribe();
103+
sub2.unsubscribe();
104+
}
105+
});
106+
}
107+
return Subscriptions.empty();
108+
}
109+
};
110+
final Subscription sub = schedule(state, recursiveAction, initialDelay, unit);
111+
return Subscriptions.create(new Action0() {
112+
@Override
113+
public void call() {
114+
complete.set(true);
115+
sub.unsubscribe();
116+
}
117+
});
118+
}
85119

86120
/**
87121
* Schedules a cancelable action to be executed at dueTime.

0 commit comments

Comments
 (0)