Skip to content

Commit 25e7b2c

Browse files
committed
fix error handling in onBackpressureBuffer
1 parent 64956e7 commit 25e7b2c

File tree

2 files changed

+43
-4
lines changed

2 files changed

+43
-4
lines changed

src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import rx.Observable.Operator;
2323
import rx.Producer;
2424
import rx.Subscriber;
25+
import rx.exceptions.Exceptions;
2526
import rx.exceptions.MissingBackpressureException;
2627
import rx.functions.Action0;
2728
import rx.internal.util.BackpressureDrainManager;
@@ -156,7 +157,15 @@ private boolean assertCapacity() {
156157
"Overflowed buffer of "
157158
+ baseCapacity));
158159
if (onOverflow != null) {
159-
onOverflow.call();
160+
try {
161+
onOverflow.call();
162+
} catch (Throwable e) {
163+
Exceptions.throwIfFatal(e);
164+
manager.terminateAndDrain(e);
165+
// this line not strictly necessary but nice for clarity
166+
// and in case of future changes to code after this catch block
167+
return false;
168+
}
160169
}
161170
}
162171
return false;

src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,17 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
20+
import static org.junit.Assert.assertTrue;
21+
1822
import java.util.concurrent.CountDownLatch;
1923
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
2025

2126
import org.junit.Test;
27+
import org.mockito.Mock;
28+
import org.mockito.Mockito;
2229

2330
import rx.Observable;
2431
import rx.Observable.OnSubscribe;
@@ -27,12 +34,10 @@
2734
import rx.Subscription;
2835
import rx.exceptions.MissingBackpressureException;
2936
import rx.functions.Action0;
37+
import rx.functions.Action1;
3038
import rx.observers.TestSubscriber;
3139
import rx.schedulers.Schedulers;
3240

33-
import static org.junit.Assert.assertEquals;
34-
import static org.junit.Assert.assertTrue;
35-
3641
public class OperatorOnBackpressureBufferTest {
3742

3843
@Test
@@ -147,5 +152,30 @@ public void call(Subscriber<? super Long> s) {
147152
}
148153

149154
});
155+
156+
private static final Action0 THROWS_NON_FATAL = new Action0() {
157+
158+
@Override
159+
public void call() {
160+
throw new RuntimeException();
161+
}};
162+
163+
@Test
164+
public void testNonFatalExceptionThrownByOnOverflowIsNotReportedByUpstream() {
165+
final AtomicBoolean errorOccurred = new AtomicBoolean(false);
166+
TestSubscriber<Long> ts = TestSubscriber.create(0);
167+
infinite
168+
.subscribeOn(Schedulers.computation())
169+
.doOnError(new Action1<Throwable>() {
170+
@Override
171+
public void call(Throwable t) {
172+
errorOccurred.set(true);
173+
}
174+
})
175+
.onBackpressureBuffer(1, THROWS_NON_FATAL)
176+
.subscribe(ts);
177+
ts.awaitTerminalEvent();
178+
assertFalse(errorOccurred.get());
179+
}
150180

151181
}

0 commit comments

Comments
 (0)