Skip to content

Commit 788d431

Browse files
author
jmhofer
committed
Implemented periodic scheduling, too. Needs testing now.
1 parent a1f1540 commit 788d431

File tree

1 file changed

+48
-5
lines changed

1 file changed

+48
-5
lines changed

rxjava-core/src/main/java/rx/concurrency/SwingScheduler.java

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import rx.Scheduler;
3333
import rx.Subscription;
34+
import rx.subscriptions.CompositeSubscription;
3435
import rx.subscriptions.Subscriptions;
3536
import rx.util.functions.Action0;
3637
import rx.util.functions.Func2;
@@ -73,11 +74,7 @@ public void call() {
7374
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
7475
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
7576
long delay = unit.toMillis(dueTime);
76-
77-
if (delay > Integer.MAX_VALUE) {
78-
throw new IllegalArgumentException(String.format(
79-
"The swing timer only accepts delays up to %d milliseconds.", Integer.MAX_VALUE));
80-
}
77+
assertThatTheDelayIsValidForTheSwingTimer(delay);
8178

8279
class ExecuteOnceAction implements ActionListener {
8380
private Timer timer;
@@ -110,6 +107,52 @@ public void call() {
110107
}
111108
});
112109
}
110+
111+
@Override
112+
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
113+
// FIXME test this!
114+
final AtomicReference<Timer> timer = new AtomicReference<Timer>();
115+
116+
final long delay = unit.toMillis(period);
117+
assertThatTheDelayIsValidForTheSwingTimer(delay);
118+
119+
final CompositeSubscription subscriptions = new CompositeSubscription();
120+
final Func2<Scheduler, T, Subscription> initialAction = new Func2<Scheduler, T, Subscription>() {
121+
@Override
122+
public Subscription call(final Scheduler scheduler, final T state) {
123+
// call the action once initially
124+
subscriptions.add(action.call(scheduler, state));
125+
126+
// start timer for periodic execution, collect subscriptions
127+
timer.set(new Timer((int) delay, new ActionListener() {
128+
@Override
129+
public void actionPerformed(ActionEvent e) {
130+
subscriptions.add(action.call(scheduler, state));
131+
}
132+
}));
133+
timer.get().start();
134+
135+
return action.call(scheduler, state);
136+
}
137+
};
138+
subscriptions.add(schedule(state, initialAction, initialDelay, unit));
139+
140+
subscriptions.add(Subscriptions.create(new Action0() {
141+
@Override
142+
public void call() {
143+
// in addition to all the individual unsubscriptions, stop the timer on unsubscribing
144+
timer.get().stop();
145+
}
146+
}));
147+
148+
return subscriptions;
149+
}
150+
151+
private static void assertThatTheDelayIsValidForTheSwingTimer(long delay) {
152+
if (delay > Integer.MAX_VALUE) {
153+
throw new IllegalArgumentException(String.format("The swing timer only accepts delays up to %d milliseconds.", Integer.MAX_VALUE));
154+
}
155+
}
113156

114157
public static class UnitTest {
115158
@Test

0 commit comments

Comments
 (0)