Skip to content

Commit d3f83fa

Browse files
committed
Allow use of the returned subscription to cancel periodic scheduling
The documentation for schedulePeriodically indicates that the returned subscription can be used to unsubscribe from the periodic action, or to unschedule it if it has not been scheduled yet. That was the case only before the first action took place, and it was then impossible to unsubscribe using the given subscription, although unsubscribing the worker did work.
1 parent 057db88 commit d3f83fa

File tree

2 files changed

+69
-3
lines changed

2 files changed

+69
-3
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import rx.functions.Action0;
2121
import rx.schedulers.Schedulers;
22+
import rx.subscriptions.MultipleAssignmentSubscription;
2223

2324
/**
2425
* Represents an object that schedules units of work.
@@ -108,18 +109,20 @@ public Subscription schedulePeriodically(final Action0 action, long initialDelay
108109
final long periodInNanos = unit.toNanos(period);
109110
final long startInNanos = TimeUnit.MILLISECONDS.toNanos(now()) + unit.toNanos(initialDelay);
110111

112+
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
111113
final Action0 recursiveAction = new Action0() {
112114
long count = 0;
113115
@Override
114116
public void call() {
115-
if (!isUnsubscribed()) {
117+
if (!mas.isUnsubscribed()) {
116118
action.call();
117119
long nextTick = startInNanos + (++count * periodInNanos);
118-
schedule(this, nextTick - TimeUnit.MILLISECONDS.toNanos(now()), TimeUnit.NANOSECONDS);
120+
mas.set(schedule(this, nextTick - TimeUnit.MILLISECONDS.toNanos(now()), TimeUnit.NANOSECONDS));
119121
}
120122
}
121123
};
122-
return schedule(recursiveAction, initialDelay, unit);
124+
mas.set(schedule(recursiveAction, initialDelay, unit));
125+
return mas;
123126
}
124127

125128
/**

rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.mockito.Mockito;
3131

3232
import rx.Scheduler;
33+
import rx.Subscription;
3334
import rx.functions.Action0;
3435
import rx.functions.Func1;
3536

@@ -77,6 +78,48 @@ public void call() {
7778
inOrder.verify(calledOp, never()).call(anyLong());
7879
}
7980

81+
@SuppressWarnings("unchecked")
82+
// mocking is unchecked, unfortunately
83+
@Test
84+
public final void testPeriodicSchedulingUnsubscription() {
85+
final Func1<Long, Void> calledOp = mock(Func1.class);
86+
87+
final TestScheduler scheduler = new TestScheduler();
88+
final Scheduler.Worker inner = scheduler.createWorker();
89+
90+
final Subscription subscription = inner.schedulePeriodically(new Action0() {
91+
@Override
92+
public void call() {
93+
System.out.println(scheduler.now());
94+
calledOp.call(scheduler.now());
95+
}
96+
}, 1, 2, TimeUnit.SECONDS);
97+
98+
verify(calledOp, never()).call(anyLong());
99+
100+
InOrder inOrder = Mockito.inOrder(calledOp);
101+
102+
scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
103+
inOrder.verify(calledOp, never()).call(anyLong());
104+
105+
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
106+
inOrder.verify(calledOp, times(1)).call(1000L);
107+
108+
scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);
109+
inOrder.verify(calledOp, never()).call(3000L);
110+
111+
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
112+
inOrder.verify(calledOp, times(1)).call(3000L);
113+
114+
scheduler.advanceTimeBy(5L, TimeUnit.SECONDS);
115+
inOrder.verify(calledOp, times(1)).call(5000L);
116+
inOrder.verify(calledOp, times(1)).call(7000L);
117+
118+
subscription.unsubscribe();
119+
scheduler.advanceTimeBy(11L, TimeUnit.SECONDS);
120+
inOrder.verify(calledOp, never()).call(anyLong());
121+
}
122+
80123
@Test
81124
public final void testImmediateUnsubscribes() {
82125
TestScheduler s = new TestScheduler();
@@ -97,4 +140,24 @@ public void call() {
97140
assertEquals(0, counter.get());
98141
}
99142

143+
@Test
144+
public final void testImmediateUnsubscribes2() {
145+
TestScheduler s = new TestScheduler();
146+
final Scheduler.Worker inner = s.createWorker();
147+
final AtomicInteger counter = new AtomicInteger(0);
148+
149+
final Subscription subscription = inner.schedule(new Action0() {
150+
151+
@Override
152+
public void call() {
153+
counter.incrementAndGet();
154+
System.out.println("counter: " + counter.get());
155+
inner.schedule(this);
156+
}
157+
158+
});
159+
subscription.unsubscribe();
160+
assertEquals(0, counter.get());
161+
}
162+
100163
}

0 commit comments

Comments
 (0)