Skip to content

Commit c507577

Browse files
authored
2.x: make sure interval+trampoline can be stopped (#5367)
1 parent 0b0355e commit c507577

File tree

9 files changed

+105
-9
lines changed

9 files changed

+105
-9
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableInterval.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
import org.reactivestreams.*;
2020

2121
import io.reactivex.*;
22+
import io.reactivex.Scheduler.Worker;
2223
import io.reactivex.disposables.Disposable;
2324
import io.reactivex.exceptions.MissingBackpressureException;
2425
import io.reactivex.internal.disposables.DisposableHelper;
26+
import io.reactivex.internal.schedulers.TrampolineScheduler;
2527
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2628
import io.reactivex.internal.util.BackpressureHelper;
2729

@@ -43,9 +45,16 @@ public void subscribeActual(Subscriber<? super Long> s) {
4345
IntervalSubscriber is = new IntervalSubscriber(s);
4446
s.onSubscribe(is);
4547

46-
Disposable d = scheduler.schedulePeriodicallyDirect(is, initialDelay, period, unit);
48+
Scheduler sch = scheduler;
4749

48-
is.setResource(d);
50+
if (sch instanceof TrampolineScheduler) {
51+
Worker worker = sch.createWorker();
52+
is.setResource(worker);
53+
worker.schedulePeriodically(is, initialDelay, period, unit);
54+
} else {
55+
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
56+
is.setResource(d);
57+
}
4958
}
5059

5160
static final class IntervalSubscriber extends AtomicLong

src/main/java/io/reactivex/internal/operators/flowable/FlowableIntervalRange.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
import org.reactivestreams.*;
2020

2121
import io.reactivex.*;
22+
import io.reactivex.Scheduler.Worker;
2223
import io.reactivex.disposables.Disposable;
2324
import io.reactivex.exceptions.MissingBackpressureException;
2425
import io.reactivex.internal.disposables.DisposableHelper;
26+
import io.reactivex.internal.schedulers.TrampolineScheduler;
2527
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2628
import io.reactivex.internal.util.BackpressureHelper;
2729

@@ -47,9 +49,16 @@ public void subscribeActual(Subscriber<? super Long> s) {
4749
IntervalRangeSubscriber is = new IntervalRangeSubscriber(s, start, end);
4850
s.onSubscribe(is);
4951

50-
Disposable d = scheduler.schedulePeriodicallyDirect(is, initialDelay, period, unit);
52+
Scheduler sch = scheduler;
5153

52-
is.setResource(d);
54+
if (sch instanceof TrampolineScheduler) {
55+
Worker worker = sch.createWorker();
56+
is.setResource(worker);
57+
worker.schedulePeriodically(is, initialDelay, period, unit);
58+
} else {
59+
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
60+
is.setResource(d);
61+
}
5362
}
5463

5564
static final class IntervalRangeSubscriber extends AtomicLong

src/main/java/io/reactivex/internal/operators/observable/ObservableInterval.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
import java.util.concurrent.atomic.AtomicReference;
1818

1919
import io.reactivex.*;
20+
import io.reactivex.Scheduler.Worker;
2021
import io.reactivex.disposables.Disposable;
2122
import io.reactivex.internal.disposables.*;
23+
import io.reactivex.internal.schedulers.TrampolineScheduler;
2224

2325
public final class ObservableInterval extends Observable<Long> {
2426
final Scheduler scheduler;
@@ -38,9 +40,16 @@ public void subscribeActual(Observer<? super Long> s) {
3840
IntervalObserver is = new IntervalObserver(s);
3941
s.onSubscribe(is);
4042

41-
Disposable d = scheduler.schedulePeriodicallyDirect(is, initialDelay, period, unit);
43+
Scheduler sch = scheduler;
4244

43-
is.setResource(d);
45+
if (sch instanceof TrampolineScheduler) {
46+
Worker worker = sch.createWorker();
47+
is.setResource(worker);
48+
worker.schedulePeriodically(is, initialDelay, period, unit);
49+
} else {
50+
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
51+
is.setResource(d);
52+
}
4453
}
4554

4655
static final class IntervalObserver

src/main/java/io/reactivex/internal/operators/observable/ObservableIntervalRange.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
import java.util.concurrent.atomic.AtomicReference;
1818

1919
import io.reactivex.*;
20+
import io.reactivex.Scheduler.Worker;
2021
import io.reactivex.disposables.Disposable;
2122
import io.reactivex.internal.disposables.*;
23+
import io.reactivex.internal.schedulers.TrampolineScheduler;
2224

2325
public final class ObservableIntervalRange extends Observable<Long> {
2426
final Scheduler scheduler;
@@ -42,9 +44,16 @@ public void subscribeActual(Observer<? super Long> s) {
4244
IntervalRangeObserver is = new IntervalRangeObserver(s, start, end);
4345
s.onSubscribe(is);
4446

45-
Disposable d = scheduler.schedulePeriodicallyDirect(is, initialDelay, period, unit);
47+
Scheduler sch = scheduler;
4648

47-
is.setResource(d);
49+
if (sch instanceof TrampolineScheduler) {
50+
Worker worker = sch.createWorker();
51+
is.setResource(worker);
52+
worker.schedulePeriodically(is, initialDelay, period, unit);
53+
} else {
54+
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
55+
is.setResource(d);
56+
}
4857
}
4958

5059
static final class IntervalRangeObserver

src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ Disposable enqueue(Runnable action, long execTime) {
100100
int missed = 1;
101101
for (;;) {
102102
for (;;) {
103+
if (disposed) {
104+
queue.clear();
105+
return EmptyDisposable.INSTANCE;
106+
}
103107
final TimedRunnable polled = queue.poll();
104108
if (polled == null) {
105109
break;

src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalRangeTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,4 +109,12 @@ public void take() {
109109
.awaitDone(5, TimeUnit.SECONDS)
110110
.assertResult(1L);
111111
}
112+
113+
@Test(timeout = 2000)
114+
public void cancel() {
115+
Flowable.intervalRange(0, 20, 1, 1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
116+
.take(10)
117+
.test()
118+
.assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
119+
}
112120
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.concurrent.TimeUnit;
17+
18+
import org.junit.Test;
19+
20+
import io.reactivex.Flowable;
21+
import io.reactivex.schedulers.Schedulers;
22+
23+
public class FlowableIntervalTest {
24+
25+
@Test(timeout = 2000)
26+
public void cancel() {
27+
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
28+
.take(10)
29+
.test()
30+
.assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
31+
}
32+
}

src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalRangeTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,12 @@ public void longOverflow() {
7676
public void dispose() {
7777
TestHelper.checkDisposed(Observable.intervalRange(1, 2, 1, 1, TimeUnit.MILLISECONDS));
7878
}
79+
80+
@Test(timeout = 2000)
81+
public void cancel() {
82+
Observable.intervalRange(0, 20, 1, 1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
83+
.take(10)
84+
.test()
85+
.assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
86+
}
7987
}

src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalTest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,20 @@
1818
import org.junit.Test;
1919

2020
import io.reactivex.*;
21-
import io.reactivex.schedulers.TestScheduler;
21+
import io.reactivex.schedulers.*;
2222

2323
public class ObservableIntervalTest {
2424

2525
@Test
2626
public void dispose() {
2727
TestHelper.checkDisposed(Observable.interval(1, TimeUnit.MILLISECONDS, new TestScheduler()));
2828
}
29+
30+
@Test(timeout = 2000)
31+
public void cancel() {
32+
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
33+
.take(10)
34+
.test()
35+
.assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
36+
}
2937
}

0 commit comments

Comments
 (0)