Skip to content

Commit b0520da

Browse files
authored
2.x: fix window-timed test leaking an MBE that affects unrelated tests (#4755)
1 parent 70d4407 commit b0520da

File tree

4 files changed

+17
-7
lines changed

4 files changed

+17
-7
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinct.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818

1919
import org.reactivestreams.*;
2020

21-
import io.reactivex.disposables.Disposable;
2221
import io.reactivex.exceptions.Exceptions;
2322
import io.reactivex.functions.Function;
2423
import io.reactivex.internal.functions.ObjectHelper;
25-
import io.reactivex.internal.fuseable.*;
24+
import io.reactivex.internal.fuseable.QueueFuseable;
2625
import io.reactivex.internal.subscribers.BasicFuseableSubscriber;
2726
import io.reactivex.internal.subscriptions.EmptySubscription;
2827
import io.reactivex.plugins.RxJavaPlugins;

src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@
1717
import java.util.concurrent.Callable;
1818

1919
import io.reactivex.*;
20-
import io.reactivex.disposables.Disposable;
2120
import io.reactivex.exceptions.Exceptions;
2221
import io.reactivex.functions.Function;
2322
import io.reactivex.internal.disposables.EmptyDisposable;
2423
import io.reactivex.internal.functions.ObjectHelper;
25-
import io.reactivex.internal.fuseable.SimpleQueue;
2624
import io.reactivex.internal.observers.BasicFuseableObserver;
2725
import io.reactivex.plugins.RxJavaPlugins;
2826

src/main/java/io/reactivex/internal/operators/observable/ObservableFilter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package io.reactivex.internal.operators.observable;
1515

1616
import io.reactivex.*;
17-
import io.reactivex.disposables.Disposable;
1817
import io.reactivex.functions.Predicate;
1918
import io.reactivex.internal.observers.BasicFuseableObserver;
2019

src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import java.util.*;
1919
import java.util.concurrent.TimeUnit;
20-
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.concurrent.atomic.*;
2121

2222
import org.junit.*;
2323
import org.reactivestreams.*;
@@ -528,8 +528,15 @@ public void exactBoundaryError() {
528528
}
529529

530530
@Test
531-
public void restartTimerMany() {
531+
public void restartTimerMany() throws Exception {
532+
final AtomicBoolean cancel1 = new AtomicBoolean();
532533
Flowable.intervalRange(1, 1000, 1, 1, TimeUnit.MILLISECONDS)
534+
.doOnCancel(new Action() {
535+
@Override
536+
public void run() throws Exception {
537+
cancel1.set(true);
538+
}
539+
})
533540
.window(1, TimeUnit.MILLISECONDS, Schedulers.single(), 2, true)
534541
.flatMap(Functions.<Flowable<Long>>identity())
535542
.take(500)
@@ -539,6 +546,13 @@ public void restartTimerMany() {
539546
.assertValueCount(500)
540547
.assertNoErrors()
541548
.assertComplete();
549+
550+
int timeout = 20;
551+
while (timeout-- > 0 && !cancel1.get()) {
552+
Thread.sleep(100);
553+
}
554+
555+
assertTrue("intervalRange was not cancelled!", cancel1.get());
542556
}
543557

544558
@Test

0 commit comments

Comments
 (0)