Skip to content

Commit 9b91d4e

Browse files
authored
2.x: fix timer() ISE due to bad resource mgmt (#4927)
1 parent 7d47932 commit 9b91d4e

File tree

5 files changed

+73
-10
lines changed

5 files changed

+73
-10
lines changed

src/main/java/io/reactivex/internal/disposables/DisposableHelper.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,21 @@ public enum DisposableHelper implements Disposable {
2929
DISPOSED
3030
;
3131

32+
/**
33+
* Checks if the given Disposable is the common {@link #DISPOSED} enum value.
34+
* @param d the disposable to check
35+
* @return true if d is {@link #DISPOSED}
36+
*/
3237
public static boolean isDisposed(Disposable d) {
3338
return d == DISPOSED;
3439
}
3540

41+
/**
42+
* Atomically sets the field and disposes the old contents.
43+
* @param field the target field
44+
* @param d the new Disposable to set
45+
* @return true if successful, false if the field contains the {@link #DISPOSED} instance.
46+
*/
3647
public static boolean set(AtomicReference<Disposable> field, Disposable d) {
3748
for (;;) {
3849
Disposable current = field.get();
@@ -144,6 +155,23 @@ public static void reportDisposableSet() {
144155
RxJavaPlugins.onError(new IllegalStateException("Disposable already set!"));
145156
}
146157

158+
/**
159+
* Atomically tries to set the given Disposable on the field if it is null or disposes it if
160+
* the field contains {@link #DISPOSED}.
161+
* @param field the target field
162+
* @param d the disposable to set
163+
* @return true if successful, false otherwise
164+
*/
165+
public static boolean trySet(AtomicReference<Disposable> field, Disposable d) {
166+
if (!field.compareAndSet(null, d)) {
167+
if (field.get() == DISPOSED) {
168+
d.dispose();
169+
}
170+
return false;
171+
}
172+
return true;
173+
}
174+
147175
@Override
148176
public void dispose() {
149177
// deliberately no-op

src/main/java/io/reactivex/internal/operators/flowable/FlowableTimer.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@ public FlowableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
3636

3737
@Override
3838
public void subscribeActual(Subscriber<? super Long> s) {
39-
IntervalOnceSubscriber ios = new IntervalOnceSubscriber(s);
39+
TimerSubscriber ios = new TimerSubscriber(s);
4040
s.onSubscribe(ios);
4141

4242
Disposable d = scheduler.scheduleDirect(ios, delay, unit);
4343

4444
ios.setResource(d);
4545
}
4646

47-
static final class IntervalOnceSubscriber extends AtomicReference<Disposable>
47+
static final class TimerSubscriber extends AtomicReference<Disposable>
4848
implements Subscription, Runnable {
4949

5050
private static final long serialVersionUID = -2809475196591179431L;
@@ -53,7 +53,7 @@ static final class IntervalOnceSubscriber extends AtomicReference<Disposable>
5353

5454
volatile boolean requested;
5555

56-
IntervalOnceSubscriber(Subscriber<? super Long> actual) {
56+
TimerSubscriber(Subscriber<? super Long> actual) {
5757
this.actual = actual;
5858
}
5959

@@ -74,16 +74,17 @@ public void run() {
7474
if (get() != DisposableHelper.DISPOSED) {
7575
if (requested) {
7676
actual.onNext(0L);
77+
lazySet(EmptyDisposable.INSTANCE);
7778
actual.onComplete();
7879
} else {
80+
lazySet(EmptyDisposable.INSTANCE);
7981
actual.onError(new MissingBackpressureException("Can't deliver value due to lack of requests"));
8082
}
81-
lazySet(EmptyDisposable.INSTANCE);
8283
}
8384
}
8485

8586
public void setResource(Disposable d) {
86-
DisposableHelper.setOnce(this, d);
87+
DisposableHelper.trySet(this, d);
8788
}
8889
}
8990
}

src/main/java/io/reactivex/internal/operators/observable/ObservableTimer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,22 @@ public ObservableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
3232

3333
@Override
3434
public void subscribeActual(Observer<? super Long> s) {
35-
IntervalOnceObserver ios = new IntervalOnceObserver(s);
35+
TimerObserver ios = new TimerObserver(s);
3636
s.onSubscribe(ios);
3737

3838
Disposable d = scheduler.scheduleDirect(ios, delay, unit);
3939

4040
ios.setResource(d);
4141
}
4242

43-
static final class IntervalOnceObserver extends AtomicReference<Disposable>
43+
static final class TimerObserver extends AtomicReference<Disposable>
4444
implements Disposable, Runnable {
4545

4646
private static final long serialVersionUID = -2809475196591179431L;
4747

4848
final Observer<? super Long> actual;
4949

50-
IntervalOnceObserver(Observer<? super Long> actual) {
50+
TimerObserver(Observer<? super Long> actual) {
5151
this.actual = actual;
5252
}
5353

@@ -65,13 +65,13 @@ public boolean isDisposed() {
6565
public void run() {
6666
if (!isDisposed()) {
6767
actual.onNext(0L);
68-
actual.onComplete();
6968
lazySet(EmptyDisposable.INSTANCE);
69+
actual.onComplete();
7070
}
7171
}
7272

7373
public void setResource(Disposable d) {
74-
DisposableHelper.setOnce(this, d);
74+
DisposableHelper.trySet(this, d);
7575
}
7676
}
7777
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import static org.junit.Assert.*;
1617
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

20+
import java.util.List;
1921
import java.util.concurrent.TimeUnit;
2022

2123
import org.junit.*;
@@ -25,6 +27,7 @@
2527
import io.reactivex.*;
2628
import io.reactivex.exceptions.*;
2729
import io.reactivex.flowables.ConnectableFlowable;
30+
import io.reactivex.plugins.RxJavaPlugins;
2831
import io.reactivex.schedulers.TestScheduler;
2932
import io.reactivex.subscribers.*;
3033

@@ -324,4 +327,18 @@ public void run() {
324327
TestHelper.race(r1, r2);
325328
}
326329
}
330+
331+
@Test
332+
public void timerDelayZero() {
333+
List<Throwable> errors = TestHelper.trackPluginErrors();
334+
try {
335+
for (int i = 0; i < 1000; i++) {
336+
Flowable.timer(0, TimeUnit.MILLISECONDS).blockingFirst();
337+
}
338+
339+
assertTrue(errors.toString(), errors.isEmpty());
340+
} finally {
341+
RxJavaPlugins.reset();
342+
}
343+
}
327344
}

src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16+
import static org.junit.Assert.assertTrue;
1617
import static org.mockito.Mockito.*;
1718

19+
import java.util.List;
1820
import java.util.concurrent.TimeUnit;
1921

2022
import org.junit.*;
@@ -24,6 +26,7 @@
2426
import io.reactivex.exceptions.TestException;
2527
import io.reactivex.observables.ConnectableObservable;
2628
import io.reactivex.observers.*;
29+
import io.reactivex.plugins.RxJavaPlugins;
2730
import io.reactivex.schedulers.TestScheduler;
2831

2932
public class ObservableTimerTest {
@@ -286,4 +289,18 @@ public void onComplete() {
286289
public void disposed() {
287290
TestHelper.checkDisposed(Observable.timer(1, TimeUnit.DAYS));
288291
}
292+
293+
@Test
294+
public void timerDelayZero() {
295+
List<Throwable> errors = TestHelper.trackPluginErrors();
296+
try {
297+
for (int i = 0; i < 1000; i++) {
298+
Observable.timer(0, TimeUnit.MILLISECONDS).blockingFirst();
299+
}
300+
301+
assertTrue(errors.toString(), errors.isEmpty());
302+
} finally {
303+
RxJavaPlugins.reset();
304+
}
305+
}
289306
}

0 commit comments

Comments
 (0)