|
13 | 13 |
|
14 | 14 | package io.reactivex.internal.operators.flowable;
|
15 | 15 |
|
| 16 | +import static org.junit.Assert.assertFalse; |
16 | 17 | import static org.mockito.ArgumentMatchers.any;
|
17 | 18 | import static org.mockito.Mockito.*;
|
18 | 19 |
|
@@ -306,11 +307,20 @@ public void backpressureOverflow() {
|
306 | 307 |
|
307 | 308 | @Test
|
308 | 309 | public void backpressureOverflowWithOtherPublisher() {
|
309 |
| - BehaviorProcessor.createDefault(1) |
310 |
| - .sample(Flowable.timer(1, TimeUnit.MILLISECONDS)) |
311 |
| - .test(0L) |
312 |
| - .awaitDone(5, TimeUnit.SECONDS) |
313 |
| - .assertFailure(MissingBackpressureException.class); |
| 310 | + PublishProcessor<Integer> pp1 = PublishProcessor.create(); |
| 311 | + PublishProcessor<Integer> pp2 = PublishProcessor.create(); |
| 312 | + |
| 313 | + TestSubscriber<Integer> ts = pp1 |
| 314 | + .sample(pp2) |
| 315 | + .test(0L); |
| 316 | + |
| 317 | + pp1.onNext(1); |
| 318 | + pp2.onNext(2); |
| 319 | + |
| 320 | + ts.assertFailure(MissingBackpressureException.class); |
| 321 | + |
| 322 | + assertFalse(pp1.hasSubscribers()); |
| 323 | + assertFalse(pp2.hasSubscribers()); |
314 | 324 | }
|
315 | 325 |
|
316 | 326 | @Test
|
@@ -455,5 +465,19 @@ public Flowable<Object> apply(Flowable<Object> f)
|
455 | 465 | return f.sample(1, TimeUnit.SECONDS);
|
456 | 466 | }
|
457 | 467 | });
|
| 468 | + |
| 469 | + TestHelper.checkDoubleOnSubscribeFlowable(new Function<Flowable<Object>, Flowable<Object>>() { |
| 470 | + @Override |
| 471 | + public Flowable<Object> apply(Flowable<Object> f) |
| 472 | + throws Exception { |
| 473 | + return f.sample(PublishProcessor.create()); |
| 474 | + } |
| 475 | + }); |
| 476 | + } |
| 477 | + |
| 478 | + @Test |
| 479 | + public void badRequest() { |
| 480 | + TestHelper.assertBadRequestReported(PublishProcessor.create() |
| 481 | + .sample(PublishProcessor.create())); |
458 | 482 | }
|
459 | 483 | }
|
0 commit comments