Skip to content

Commit c5e1b3a

Browse files
JanKnakarnokd
authored andcommitted
Null check for BufferExactBoundedObserver (#6499)
* Null check for BufferExactBoundedObserver Other variants contain this Null check already, e.g. BufferExactUnboundedObserver It is causing 0.1% crashes in our production app. * ObservableBufferTimed.BufferExactBoundedObserver - failing supplier unit test FlowableBufferTimed.BufferExactBoundedSubscriber - failing supplier fix + unit test * Better Unit tests for FlowableBufferTimed and BufferExactBoundedSubscriber NPE issue Reverted
1 parent 89d506d commit c5e1b3a

File tree

4 files changed

+43
-10
lines changed

4 files changed

+43
-10
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -501,13 +501,14 @@ public void onComplete() {
501501
buffer = null;
502502
}
503503

504-
queue.offer(b);
505-
done = true;
506-
if (enter()) {
507-
QueueDrainHelper.drainMaxLoop(queue, downstream, false, this, this);
504+
if (b != null) {
505+
queue.offer(b);
506+
done = true;
507+
if (enter()) {
508+
QueueDrainHelper.drainMaxLoop(queue, downstream, false, this, this);
509+
}
510+
w.dispose();
508511
}
509-
510-
w.dispose();
511512
}
512513

513514
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -504,10 +504,12 @@ public void onComplete() {
504504
buffer = null;
505505
}
506506

507-
queue.offer(b);
508-
done = true;
509-
if (enter()) {
510-
QueueDrainHelper.drainLoop(queue, downstream, false, this, this);
507+
if (b != null) {
508+
queue.offer(b);
509+
done = true;
510+
if (enter()) {
511+
QueueDrainHelper.drainLoop(queue, downstream, false, this, this);
512+
}
511513
}
512514
}
513515

src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2769,4 +2769,19 @@ public void timedSizeBufferAlreadyCleared() {
27692769

27702770
sub.run();
27712771
}
2772+
2773+
@Test
2774+
public void bufferExactFailingSupplier() {
2775+
Flowable.empty()
2776+
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 10, new Callable<List<Object>>() {
2777+
@Override
2778+
public List<Object> call() throws Exception {
2779+
throw new TestException();
2780+
}
2781+
}, false)
2782+
.test()
2783+
.awaitDone(1, TimeUnit.SECONDS)
2784+
.assertFailure(TestException.class)
2785+
;
2786+
}
27722787
}

src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2136,4 +2136,19 @@ public ObservableSource<List<Object>> apply(Observable<Object> o)
21362136
}
21372137
});
21382138
}
2139+
2140+
@Test
2141+
public void bufferExactFailingSupplier() {
2142+
Observable.empty()
2143+
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 10, new Callable<List<Object>>() {
2144+
@Override
2145+
public List<Object> call() throws Exception {
2146+
throw new TestException();
2147+
}
2148+
}, false)
2149+
.test()
2150+
.awaitDone(1, TimeUnit.SECONDS)
2151+
.assertFailure(TestException.class)
2152+
;
2153+
}
21392154
}

0 commit comments

Comments
 (0)