Skip to content

Commit be85bf4

Browse files
authored
CoalescePartitionsExec fetch is not consistent with one partition and more than one partition (#18245)
## Which issue does this PR close?
- Closes [#18244](#18244)

## Rationale for this change
In our internal project, a limit does not return the right number of rows when the input of CoalescePartitionsExec is our custom operator, which has only one output partition. After investigating, I found that CoalescePartitionsExec handles fetch inconsistently: with a single input partition the child stream is returned directly and fetch is not applied, whereas with more than one partition fetch is applied.

## What changes are included in this PR?
Make CoalescePartitionsExec apply fetch consistently, regardless of the number of input partitions.

## Are these changes tested?
Yes

## Are there any user-facing changes?
No
1 parent 92c5607 commit be85bf4

File tree

1 file changed

+118
-2
lines changed

1 file changed

+118
-2
lines changed

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 118 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,18 @@ impl ExecutionPlan for CoalescePartitionsExec {
170170
"CoalescePartitionsExec requires at least one input partition"
171171
),
172172
1 => {
173-
// bypass any threading / metrics if there is a single partition
174-
self.input.execute(0, context)
173+
// single-partition path: execute child directly, but ensure fetch is respected
174+
// (wrap with ObservedStream only if fetch is present so we don't add overhead otherwise)
175+
let child_stream = self.input.execute(0, context)?;
176+
if self.fetch.is_some() {
177+
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
178+
return Ok(Box::pin(ObservedStream::new(
179+
child_stream,
180+
baseline_metrics,
181+
self.fetch,
182+
)));
183+
}
184+
Ok(child_stream)
175185
}
176186
_ => {
177187
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
@@ -351,4 +361,110 @@ mod tests {
351361

352362
collect(coalesce_partitions_exec, task_ctx).await.unwrap();
353363
}
364+
365+
#[tokio::test]
366+
async fn test_single_partition_with_fetch() -> Result<()> {
367+
let task_ctx = Arc::new(TaskContext::default());
368+
369+
// Use existing scan_partitioned with 1 partition (returns 100 rows per partition)
370+
let input = test::scan_partitioned(1);
371+
372+
// Test with fetch=3
373+
let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(3));
374+
375+
let stream = coalesce.execute(0, task_ctx)?;
376+
let batches = common::collect(stream).await?;
377+
378+
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
379+
assert_eq!(row_count, 3, "Should only return 3 rows due to fetch=3");
380+
381+
Ok(())
382+
}
383+
384+
#[tokio::test]
385+
async fn test_multi_partition_with_fetch_one() -> Result<()> {
386+
let task_ctx = Arc::new(TaskContext::default());
387+
388+
// Create 4 partitions, each with 100 rows
389+
// This simulates the real-world scenario where each partition has data
390+
let input = test::scan_partitioned(4);
391+
392+
// Test with fetch=1 (the original bug: was returning multiple rows instead of 1)
393+
let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(1));
394+
395+
let stream = coalesce.execute(0, task_ctx)?;
396+
let batches = common::collect(stream).await?;
397+
398+
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
399+
assert_eq!(
400+
row_count, 1,
401+
"Should only return 1 row due to fetch=1, not one per partition"
402+
);
403+
404+
Ok(())
405+
}
406+
407+
#[tokio::test]
408+
async fn test_single_partition_without_fetch() -> Result<()> {
409+
let task_ctx = Arc::new(TaskContext::default());
410+
411+
// Use scan_partitioned with 1 partition
412+
let input = test::scan_partitioned(1);
413+
414+
// Test without fetch (should return all rows)
415+
let coalesce = CoalescePartitionsExec::new(input);
416+
417+
let stream = coalesce.execute(0, task_ctx)?;
418+
let batches = common::collect(stream).await?;
419+
420+
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
421+
assert_eq!(
422+
row_count, 100,
423+
"Should return all 100 rows when fetch is None"
424+
);
425+
426+
Ok(())
427+
}
428+
429+
#[tokio::test]
430+
async fn test_single_partition_fetch_larger_than_batch() -> Result<()> {
431+
let task_ctx = Arc::new(TaskContext::default());
432+
433+
// Use scan_partitioned with 1 partition (returns 100 rows)
434+
let input = test::scan_partitioned(1);
435+
436+
// Test with fetch larger than available rows
437+
let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(200));
438+
439+
let stream = coalesce.execute(0, task_ctx)?;
440+
let batches = common::collect(stream).await?;
441+
442+
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
443+
assert_eq!(
444+
row_count, 100,
445+
"Should return all available rows (100) when fetch (200) is larger"
446+
);
447+
448+
Ok(())
449+
}
450+
451+
#[tokio::test]
452+
async fn test_multi_partition_fetch_exact_match() -> Result<()> {
453+
let task_ctx = Arc::new(TaskContext::default());
454+
455+
// Create 4 partitions, each with 100 rows
456+
let num_partitions = 4;
457+
let csv = test::scan_partitioned(num_partitions);
458+
459+
// Test with fetch=400 (exactly all rows)
460+
let coalesce = CoalescePartitionsExec::new(csv).with_fetch(Some(400));
461+
462+
let stream = coalesce.execute(0, task_ctx)?;
463+
let batches = common::collect(stream).await?;
464+
465+
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
466+
assert_eq!(row_count, 400, "Should return exactly 400 rows");
467+
468+
Ok(())
469+
}
354470
}

0 commit comments

Comments
 (0)