Skip to content

Commit 3c46285

Browse files
committed
The backpressure exception might cut ahead of all onNext events.
1 parent 061cdb6 commit 3c46285

File tree

1 file changed

+41
-38
lines changed

1 file changed

+41
-38
lines changed

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 41 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -609,47 +609,50 @@ public void testAsyncChild() {
609609

610610
@Test
611611
public void testOnErrorCutsAheadOfOnNext() {
612-
final PublishSubject<Long> subject = PublishSubject.create();
613-
614-
final AtomicLong counter = new AtomicLong();
615-
TestSubscriber<Long> ts = new TestSubscriber<Long>(new Observer<Long>() {
616-
617-
@Override
618-
public void onCompleted() {
619-
620-
}
621-
622-
@Override
623-
public void onError(Throwable e) {
624-
625-
}
626-
627-
@Override
628-
public void onNext(Long t) {
629-
// simulate slow consumer to force backpressure failure
630-
try {
631-
Thread.sleep(1);
632-
} catch (InterruptedException e) {
612+
for (int i = 0; i < 50; i++) {
613+
final PublishSubject<Long> subject = PublishSubject.create();
614+
615+
final AtomicLong counter = new AtomicLong();
616+
TestSubscriber<Long> ts = new TestSubscriber<Long>(new Observer<Long>() {
617+
618+
@Override
619+
public void onCompleted() {
620+
621+
}
622+
623+
@Override
624+
public void onError(Throwable e) {
625+
626+
}
627+
628+
@Override
629+
public void onNext(Long t) {
630+
// simulate slow consumer to force backpressure failure
631+
try {
632+
Thread.sleep(1);
633+
} catch (InterruptedException e) {
634+
}
633635
}
636+
637+
});
638+
subject.observeOn(Schedulers.computation()).subscribe(ts);
639+
640+
// this will blow up with backpressure
641+
while (counter.get() < 102400) {
642+
subject.onNext(counter.get());
643+
counter.incrementAndGet();
634644
}
635-
636-
});
637-
subject.observeOn(Schedulers.computation()).subscribe(ts);
638-
639-
// this will blow up with backpressure
640-
while (counter.get() < 102400) {
641-
subject.onNext(counter.get());
642-
counter.incrementAndGet();
645+
646+
ts.awaitTerminalEvent();
647+
assertEquals(1, ts.getOnErrorEvents().size());
648+
assertTrue(ts.getOnErrorEvents().get(0) instanceof MissingBackpressureException);
649+
// assert that the values are sequential, that cutting in didn't allow skipping some but emitting others.
650+
// example [0, 1, 2] not [0, 1, 4]
651+
List<Long> onNextEvents = ts.getOnNextEvents();
652+
assertTrue(onNextEvents.isEmpty() || onNextEvents.size() == onNextEvents.get(onNextEvents.size() - 1) + 1);
653+
// we should emit the error without emitting the full buffer size
654+
assertTrue(onNextEvents.size() < RxRingBuffer.SIZE);
643655
}
644-
645-
ts.awaitTerminalEvent();
646-
assertEquals(1, ts.getOnErrorEvents().size());
647-
assertTrue(ts.getOnErrorEvents().get(0) instanceof MissingBackpressureException);
648-
// assert that the values are sequential, that cutting in didn't allow skipping some but emitting others.
649-
// example [0, 1, 2] not [0, 1, 4]
650-
assertTrue(ts.getOnNextEvents().size() == ts.getOnNextEvents().get(ts.getOnNextEvents().size() - 1) + 1);
651-
// we should emit the error without emitting the full buffer size
652-
assertTrue(ts.getOnNextEvents().size() < RxRingBuffer.SIZE);
653656
}
654657

655658
/**

0 commit comments

Comments
 (0)