Skip to content

Commit 07e126f

Browse files
authored
2.x: Fix Single.takeUntil, Maybe.takeUntil dispose behavior (#6019)
1 parent b9f5ef8 commit 07e126f

File tree

7 files changed

+880
-1
lines changed

7 files changed

+880
-1
lines changed

src/main/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilPublisher.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ public void onSubscribe(Subscription s) {
137137

138138
@Override
139139
public void onNext(Object value) {
140+
SubscriptionHelper.cancel(this);
140141
parent.otherComplete();
141142
}
142143

src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ static final class TakeUntilMainObserver<T>
6969
@Override
7070
public void dispose() {
7171
DisposableHelper.dispose(this);
72+
other.dispose();
7273
}
7374

7475
@Override

src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,4 +186,78 @@ public void ambRace() {
186186
RxJavaPlugins.reset();
187187
}
188188
}
189+
190+
191+
@Test
192+
public void untilCompletableMainComplete() {
193+
CompletableSubject main = CompletableSubject.create();
194+
CompletableSubject other = CompletableSubject.create();
195+
196+
TestObserver<Void> to = main.ambWith(other).test();
197+
198+
assertTrue("Main no observers?", main.hasObservers());
199+
assertTrue("Other no observers?", other.hasObservers());
200+
201+
main.onComplete();
202+
203+
assertFalse("Main has observers?", main.hasObservers());
204+
assertFalse("Other has observers?", other.hasObservers());
205+
206+
to.assertResult();
207+
}
208+
209+
@Test
210+
public void untilCompletableMainError() {
211+
CompletableSubject main = CompletableSubject.create();
212+
CompletableSubject other = CompletableSubject.create();
213+
214+
TestObserver<Void> to = main.ambWith(other).test();
215+
216+
assertTrue("Main no observers?", main.hasObservers());
217+
assertTrue("Other no observers?", other.hasObservers());
218+
219+
main.onError(new TestException());
220+
221+
assertFalse("Main has observers?", main.hasObservers());
222+
assertFalse("Other has observers?", other.hasObservers());
223+
224+
to.assertFailure(TestException.class);
225+
}
226+
227+
@Test
228+
public void untilCompletableOtherOnComplete() {
229+
CompletableSubject main = CompletableSubject.create();
230+
CompletableSubject other = CompletableSubject.create();
231+
232+
TestObserver<Void> to = main.ambWith(other).test();
233+
234+
assertTrue("Main no observers?", main.hasObservers());
235+
assertTrue("Other no observers?", other.hasObservers());
236+
237+
other.onComplete();
238+
239+
assertFalse("Main has observers?", main.hasObservers());
240+
assertFalse("Other has observers?", other.hasObservers());
241+
242+
to.assertResult();
243+
}
244+
245+
@Test
246+
public void untilCompletableOtherError() {
247+
CompletableSubject main = CompletableSubject.create();
248+
CompletableSubject other = CompletableSubject.create();
249+
250+
TestObserver<Void> to = main.ambWith(other).test();
251+
252+
assertTrue("Main no observers?", main.hasObservers());
253+
assertTrue("Other no observers?", other.hasObservers());
254+
255+
other.onError(new TestException());
256+
257+
assertFalse("Main has observers?", main.hasObservers());
258+
assertFalse("Other has observers?", other.hasObservers());
259+
260+
to.assertFailure(TestException.class);
261+
}
262+
189263
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeUntilTest.java

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.reactivestreams.*;
2121

2222
import io.reactivex.*;
23+
import io.reactivex.exceptions.TestException;
2324
import io.reactivex.functions.Function;
2425
import io.reactivex.processors.PublishProcessor;
2526
import io.reactivex.subscribers.TestSubscriber;
@@ -293,4 +294,133 @@ public Flowable<Integer> apply(Flowable<Integer> c) throws Exception {
293294
}
294295
});
295296
}
297+
298+
@Test
299+
public void untilPublisherMainSuccess() {
300+
PublishProcessor<Integer> main = PublishProcessor.create();
301+
PublishProcessor<Integer> other = PublishProcessor.create();
302+
303+
TestSubscriber<Integer> ts = main.takeUntil(other).test();
304+
305+
assertTrue("Main no subscribers?", main.hasSubscribers());
306+
assertTrue("Other no subscribers?", other.hasSubscribers());
307+
308+
main.onNext(1);
309+
main.onNext(2);
310+
main.onComplete();
311+
312+
assertFalse("Main has subscribers?", main.hasSubscribers());
313+
assertFalse("Other has subscribers?", other.hasSubscribers());
314+
315+
ts.assertResult(1, 2);
316+
}
317+
318+
@Test
319+
public void untilPublisherMainComplete() {
320+
PublishProcessor<Integer> main = PublishProcessor.create();
321+
PublishProcessor<Integer> other = PublishProcessor.create();
322+
323+
TestSubscriber<Integer> ts = main.takeUntil(other).test();
324+
325+
assertTrue("Main no subscribers?", main.hasSubscribers());
326+
assertTrue("Other no subscribers?", other.hasSubscribers());
327+
328+
main.onComplete();
329+
330+
assertFalse("Main has subscribers?", main.hasSubscribers());
331+
assertFalse("Other has subscribers?", other.hasSubscribers());
332+
333+
ts.assertResult();
334+
}
335+
336+
@Test
337+
public void untilPublisherMainError() {
338+
PublishProcessor<Integer> main = PublishProcessor.create();
339+
PublishProcessor<Integer> other = PublishProcessor.create();
340+
341+
TestSubscriber<Integer> ts = main.takeUntil(other).test();
342+
343+
assertTrue("Main no subscribers?", main.hasSubscribers());
344+
assertTrue("Other no subscribers?", other.hasSubscribers());
345+
346+
main.onError(new TestException());
347+
348+
assertFalse("Main has subscribers?", main.hasSubscribers());
349+
assertFalse("Other has subscribers?", other.hasSubscribers());
350+
351+
ts.assertFailure(TestException.class);
352+
}
353+
354+
@Test
355+
public void untilPublisherOtherOnNext() {
356+
PublishProcessor<Integer> main = PublishProcessor.create();
357+
PublishProcessor<Integer> other = PublishProcessor.create();
358+
359+
TestSubscriber<Integer> ts = main.takeUntil(other).test();
360+
361+
assertTrue("Main no subscribers?", main.hasSubscribers());
362+
assertTrue("Other no subscribers?", other.hasSubscribers());
363+
364+
other.onNext(1);
365+
366+
assertFalse("Main has subscribers?", main.hasSubscribers());
367+
assertFalse("Other has subscribers?", other.hasSubscribers());
368+
369+
ts.assertResult();
370+
}
371+
372+
@Test
373+
public void untilPublisherOtherOnComplete() {
374+
PublishProcessor<Integer> main = PublishProcessor.create();
375+
PublishProcessor<Integer> other = PublishProcessor.create();
376+
377+
TestSubscriber<Integer> ts = main.takeUntil(other).test();
378+
379+
assertTrue("Main no subscribers?", main.hasSubscribers());
380+
assertTrue("Other no subscribers?", other.hasSubscribers());
381+
382+
other.onComplete();
383+
384+
assertFalse("Main has subscribers?", main.hasSubscribers());
385+
assertFalse("Other has subscribers?", other.hasSubscribers());
386+
387+
ts.assertResult();
388+
}
389+
390+
@Test
391+
public void untilPublisherOtherError() {
392+
PublishProcessor<Integer> main = PublishProcessor.create();
393+
PublishProcessor<Integer> other = PublishProcessor.create();
394+
395+
TestSubscriber<Integer> ts = main.takeUntil(other).test();
396+
397+
assertTrue("Main no subscribers?", main.hasSubscribers());
398+
assertTrue("Other no subscribers?", other.hasSubscribers());
399+
400+
other.onError(new TestException());
401+
402+
assertFalse("Main has subscribers?", main.hasSubscribers());
403+
assertFalse("Other has subscribers?", other.hasSubscribers());
404+
405+
ts.assertFailure(TestException.class);
406+
}
407+
408+
@Test
409+
public void untilPublisherDispose() {
410+
PublishProcessor<Integer> main = PublishProcessor.create();
411+
PublishProcessor<Integer> other = PublishProcessor.create();
412+
413+
TestSubscriber<Integer> ts = main.takeUntil(other).test();
414+
415+
assertTrue("Main no subscribers?", main.hasSubscribers());
416+
assertTrue("Other no subscribers?", other.hasSubscribers());
417+
418+
ts.dispose();
419+
420+
assertFalse("Main has subscribers?", main.hasSubscribers());
421+
assertFalse("Other has subscribers?", other.hasSubscribers());
422+
423+
ts.assertEmpty();
424+
}
425+
296426
}

0 commit comments

Comments
 (0)