Skip to content

Commit 473a567

Browse files
Merge pull request #1347 from samueltardieu/unsubscribe-from-periodic
Allow use of the returned subscription to cancel periodic scheduling
2 parents 4b53474 + d3f83fa commit 473a567

File tree

2 files changed

+69
-3
lines changed

2 files changed

+69
-3
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import rx.functions.Action0;
2121
import rx.schedulers.Schedulers;
22+
import rx.subscriptions.MultipleAssignmentSubscription;
2223

2324
/**
2425
* Represents an object that schedules units of work.
@@ -108,18 +109,20 @@ public Subscription schedulePeriodically(final Action0 action, long initialDelay
108109
final long periodInNanos = unit.toNanos(period);
109110
final long startInNanos = TimeUnit.MILLISECONDS.toNanos(now()) + unit.toNanos(initialDelay);
110111

112+
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
111113
final Action0 recursiveAction = new Action0() {
112114
long count = 0;
113115
@Override
114116
public void call() {
115-
if (!isUnsubscribed()) {
117+
if (!mas.isUnsubscribed()) {
116118
action.call();
117119
long nextTick = startInNanos + (++count * periodInNanos);
118-
schedule(this, nextTick - TimeUnit.MILLISECONDS.toNanos(now()), TimeUnit.NANOSECONDS);
120+
mas.set(schedule(this, nextTick - TimeUnit.MILLISECONDS.toNanos(now()), TimeUnit.NANOSECONDS));
119121
}
120122
}
121123
};
122-
return schedule(recursiveAction, initialDelay, unit);
124+
mas.set(schedule(recursiveAction, initialDelay, unit));
125+
return mas;
123126
}
124127

125128
/**

rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.mockito.Mockito;
3131

3232
import rx.Scheduler;
33+
import rx.Subscription;
3334
import rx.functions.Action0;
3435
import rx.functions.Func1;
3536

@@ -77,6 +78,48 @@ public void call() {
7778
inOrder.verify(calledOp, never()).call(anyLong());
7879
}
7980

81+
@SuppressWarnings("unchecked")
82+
// mocking is unchecked, unfortunately
83+
@Test
84+
public final void testPeriodicSchedulingUnsubscription() {
85+
final Func1<Long, Void> calledOp = mock(Func1.class);
86+
87+
final TestScheduler scheduler = new TestScheduler();
88+
final Scheduler.Worker inner = scheduler.createWorker();
89+
90+
final Subscription subscription = inner.schedulePeriodically(new Action0() {
91+
@Override
92+
public void call() {
93+
System.out.println(scheduler.now());
94+
calledOp.call(scheduler.now());
95+
}
96+
}, 1, 2, TimeUnit.SECONDS);
97+
98+
verify(calledOp, never()).call(anyLong());
99+
100+
InOrder inOrder = Mockito.inOrder(calledOp);
101+
102+
scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
103+
inOrder.verify(calledOp, never()).call(anyLong());
104+
105+
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
106+
inOrder.verify(calledOp, times(1)).call(1000L);
107+
108+
scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);
109+
inOrder.verify(calledOp, never()).call(3000L);
110+
111+
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
112+
inOrder.verify(calledOp, times(1)).call(3000L);
113+
114+
scheduler.advanceTimeBy(5L, TimeUnit.SECONDS);
115+
inOrder.verify(calledOp, times(1)).call(5000L);
116+
inOrder.verify(calledOp, times(1)).call(7000L);
117+
118+
subscription.unsubscribe();
119+
scheduler.advanceTimeBy(11L, TimeUnit.SECONDS);
120+
inOrder.verify(calledOp, never()).call(anyLong());
121+
}
122+
80123
@Test
81124
public final void testImmediateUnsubscribes() {
82125
TestScheduler s = new TestScheduler();
@@ -97,4 +140,24 @@ public void call() {
97140
assertEquals(0, counter.get());
98141
}
99142

143+
@Test
144+
public final void testImmediateUnsubscribes2() {
145+
TestScheduler s = new TestScheduler();
146+
final Scheduler.Worker inner = s.createWorker();
147+
final AtomicInteger counter = new AtomicInteger(0);
148+
149+
final Subscription subscription = inner.schedule(new Action0() {
150+
151+
@Override
152+
public void call() {
153+
counter.incrementAndGet();
154+
System.out.println("counter: " + counter.get());
155+
inner.schedule(this);
156+
}
157+
158+
});
159+
subscription.unsubscribe();
160+
assertEquals(0, counter.get());
161+
}
162+
100163
}

0 commit comments

Comments
 (0)