Skip to content

Commit 6368daf

Browse files
authored
Implement partition_statistics API for RepartitionExec (apache#17061)
* Implement `partition_statistics` API for `RepartitionExec` * Test execution * Change the partition number for the test * Make all column stats absent * Use `ColumnStatistics::new_unknown()` * Add test case for 0 partitions * Return unknown statistics for 0 partitions
1 parent fd7df66 commit 6368daf

File tree

2 files changed

+141
-4
lines changed

2 files changed

+141
-4
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ mod test {
3333
use datafusion_functions_aggregate::count::count_udaf;
3434
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
3535
use datafusion_physical_expr::expressions::{binary, col, lit, Column};
36+
use datafusion_physical_expr::Partitioning;
3637
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3738
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3839
use datafusion_physical_plan::aggregates::{
@@ -47,6 +48,7 @@ mod test {
4748
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
4849
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
4950
use datafusion_physical_plan::projection::ProjectionExec;
51+
use datafusion_physical_plan::repartition::RepartitionExec;
5052
use datafusion_physical_plan::sorts::sort::SortExec;
5153
use datafusion_physical_plan::union::UnionExec;
5254
use datafusion_physical_plan::{
@@ -761,4 +763,104 @@ mod test {
761763

762764
Ok(())
763765
}
766+
767+
#[tokio::test]
768+
async fn test_statistic_by_partition_of_repartition() -> Result<()> {
769+
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
770+
771+
let repartition = Arc::new(RepartitionExec::try_new(
772+
scan.clone(),
773+
Partitioning::RoundRobinBatch(3),
774+
)?);
775+
776+
let statistics = (0..repartition.partitioning().partition_count())
777+
.map(|idx| repartition.partition_statistics(Some(idx)))
778+
.collect::<Result<Vec<_>>>()?;
779+
assert_eq!(statistics.len(), 3);
780+
781+
let expected_stats = Statistics {
782+
num_rows: Precision::Inexact(1),
783+
total_byte_size: Precision::Inexact(73),
784+
column_statistics: vec![
785+
ColumnStatistics::new_unknown(),
786+
ColumnStatistics::new_unknown(),
787+
],
788+
};
789+
790+
// All partitions should have the same statistics
791+
for stat in statistics.iter() {
792+
assert_eq!(stat, &expected_stats);
793+
}
794+
795+
// Verify that the result has exactly 3 partitions
796+
let partitions = execute_stream_partitioned(
797+
repartition.clone(),
798+
Arc::new(TaskContext::default()),
799+
)?;
800+
assert_eq!(partitions.len(), 3);
801+
802+
// Collect row counts from each partition
803+
let mut partition_row_counts = Vec::new();
804+
for partition_stream in partitions.into_iter() {
805+
let results: Vec<RecordBatch> = partition_stream.try_collect().await?;
806+
let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
807+
partition_row_counts.push(total_rows);
808+
}
809+
assert_eq!(partition_row_counts.len(), 3);
810+
assert_eq!(partition_row_counts[0], 2);
811+
assert_eq!(partition_row_counts[1], 2);
812+
assert_eq!(partition_row_counts[2], 0);
813+
814+
Ok(())
815+
}
816+
817+
#[tokio::test]
818+
async fn test_statistic_by_partition_of_repartition_invalid_partition() -> Result<()>
819+
{
820+
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
821+
822+
let repartition = Arc::new(RepartitionExec::try_new(
823+
scan.clone(),
824+
Partitioning::RoundRobinBatch(2),
825+
)?);
826+
827+
let result = repartition.partition_statistics(Some(2));
828+
assert!(result.is_err());
829+
let error = result.unwrap_err();
830+
assert!(error
831+
.to_string()
832+
.contains("RepartitionExec invalid partition 2 (expected less than 2)"));
833+
834+
let partitions = execute_stream_partitioned(
835+
repartition.clone(),
836+
Arc::new(TaskContext::default()),
837+
)?;
838+
assert_eq!(partitions.len(), 2);
839+
840+
Ok(())
841+
}
842+
843+
#[tokio::test]
844+
async fn test_statistic_by_partition_of_repartition_zero_partitions() -> Result<()> {
845+
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
846+
let scan_schema = scan.schema();
847+
848+
// Create a repartition with 0 partitions
849+
let repartition = Arc::new(RepartitionExec::try_new(
850+
Arc::new(EmptyExec::new(scan_schema.clone())),
851+
Partitioning::RoundRobinBatch(0),
852+
)?);
853+
854+
let result = repartition.partition_statistics(Some(0))?;
855+
assert_eq!(result, Statistics::new_unknown(&scan_schema));
856+
857+
// Verify that the result has exactly 0 partitions
858+
let partitions = execute_stream_partitioned(
859+
repartition.clone(),
860+
Arc::new(TaskContext::default()),
861+
)?;
862+
assert_eq!(partitions.len(), 0);
863+
864+
Ok(())
865+
}
764866
}

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
4545
use arrow::compute::take_arrays;
4646
use arrow::datatypes::{SchemaRef, UInt32Type};
4747
use datafusion_common::config::ConfigOptions;
48+
use datafusion_common::stats::Precision;
4849
use datafusion_common::utils::transpose;
49-
use datafusion_common::{internal_err, HashMap};
50+
use datafusion_common::{internal_err, ColumnStatistics, HashMap};
5051
use datafusion_common::{not_impl_err, DataFusionError, Result};
5152
use datafusion_common_runtime::SpawnedTask;
5253
use datafusion_execution::memory_pool::MemoryConsumer;
@@ -755,10 +756,44 @@ impl ExecutionPlan for RepartitionExec {
755756
}
756757

757758
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
758-
if partition.is_none() {
759-
self.input.partition_statistics(None)
759+
if let Some(partition) = partition {
760+
let partition_count = self.partitioning().partition_count();
761+
if partition_count == 0 {
762+
return Ok(Statistics::new_unknown(&self.schema()));
763+
}
764+
765+
if partition >= partition_count {
766+
return internal_err!(
767+
"RepartitionExec invalid partition {} (expected less than {})",
768+
partition,
769+
self.partitioning().partition_count()
770+
);
771+
}
772+
773+
let mut stats = self.input.partition_statistics(None)?;
774+
775+
// Distribute statistics across partitions
776+
stats.num_rows = stats
777+
.num_rows
778+
.get_value()
779+
.map(|rows| Precision::Inexact(rows / partition_count))
780+
.unwrap_or(Precision::Absent);
781+
stats.total_byte_size = stats
782+
.total_byte_size
783+
.get_value()
784+
.map(|bytes| Precision::Inexact(bytes / partition_count))
785+
.unwrap_or(Precision::Absent);
786+
787+
// Make all column stats unknown
788+
stats.column_statistics = stats
789+
.column_statistics
790+
.iter()
791+
.map(|_| ColumnStatistics::new_unknown())
792+
.collect();
793+
794+
Ok(stats)
760795
} else {
761-
Ok(Statistics::new_unknown(&self.schema()))
796+
self.input.partition_statistics(None)
762797
}
763798
}
764799

0 commit comments

Comments
 (0)