Skip to content

Commit f35489c

Browse files
Fix Merge Bug
If the queue being drained did not complete due to the backpressure request being fulfilled, it could onComplete before draining the queue and miss values.
1 parent 34aaf0b commit f35489c

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,9 @@ private void emit(T t, boolean complete) {
535535
emitted++;
536536
}
537537
} else {
538-
if (producer.requested > 0) {
538+
// this needs to check q.count() as draining above may not have drained the full queue
539+
// perf tests show this to be okay, though different queue implementations could perform poorly with this
540+
if (producer.requested > 0 && q.count() == 0) {
539541
if (complete) {
540542
parentSubscriber.completeInner(this);
541543
} else {

0 commit comments

Comments
 (0)