Skip to content

Commit 5e3c6bd

Browse files
authored
2.x: fix timeout with fallback not cancelling the main source (#4945)
1 parent a9f1f4f commit 5e3c6bd

File tree

8 files changed

+152
-9
lines changed

8 files changed

+152
-9
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ static final class MulticastProcessor<T> extends Flowable<T> implements Subscrib
137137
final AtomicReference<MulticastSubscription<T>[]> subscribers;
138138

139139
final int prefetch;
140-
140+
141141
final int limit;
142142

143143
final boolean delayError;
@@ -150,7 +150,7 @@ static final class MulticastProcessor<T> extends Flowable<T> implements Subscrib
150150

151151
volatile boolean done;
152152
Throwable error;
153-
153+
154154
int consumed;
155155

156156
@SuppressWarnings("unchecked")
@@ -319,11 +319,11 @@ void drain() {
319319
int missed = 1;
320320

321321
SimpleQueue<T> q = queue;
322-
322+
323323
int upstreamConsumed = consumed;
324324
int localLimit = limit;
325325
boolean canRequest = sourceMode != QueueSubscription.SYNC;
326-
326+
327327
for (;;) {
328328
MulticastSubscription<T>[] array = subscribers.get();
329329

src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public void onComplete() {
174174
public void dispose() {
175175
worker.dispose();
176176
DisposableHelper.dispose(timer);
177+
s.cancel();
177178
}
178179

179180
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public void onComplete() {
174174
public void dispose() {
175175
worker.dispose();
176176
DisposableHelper.dispose(this);
177+
s.dispose();
177178
}
178179

179180
@Override

src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ public Publisher<Integer> apply(Flowable<Integer> v) throws Exception {
473473
public boolean test(Integer w) throws Exception {
474474
return w % 2 == 0;
475475
}
476-
}),
476+
}),
477477
v.filter(new Predicate<Integer>() {
478478
@Override
479479
public boolean test(Integer w) throws Exception {
@@ -500,7 +500,7 @@ public Publisher<Integer> apply(Flowable<Integer> v) throws Exception {
500500
public boolean test(Integer w) throws Exception {
501501
return w % 2 == 0;
502502
}
503-
}),
503+
}),
504504
v.filter(new Predicate<Integer>() {
505505
@Override
506506
public boolean test(Integer w) throws Exception {
@@ -528,7 +528,7 @@ public Publisher<Integer> apply(Flowable<Integer> v) throws Exception {
528528
public boolean test(Integer w) throws Exception {
529529
return w % 2 == 0;
530530
}
531-
}),
531+
}),
532532
v.filter(new Predicate<Integer>() {
533533
@Override
534534
public boolean test(Integer w) throws Exception {

src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,4 +483,39 @@ protected void subscribeActual(Subscriber<? super Integer> observer) {
483483
}
484484
}
485485

486+
487+
@Test
488+
public void timedTake() {
489+
PublishProcessor<Integer> ps = PublishProcessor.create();
490+
491+
TestSubscriber<Integer> to = ps.timeout(1, TimeUnit.DAYS)
492+
.take(1)
493+
.test();
494+
495+
assertTrue(ps.hasSubscribers());
496+
497+
ps.onNext(1);
498+
499+
assertFalse(ps.hasSubscribers());
500+
501+
to.assertResult(1);
502+
}
503+
504+
@Test
505+
public void timedFallbackTake() {
506+
PublishProcessor<Integer> ps = PublishProcessor.create();
507+
508+
TestSubscriber<Integer> to = ps.timeout(1, TimeUnit.DAYS, Flowable.just(2))
509+
.take(1)
510+
.test();
511+
512+
assertTrue(ps.hasSubscribers());
513+
514+
ps.onNext(1);
515+
516+
assertFalse(ps.hasSubscribers());
517+
518+
to.assertResult(1);
519+
}
520+
486521
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16-
import static org.junit.Assert.assertFalse;
16+
import static org.junit.Assert.*;
1717
import static org.mockito.ArgumentMatchers.*;
1818
import static org.mockito.Mockito.*;
1919

@@ -509,4 +509,40 @@ protected void subscribeActual(Subscriber<? super Integer> observer) {
509509
.test()
510510
.assertResult(1);
511511
}
512+
513+
@Test
514+
public void selectorTake() {
515+
PublishProcessor<Integer> ps = PublishProcessor.create();
516+
517+
TestSubscriber<Integer> to = ps
518+
.timeout(Functions.justFunction(Flowable.never()))
519+
.take(1)
520+
.test();
521+
522+
assertTrue(ps.hasSubscribers());
523+
524+
ps.onNext(1);
525+
526+
assertFalse(ps.hasSubscribers());
527+
528+
to.assertResult(1);
529+
}
530+
531+
@Test
532+
public void selectorFallbackTake() {
533+
PublishProcessor<Integer> ps = PublishProcessor.create();
534+
535+
TestSubscriber<Integer> to = ps
536+
.timeout(Functions.justFunction(Flowable.never()), Flowable.just(2))
537+
.take(1)
538+
.test();
539+
540+
assertTrue(ps.hasSubscribers());
541+
542+
ps.onNext(1);
543+
544+
assertFalse(ps.hasSubscribers());
545+
546+
to.assertResult(1);
547+
}
512548
}

src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,4 +481,38 @@ protected void subscribeActual(Observer<? super Integer> observer) {
481481
RxJavaPlugins.reset();
482482
}
483483
}
484+
485+
@Test
486+
public void timedTake() {
487+
PublishSubject<Integer> ps = PublishSubject.create();
488+
489+
TestObserver<Integer> to = ps.timeout(1, TimeUnit.DAYS)
490+
.take(1)
491+
.test();
492+
493+
assertTrue(ps.hasObservers());
494+
495+
ps.onNext(1);
496+
497+
assertFalse(ps.hasObservers());
498+
499+
to.assertResult(1);
500+
}
501+
502+
@Test
503+
public void timedFallbackTake() {
504+
PublishSubject<Integer> ps = PublishSubject.create();
505+
506+
TestObserver<Integer> to = ps.timeout(1, TimeUnit.DAYS, Observable.just(2))
507+
.take(1)
508+
.test();
509+
510+
assertTrue(ps.hasObservers());
511+
512+
ps.onNext(1);
513+
514+
assertFalse(ps.hasObservers());
515+
516+
to.assertResult(1);
517+
}
484518
}

src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16-
import static org.junit.Assert.assertFalse;
16+
import static org.junit.Assert.*;
1717
import static org.mockito.ArgumentMatchers.*;
1818
import static org.mockito.Mockito.*;
1919

@@ -510,4 +510,40 @@ protected void subscribeActual(Observer<? super Integer> observer) {
510510
.test()
511511
.assertResult(1);
512512
}
513+
514+
@Test
515+
public void selectorTake() {
516+
PublishSubject<Integer> ps = PublishSubject.create();
517+
518+
TestObserver<Integer> to = ps
519+
.timeout(Functions.justFunction(Observable.never()))
520+
.take(1)
521+
.test();
522+
523+
assertTrue(ps.hasObservers());
524+
525+
ps.onNext(1);
526+
527+
assertFalse(ps.hasObservers());
528+
529+
to.assertResult(1);
530+
}
531+
532+
@Test
533+
public void selectorFallbackTake() {
534+
PublishSubject<Integer> ps = PublishSubject.create();
535+
536+
TestObserver<Integer> to = ps
537+
.timeout(Functions.justFunction(Observable.never()), Observable.just(2))
538+
.take(1)
539+
.test();
540+
541+
assertTrue(ps.hasObservers());
542+
543+
ps.onNext(1);
544+
545+
assertFalse(ps.hasObservers());
546+
547+
to.assertResult(1);
548+
}
513549
}

0 commit comments

Comments
 (0)