Skip to content

Commit a414ea0

Browse files
bert-beyondloopsBert Vermeiren
andauthored
Issue 19781 : Internal error: Assertion failed: !self.finished: LimitedBatchCoalescer (#19785)
## Which issue does this PR close? PR will close issue #19781. ## Rationale for this change Fixes the internal error ## What changes are included in this PR? The code change is inspired by the `CoalesceBatchesStream` implementation. ## Are these changes tested? Additional sqllogictest written in limit.slt which triggered the issue before the fix. ## Are there any user-facing changes? No --------- Co-authored-by: Bert Vermeiren <[email protected]>
1 parent eadbed5 commit a414ea0

File tree

4 files changed

+55
-46
lines changed

4 files changed

+55
-46
lines changed

datafusion/physical-plan/src/coalesce/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ impl LimitedBatchCoalescer {
134134
Ok(())
135135
}
136136

137+
pub(crate) fn is_finished(&self) -> bool {
138+
self.finished
139+
}
140+
137141
/// Return the next completed batch, if any
138142
pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
139143
self.inner.next_completed_batch()

datafusion/physical-plan/src/filter.rs

Lines changed: 28 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ use super::{
2626
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
2727
RecordBatchStream, SendableRecordBatchStream, Statistics,
2828
};
29-
use crate::coalesce::LimitedBatchCoalescer;
30-
use crate::coalesce::PushBatchStatus::LimitReached;
29+
use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
3130
use crate::common::can_project;
3231
use crate::execution_plan::CardinalityEffect;
3332
use crate::filter_pushdown::{
@@ -711,23 +710,6 @@ impl FilterExecMetrics {
711710
}
712711
}
713712

714-
impl FilterExecStream {
715-
fn flush_remaining_batches(
716-
&mut self,
717-
) -> Poll<Option<std::result::Result<RecordBatch, DataFusionError>>> {
718-
// Flush any remaining buffered batch
719-
match self.batch_coalescer.finish() {
720-
Ok(()) => {
721-
Poll::Ready(self.batch_coalescer.next_completed_batch().map(|batch| {
722-
self.metrics.selectivity.add_part(batch.num_rows());
723-
Ok(batch)
724-
}))
725-
}
726-
Err(e) => Poll::Ready(Some(Err(e))),
727-
}
728-
}
729-
}
730-
731713
pub fn batch_filter(
732714
batch: &RecordBatch,
733715
predicate: &Arc<dyn PhysicalExpr>,
@@ -767,10 +749,26 @@ impl Stream for FilterExecStream {
767749
mut self: Pin<&mut Self>,
768750
cx: &mut Context<'_>,
769751
) -> Poll<Option<Self::Item>> {
770-
let poll;
771752
let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
772753
loop {
754+
// If there is a completed batch ready, return it
755+
if let Some(batch) = self.batch_coalescer.next_completed_batch() {
756+
self.metrics.selectivity.add_part(batch.num_rows());
757+
let poll = Poll::Ready(Some(Ok(batch)));
758+
return self.metrics.baseline_metrics.record_poll(poll);
759+
}
760+
761+
if self.batch_coalescer.is_finished() {
762+
// If input is done and no batches are ready, return None to signal end of stream.
763+
return Poll::Ready(None);
764+
}
765+
766+
// Attempt to pull the next batch from the input stream.
773767
match ready!(self.input.poll_next_unpin(cx)) {
768+
None => {
769+
self.batch_coalescer.finish()?;
770+
// continue draining the coalescer
771+
}
774772
Some(Ok(batch)) => {
775773
let timer = elapsed_compute.timer();
776774
let status = self.predicate.as_ref()
@@ -802,37 +800,22 @@ impl Stream for FilterExecStream {
802800
})?;
803801
timer.done();
804802

805-
if let LimitReached = status {
806-
poll = self.flush_remaining_batches();
807-
break;
808-
}
809-
810-
if let Some(batch) = self.batch_coalescer.next_completed_batch() {
811-
self.metrics.selectivity.add_part(batch.num_rows());
812-
poll = Poll::Ready(Some(Ok(batch)));
813-
break;
814-
}
815-
continue;
816-
}
817-
None => {
818-
// Flush any remaining buffered batch
819-
match self.batch_coalescer.finish() {
820-
Ok(()) => {
821-
poll = self.flush_remaining_batches();
803+
match status {
804+
PushBatchStatus::Continue => {
805+
// Keep pushing more batches
822806
}
823-
Err(e) => {
824-
poll = Poll::Ready(Some(Err(e)));
807+
PushBatchStatus::LimitReached => {
808+
// limit was reached, so stop early
809+
self.batch_coalescer.finish()?;
810+
// continue draining the coalescer
825811
}
826812
}
827-
break;
828-
}
829-
value => {
830-
poll = Poll::Ready(value);
831-
break;
832813
}
814+
815+
// Error case
816+
other => return Poll::Ready(other),
833817
}
834818
}
835-
self.metrics.baseline_metrics.record_poll(poll)
836819
}
837820

838821
fn size_hint(&self) -> (usize, Option<usize>) {

datafusion/sqllogictest/test_files/limit.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -871,4 +871,4 @@ DROP TABLE test_limit_with_partitions;
871871

872872
# Tear down src_table table:
873873
statement ok
874-
DROP TABLE src_table;
874+
DROP TABLE src_table;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
2+
# minimize batch size to 1 in order to trigger different code paths
3+
statement ok
4+
set datafusion.execution.batch_size = '1';
5+
6+
# ----
7+
# tests with target partition set to 1
8+
# ----
9+
statement ok
10+
set datafusion.execution.target_partitions = '1';
11+
12+
13+
statement ok
14+
CREATE TABLE filter_limit (i INT) as values (1), (2);
15+
16+
query I
17+
SELECT COUNT(*) FROM (SELECT i FROM filter_limit WHERE i <> 0 LIMIT 1);
18+
----
19+
1
20+
21+
statement ok
22+
DROP TABLE filter_limit;

0 commit comments

Comments
 (0)