From 6891981780f7f3fc1881960ef76cd574d4fa90fb Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 8 Oct 2025 13:39:47 -0500 Subject: [PATCH 1/4] wip --- datafusion/catalog-listing/src/helpers.rs | 1 + .../core/src/datasource/file_format/mod.rs | 1 + datafusion/core/src/datasource/mod.rs | 1 + .../src/datasource/physical_plan/parquet.rs | 5 ++ datafusion/core/src/test_util/parquet.rs | 1 + .../core/tests/parquet/custom_reader.rs | 1 + datafusion/core/tests/parquet/page_pruning.rs | 1 + datafusion/datasource/src/display.rs | 1 + datafusion/datasource/src/file_scan_config.rs | 1 + datafusion/datasource/src/mod.rs | 52 ++++++++++++++++++- datafusion/expr-common/src/statistics.rs | 2 + .../proto/src/physical_plan/from_proto.rs | 1 + .../substrait/src/physical_plan/consumer.rs | 1 + 13 files changed, 68 insertions(+), 1 deletion(-) diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 00e9c71df348..6262979cc489 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -478,6 +478,7 @@ pub async fn pruned_partition_list<'a>( statistics: None, extensions: None, metadata_size_hint: None, + distribution_statistics: None, }) })); diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index e165707c2eb0..a2419bfd9b7d 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -75,6 +75,7 @@ pub(crate) mod test_util { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }] diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 94d651ddadd5..72f9230a4666 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -115,6 +115,7 @@ mod tests { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }; diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index d0774e57174e..43abdc775b3f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1532,6 +1532,7 @@ mod tests { partition_values: vec![], range: Some(FileRange { start, end }), statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, } @@ -1629,6 +1630,7 @@ mod tests { ], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }; @@ -1719,6 +1721,7 @@ mod tests { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }; @@ -2266,6 +2269,7 @@ mod tests { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, } @@ -2282,6 +2286,7 @@ mod tests { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }) diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index eb4c61c02524..9b15c8481924 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -167,6 +167,7 @@ impl TestParquetFile { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }); diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 3a1f06656236..fdee017a843b 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -74,6 +74,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))), metadata_size_hint: None, }) diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 27bee10234b5..9e652bfa9d56 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -72,6 +72,7 @@ async fn get_parquet_exec( partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }; diff --git a/datafusion/datasource/src/display.rs b/datafusion/datasource/src/display.rs index c9e979535963..c6b409a0076c 100644 --- a/datafusion/datasource/src/display.rs +++ b/datafusion/datasource/src/display.rs @@ -294,6 +294,7 @@ mod tests { statistics: None, extensions: None, metadata_size_hint: None, + distribution_statistics: None, } } } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index e67e1f827372..5235fc296b93 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -2091,6 +2091,7 @@ mod tests { })), extensions: None, metadata_size_hint: None, + distribution_statistics: None, } } } diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 1f47c0983ea1..c638e3dd3717 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -54,6 +54,7 @@ use chrono::TimeZone; use datafusion_common::stats::Precision; use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result}; use datafusion_common::{ScalarValue, Statistics}; +use datafusion_expr::statistics::Distribution; use futures::{Stream, StreamExt}; use object_store::{path::Path, ObjectMeta}; use object_store::{GetOptions, GetRange, ObjectStore}; @@ -87,6 +88,37 @@ impl FileRange { } } +/// Statistics for a column within a relation +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ColumnDistributionStatistics { + /// Number of null values on column + pub null_count: Precision, + /// Distribution of values within the column + pub distribution: Distribution, + /// Sum value of a column + pub sum_value: Precision, + /// Number of distinct values + pub distinct_count: Precision, + /// Size of each value in the column, in bytes. + pub row_size: Distribution, +} + +/// Statistics for a file or a group of files. +/// Fields are optional and can be inexact because the sources +/// sometimes provide approximate estimates for performance reasons +/// and the transformations output are not always predictable. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FileStatistics { + /// The number of table rows. + pub num_rows: Precision, + /// Total bytes of the table rows. + pub total_byte_size: Precision, + /// Statistics on a column level. + /// Each entry in the vector corresponds to a column in the source schema. + /// None entries are possible if statistics are not available for a column. + pub column_statistics: Vec>, +} + #[derive(Debug, Clone)] /// A single file or part of a file that should be read, along with its schema, statistics /// and partition column values that need to be appended to each row. @@ -111,6 +143,11 @@ pub struct PartitionedFile { /// DataFusion relies on these statistics for planning (in particular to sort file groups), /// so if they are incorrect, incorrect answers may result. pub statistics: Option>, + /// Optinal distribution based statistics that describe the data in this file if known. + /// + /// These statistics are used for optimization purposes, such as join planning. + /// If marked as exact these may also be used for correctness purposes, such as in pruning files based on filter predicates. + pub distribution_statistics: Option>, /// An optional field for user defined per object metadata pub extensions: Option>, /// The estimated size of the parquet metadata, in bytes @@ -133,6 +170,7 @@ impl PartitionedFile { statistics: None, extensions: None, metadata_size_hint: None, + distribution_statistics: None, } } @@ -151,6 +189,7 @@ impl PartitionedFile { statistics: None, extensions: None, metadata_size_hint: None, + distribution_statistics: None, } .with_range(start, end) } @@ -191,12 +230,21 @@ impl PartitionedFile { self } - // Update the statistics for this file. + /// Update the statistics for this file. pub fn with_statistics(mut self, statistics: Arc) -> Self { self.statistics = Some(statistics); self } + /// Update distribution based statistics for this file. + pub fn with_distribution_statistics( + mut self, + statistics: Arc, + ) -> Self { + self.distribution_statistics = Some(statistics); + self + } + /// Check if this file has any statistics. /// This returns `true` if the file has any Exact or Inexact statistics /// and `false` if all statistics are `Precision::Absent`. @@ -224,6 +272,7 @@ impl From for PartitionedFile { statistics: None, extensions: None, metadata_size_hint: None, + distribution_statistics: None, } } } @@ -414,6 +463,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec for PartitionedFile { .as_ref() .map(|v| v.try_into().map(Arc::new)) .transpose()?, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }) diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index ecf465dd3f18..aab60971b88e 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -130,6 +130,7 @@ pub async fn from_substrait_rel( partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }; From 0c8b56f1fb441c20c24c3856470f65a5434c437a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 8 Oct 2025 13:49:49 -0500 Subject: [PATCH 2/4] add Distribution based statistics to PartitionedFile --- datafusion/datasource/src/mod.rs | 599 +++++++++++++++++- .../expr-common/src/interval_arithmetic.rs | 2 +- datafusion/expr-common/src/statistics.rs | 14 +- 3 files changed, 604 insertions(+), 11 deletions(-) diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index c638e3dd3717..18211a5714c2 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -94,13 +94,84 @@ pub struct ColumnDistributionStatistics { /// Number of null values on column pub null_count: Precision, /// Distribution of values within the column - pub distribution: Distribution, + pub distribution: Precision, /// Sum value of a column pub sum_value: Precision, /// Number of distinct values pub distinct_count: Precision, /// Size of each value in the column, in bytes. - pub row_size: Distribution, + pub row_size: Precision, +} + +impl ColumnDistributionStatistics { + /// Returns a [`ColumnDistributionStatistics`] instance having all [`Precision::Absent`] parameters. + pub fn new_unknown() -> Self { + Self { + null_count: Precision::Absent, + distribution: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + row_size: Precision::Absent, + } + } + + /// Set the null count + pub fn with_null_count(mut self, null_count: Precision) -> Self { + self.null_count = null_count; + self + } + + /// Set the distribution + pub fn with_distribution(mut self, distribution: Precision) -> Self { + self.distribution = distribution; + self + } + + /// Set the sum value + pub fn with_sum_value(mut self, sum_value: Precision) -> Self { + self.sum_value = sum_value; + self + } + + /// Set the distinct count + pub fn with_distinct_count(mut self, distinct_count: Precision) -> Self { + self.distinct_count = distinct_count; + self + } + + /// Set the row size + pub fn with_row_size(mut self, row_size: Precision) -> Self { + self.row_size = row_size; + self + } + + /// If the exactness of a [`ColumnDistributionStatistics`] instance is lost, this + /// function relaxes the exactness of all information by converting them to + /// [`Precision::Inexact`]. + pub fn to_inexact(mut self) -> Self { + self.null_count = self.null_count.to_inexact(); + self.distribution = self.distribution.to_inexact(); + self.sum_value = self.sum_value.to_inexact(); + self.distinct_count = self.distinct_count.to_inexact(); + self.row_size = self.row_size.to_inexact(); + self + } + + /// Check if the distribution represents a single value (singleton). + /// This is true when the distribution's range is a point (lower == upper). + pub fn is_singleton(&self) -> bool { + match &self.distribution { + Precision::Exact(dist) | Precision::Inexact(dist) => { + if let Ok(range) = dist.range() { + let (lower, upper) = (range.lower(), range.upper()); + !lower.is_null() && !upper.is_null() && lower == upper + } else { + false + } + } + Precision::Absent => false, + } + } } /// Statistics for a file or a group of files. @@ -119,6 +190,282 @@ pub struct FileStatistics { pub column_statistics: Vec>, } +impl Default for FileStatistics { + /// Returns a new [`FileStatistics`] instance with all fields set to unknown + /// and no columns. + fn default() -> Self { + Self { + num_rows: Precision::Absent, + total_byte_size: Precision::Absent, + column_statistics: vec![], + } + } +} + +impl FileStatistics { + /// Returns a [`FileStatistics`] instance for the given schema by assigning + /// unknown statistics to each column in the schema. + pub fn new_unknown(schema: &arrow::datatypes::Schema) -> Self { + Self { + num_rows: Precision::Absent, + total_byte_size: Precision::Absent, + column_statistics: schema + .fields() + .iter() + .map(|_| Some(ColumnDistributionStatistics::new_unknown())) + .collect(), + } + } + + /// Set the number of rows + pub fn with_num_rows(mut self, num_rows: Precision) -> Self { + self.num_rows = num_rows; + self + } + + /// Set the total size, in bytes + pub fn with_total_byte_size(mut self, total_byte_size: Precision) -> Self { + self.total_byte_size = total_byte_size; + self + } + + /// Add a column to the column statistics + pub fn add_column_statistics( + mut self, + column_stats: Option, + ) -> Self { + self.column_statistics.push(column_stats); + self + } + + /// If the exactness of a [`FileStatistics`] instance is lost, this function relaxes + /// the exactness of all information by converting them to [`Precision::Inexact`]. + pub fn to_inexact(mut self) -> Self { + self.num_rows = self.num_rows.to_inexact(); + self.total_byte_size = self.total_byte_size.to_inexact(); + self.column_statistics = self + .column_statistics + .into_iter() + .map(|s| s.map(|stats| stats.to_inexact())) + .collect(); + self + } + + /// Project the statistics to the given column indices. + /// + /// For example, if we had statistics for columns `{"a", "b", "c"}`, + /// projecting to `vec![2, 1]` would return statistics for columns `{"c", + /// "b"}`. + pub fn project(mut self, projection: Option<&Vec>) -> Self { + let Some(projection) = projection else { + return self; + }; + + #[allow(clippy::large_enum_variant)] + enum Slot { + /// The column is taken and put into the specified statistics location + Taken(usize), + /// The original column is present + Present(Option), + } + + // Convert to Vec so we can avoid copying the statistics + let mut columns: Vec<_> = std::mem::take(&mut self.column_statistics) + .into_iter() + .map(Slot::Present) + .collect(); + + for idx in projection { + let next_idx = self.column_statistics.len(); + let slot = std::mem::replace( + columns.get_mut(*idx).expect("projection out of bounds"), + Slot::Taken(next_idx), + ); + match slot { + // The column was there, so just move it + Slot::Present(col) => self.column_statistics.push(col), + // The column was taken, so copy from the previous location + Slot::Taken(prev_idx) => self + .column_statistics + .push(self.column_statistics[prev_idx].clone()), + } + } + + self + } + + /// Summarize zero or more statistics into a single `FileStatistics` instance. + /// + /// The method assumes that all statistics are for the same schema. + /// + /// Returns an error if the statistics do not match the specified schema. + pub fn try_merge_iter<'a, I>( + items: I, + schema: &arrow::datatypes::Schema, + ) -> Result + where + I: IntoIterator, + { + let mut items = items.into_iter(); + + let Some(init) = items.next() else { + return Ok(FileStatistics::new_unknown(schema)); + }; + items.try_fold( + init.clone(), + |acc: FileStatistics, item_stats: &FileStatistics| acc.try_merge(item_stats), + ) + } + + /// Merge this FileStatistics value with another FileStatistics value. + /// + /// Returns an error if the statistics do not match (different schemas). + pub fn try_merge(self, other: &FileStatistics) -> Result { + let Self { + mut num_rows, + mut total_byte_size, + mut column_statistics, + } = self; + + // Accumulate statistics + num_rows = num_rows.add(&other.num_rows); + total_byte_size = total_byte_size.add(&other.total_byte_size); + + if column_statistics.len() != other.column_statistics.len() { + return Err(exec_datafusion_err!( + "Cannot merge statistics with different number of columns: {} vs {}", + column_statistics.len(), + other.column_statistics.len() + )); + } + + for (item_col_stats, col_stats) in other + .column_statistics + .iter() + .zip(column_statistics.iter_mut()) + { + *col_stats = + Self::merge_column_distribution_stats(col_stats, item_col_stats)?; + } + + Ok(FileStatistics { + num_rows, + total_byte_size, + column_statistics, + }) + } + + /// Merge two optional column distribution statistics. + /// Returns None if either input is None. + fn merge_column_distribution_stats( + left: &Option, + right: &Option, + ) -> Result> { + match (left, right) { + (Some(l), Some(r)) => { + let null_count = l.null_count.add(&r.null_count); + let distribution = + Self::merge_distributions(&l.distribution, &r.distribution)?; + let sum_value = l.sum_value.add(&r.sum_value); + // distinct_count cannot be accurately merged without additional info + let distinct_count = Precision::Absent; + let row_size = Self::merge_distributions(&l.row_size, &r.row_size)?; + + Ok(Some(ColumnDistributionStatistics { + null_count, + distribution, + sum_value, + distinct_count, + row_size, + })) + } + _ => Ok(None), + } + } + + /// Merge two distributions by taking the union of their ranges. + fn merge_distributions( + left: &Precision, + right: &Precision, + ) -> Result> { + match (left, right) { + (Precision::Exact(l), Precision::Exact(r)) => { + let l_range = l.range()?; + let r_range = r.range()?; + let merged_range = l_range.union(&r_range)?; + Ok(Precision::Exact(Distribution::new_from_interval( + merged_range, + )?)) + } + (Precision::Inexact(l), Precision::Exact(r)) + | (Precision::Exact(l), Precision::Inexact(r)) + | (Precision::Inexact(l), Precision::Inexact(r)) => { + let l_range = l.range()?; + let r_range = r.range()?; + let merged_range = l_range.union(&r_range)?; + Ok(Precision::Inexact(Distribution::new_from_interval( + merged_range, + )?)) + } + _ => Ok(Precision::Absent), + } + } +} + +impl std::fmt::Display for FileStatistics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // String of column statistics + let column_stats = self + .column_statistics + .iter() + .enumerate() + .map(|(i, cs)| { + let s = format!("(Col[{i}]:"); + match cs { + Some(stats) => { + let s = if stats.distribution != Precision::Absent { + format!("{} Dist={}", s, stats.distribution) + } else { + s + }; + let s = if stats.sum_value != Precision::Absent { + format!("{} Sum={}", s, stats.sum_value) + } else { + s + }; + let s = if stats.null_count != Precision::Absent { + format!("{} Null={}", s, stats.null_count) + } else { + s + }; + let s = if stats.distinct_count != Precision::Absent { + format!("{} Distinct={}", s, stats.distinct_count) + } else { + s + }; + let s = if stats.row_size != Precision::Absent { + format!("{} RowSize={}", s, stats.row_size) + } else { + s + }; + s + ")" + } + None => format!("{s} None)"), + } + }) + .collect::>() + .join(","); + + write!( + f, + "Rows={}, Bytes={}, [{}]", + self.num_rows, self.total_byte_size, column_stats + )?; + + Ok(()) + } +} + #[derive(Debug, Clone)] /// A single file or part of a file that should be read, along with its schema, statistics /// and partition column values that need to be appended to each row. @@ -639,4 +986,252 @@ mod tests { // testing an empty path with `ignore_subdirectory` set to false assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false)); } + + #[test] + fn test_column_distribution_statistics_builders() { + use super::ColumnDistributionStatistics; + use datafusion_common::stats::Precision; + use datafusion_common::ScalarValue; + + let stats = ColumnDistributionStatistics::new_unknown() + .with_null_count(Precision::Exact(5)) + .with_sum_value(Precision::Exact(ScalarValue::Int32(Some(100)))) + .with_distinct_count(Precision::Inexact(10)); + + assert_eq!(stats.null_count, Precision::Exact(5)); + assert_eq!( + stats.sum_value, + Precision::Exact(ScalarValue::Int32(Some(100))) + ); + assert_eq!(stats.distinct_count, Precision::Inexact(10)); + assert_eq!(stats.distribution, Precision::Absent); + } + + #[test] + fn test_column_distribution_statistics_to_inexact() { + use super::ColumnDistributionStatistics; + use datafusion_common::stats::Precision; + use datafusion_common::ScalarValue; + + let stats = ColumnDistributionStatistics::new_unknown() + .with_null_count(Precision::Exact(5)) + .with_sum_value(Precision::Exact(ScalarValue::Int32(Some(100)))) + .with_distinct_count(Precision::Exact(10)); + + let inexact = stats.to_inexact(); + + assert_eq!(inexact.null_count, Precision::Inexact(5)); + assert_eq!( + inexact.sum_value, + Precision::Inexact(ScalarValue::Int32(Some(100))) + ); + assert_eq!(inexact.distinct_count, Precision::Inexact(10)); + } + + #[test] + fn test_column_distribution_statistics_is_singleton() { + use super::ColumnDistributionStatistics; + use datafusion_common::stats::Precision; + use datafusion_common::ScalarValue; + use datafusion_expr::statistics::Distribution; + + // Create a uniform distribution with a point range (singleton) + let point_dist = Distribution::new_uniform( + datafusion_expr::interval_arithmetic::Interval::try_new( + ScalarValue::Int32(Some(5)), + ScalarValue::Int32(Some(5)), + ) + .unwrap(), + ) + .unwrap(); + + let singleton_stats = ColumnDistributionStatistics::new_unknown() + .with_distribution(Precision::Exact(point_dist)); + + assert!(singleton_stats.is_singleton()); + + // Create a uniform distribution with a range (not singleton) + let range_dist = Distribution::new_uniform( + datafusion_expr::interval_arithmetic::Interval::try_new( + ScalarValue::Int32(Some(1)), + ScalarValue::Int32(Some(10)), + ) + .unwrap(), + ) + .unwrap(); + + let range_stats = ColumnDistributionStatistics::new_unknown() + .with_distribution(Precision::Exact(range_dist)); + + assert!(!range_stats.is_singleton()); + + // Test with absent distribution + let absent_stats = ColumnDistributionStatistics::new_unknown(); + assert!(!absent_stats.is_singleton()); + } + + #[test] + fn test_file_statistics_builders() { + use super::FileStatistics; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::stats::Precision; + + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ]); + + let stats = FileStatistics::new_unknown(&schema) + .with_num_rows(Precision::Exact(100)) + .with_total_byte_size(Precision::Inexact(1024)); + + assert_eq!(stats.num_rows, Precision::Exact(100)); + assert_eq!(stats.total_byte_size, Precision::Inexact(1024)); + assert_eq!(stats.column_statistics.len(), 2); + } + + #[test] + fn test_file_statistics_to_inexact() { + use super::{ColumnDistributionStatistics, FileStatistics}; + use datafusion_common::stats::Precision; + + let stats = FileStatistics::default() + .with_num_rows(Precision::Exact(100)) + .with_total_byte_size(Precision::Exact(1024)) + .add_column_statistics(Some( + ColumnDistributionStatistics::new_unknown() + .with_null_count(Precision::Exact(5)), + )); + + let inexact = stats.to_inexact(); + + assert_eq!(inexact.num_rows, Precision::Inexact(100)); + assert_eq!(inexact.total_byte_size, Precision::Inexact(1024)); + assert_eq!( + inexact.column_statistics[0].as_ref().unwrap().null_count, + Precision::Inexact(5) + ); + } + + #[test] + fn test_file_statistics_project() { + use super::{ColumnDistributionStatistics, FileStatistics}; + use datafusion_common::stats::Precision; + + let stats = FileStatistics::default() + .add_column_statistics(Some( + ColumnDistributionStatistics::new_unknown() + .with_null_count(Precision::Exact(1)), + )) + .add_column_statistics(Some( + ColumnDistributionStatistics::new_unknown() + .with_null_count(Precision::Exact(2)), + )) + .add_column_statistics(Some( + ColumnDistributionStatistics::new_unknown() + .with_null_count(Precision::Exact(3)), + )); + + // Project to columns [2, 0] + let projection = vec![2, 0]; + let projected = stats.project(Some(&projection)); + + assert_eq!(projected.column_statistics.len(), 2); + assert_eq!( + projected.column_statistics[0].as_ref().unwrap().null_count, + Precision::Exact(3) + ); + assert_eq!( + projected.column_statistics[1].as_ref().unwrap().null_count, + Precision::Exact(1) + ); + } + + #[test] + fn test_file_statistics_merge() { + use super::{ColumnDistributionStatistics, FileStatistics}; + use datafusion_common::stats::Precision; + use datafusion_common::ScalarValue; + use datafusion_expr::statistics::Distribution; + + // Create two file statistics to merge + let dist1 = Distribution::new_uniform( + datafusion_expr::interval_arithmetic::Interval::try_new( + ScalarValue::Int32(Some(1)), + ScalarValue::Int32(Some(10)), + ) + .unwrap(), + ) + .unwrap(); + + let dist2 = Distribution::new_uniform( + datafusion_expr::interval_arithmetic::Interval::try_new( + ScalarValue::Int32(Some(5)), + ScalarValue::Int32(Some(15)), + ) + .unwrap(), + ) + .unwrap(); + + let stats1 = FileStatistics::default() + .with_num_rows(Precision::Exact(100)) + .with_total_byte_size(Precision::Exact(1000)) + .add_column_statistics(Some( + ColumnDistributionStatistics::new_unknown() + .with_null_count(Precision::Exact(5)) + .with_distribution(Precision::Exact(dist1)) + .with_sum_value(Precision::Exact(ScalarValue::Int32(Some(500)))), + )); + + let stats2 = FileStatistics::default() + .with_num_rows(Precision::Exact(200)) + .with_total_byte_size(Precision::Inexact(2000)) + .add_column_statistics(Some( + ColumnDistributionStatistics::new_unknown() + .with_null_count(Precision::Exact(10)) + .with_distribution(Precision::Exact(dist2)) + .with_sum_value(Precision::Exact(ScalarValue::Int32(Some(1000)))), + )); + + let merged = stats1.try_merge(&stats2).unwrap(); + + // Check merged values + assert_eq!(merged.num_rows, Precision::Exact(300)); + assert_eq!(merged.total_byte_size, Precision::Inexact(3000)); + + let col_stats = merged.column_statistics[0].as_ref().unwrap(); + assert_eq!(col_stats.null_count, Precision::Exact(15)); + assert_eq!( + col_stats.sum_value, + Precision::Exact(ScalarValue::Int32(Some(1500))) + ); + + // Check that the distribution range was merged (union of [1,10] and [5,15] is [1,15]) + if let Precision::Exact(dist) = &col_stats.distribution { + let range = dist.range().unwrap(); + assert_eq!(range.lower(), &ScalarValue::Int32(Some(1))); + assert_eq!(range.upper(), &ScalarValue::Int32(Some(15))); + } else { + panic!("Expected exact distribution"); + } + } + + #[test] + fn test_file_statistics_display() { + use super::{ColumnDistributionStatistics, FileStatistics}; + use datafusion_common::stats::Precision; + + let stats = FileStatistics::default() + .with_num_rows(Precision::Exact(100)) + .with_total_byte_size(Precision::Inexact(1024)) + .add_column_statistics(Some( + ColumnDistributionStatistics::new_unknown() + .with_null_count(Precision::Exact(5)), + )); + + let display_str = format!("{}", stats); + assert!(display_str.contains("Rows=Exact(100)")); + assert!(display_str.contains("Bytes=Inexact(1024)")); + assert!(display_str.contains("Null=Exact(5)")); + } } diff --git a/datafusion/expr-common/src/interval_arithmetic.rs b/datafusion/expr-common/src/interval_arithmetic.rs index b5b632076b00..f2923d1b1713 100644 --- a/datafusion/expr-common/src/interval_arithmetic.rs +++ b/datafusion/expr-common/src/interval_arithmetic.rs @@ -175,7 +175,7 @@ macro_rules! value_transition { /// ensure consistency, with other data types. /// - `NaN` (Not a Number) results are conservatively result in unbounded /// endpoints. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd)] pub struct Interval { lower: ScalarValue, upper: ScalarValue, diff --git a/datafusion/expr-common/src/statistics.rs b/datafusion/expr-common/src/statistics.rs index 981b26bf8477..81fc3d7d5158 100644 --- a/datafusion/expr-common/src/statistics.rs +++ b/datafusion/expr-common/src/statistics.rs @@ -34,7 +34,7 @@ use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue}; /// is the main unit of calculus when evaluating expressions in a statistical /// context. Notions like column and table statistics are built on top of this /// object and the operations it supports. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd)] pub enum Distribution { Uniform(UniformDistribution), Exponential(ExponentialDistribution), @@ -205,15 +205,13 @@ impl Distribution { } } -impl Eq for Distribution {} - /// Uniform distribution, represented by its range. If the given range extends /// towards infinity, the distribution will be improper -- which is OK. For a /// more in-depth discussion, see: /// /// /// -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd)] pub struct UniformDistribution { interval: Interval, } @@ -235,7 +233,7 @@ pub struct UniformDistribution { /// For more information, see: /// /// -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd)] pub struct ExponentialDistribution { rate: ScalarValue, offset: ScalarValue, @@ -248,7 +246,7 @@ pub struct ExponentialDistribution { /// For a more in-depth discussion, see: /// /// -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd)] pub struct GaussianDistribution { mean: ScalarValue, variance: ScalarValue, @@ -258,7 +256,7 @@ pub struct GaussianDistribution { /// the success probability is unknown. For a more in-depth discussion, see: /// /// -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd)] pub struct BernoulliDistribution { p: ScalarValue, } @@ -267,7 +265,7 @@ pub struct BernoulliDistribution { /// approximated via some summary statistics. For a more in-depth discussion, see: /// /// -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd)] pub struct GenericDistribution { mean: ScalarValue, median: ScalarValue, From ab3b6d6488f2da50fab6a7d3fb1bcb7d74d78432 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 8 Oct 2025 13:56:08 -0500 Subject: [PATCH 3/4] fix typo --- datafusion/datasource/src/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 18211a5714c2..9f4255dd52be 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -490,7 +490,7 @@ pub struct PartitionedFile { /// DataFusion relies on these statistics for planning (in particular to sort file groups), /// so if they are incorrect, incorrect answers may result. pub statistics: Option>, - /// Optinal distribution based statistics that describe the data in this file if known. + /// Optional distribution based statistics that describe the data in this file if known. /// /// These statistics are used for optimization purposes, such as join planning. /// If marked as exact these may also be used for correctness purposes, such as in pruning files based on filter predicates. From 3cc8f48bb14543e7c484a8b2777e0a5219d987b0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 8 Oct 2025 14:24:56 -0500 Subject: [PATCH 4/4] Arc --- datafusion/datasource/src/mod.rs | 48 ++++++++++++++++---------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 9f4255dd52be..8783128bbcd1 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -187,7 +187,7 @@ pub struct FileStatistics { /// Statistics on a column level. /// Each entry in the vector corresponds to a column in the source schema. /// None entries are possible if statistics are not available for a column. - pub column_statistics: Vec>, + pub column_statistics: Vec>>, } impl Default for FileStatistics { @@ -212,7 +212,7 @@ impl FileStatistics { column_statistics: schema .fields() .iter() - .map(|_| Some(ColumnDistributionStatistics::new_unknown())) + .map(|_| Some(Arc::new(ColumnDistributionStatistics::new_unknown()))) .collect(), } } @@ -232,7 +232,7 @@ impl FileStatistics { /// Add a column to the column statistics pub fn add_column_statistics( mut self, - column_stats: Option, + column_stats: Option>, ) -> Self { self.column_statistics.push(column_stats); self @@ -246,7 +246,7 @@ impl FileStatistics { self.column_statistics = self .column_statistics .into_iter() - .map(|s| s.map(|stats| stats.to_inexact())) + .map(|s| s.map(|arc_stats| Arc::new(arc_stats.as_ref().clone().to_inexact()))) .collect(); self } @@ -266,7 +266,7 @@ impl FileStatistics { /// The column is taken and put into the specified statistics location Taken(usize), /// The original column is present - Present(Option), + Present(Option>), } // Convert to Vec so we can avoid copying the statistics @@ -358,9 +358,9 @@ impl FileStatistics { /// Merge two optional column distribution statistics. /// Returns None if either input is None. fn merge_column_distribution_stats( - left: &Option, - right: &Option, - ) -> Result> { + left: &Option>, + right: &Option>, + ) -> Result>> { match (left, right) { (Some(l), Some(r)) => { let null_count = l.null_count.add(&r.null_count); @@ -371,13 +371,13 @@ impl FileStatistics { let distinct_count = Precision::Absent; let row_size = Self::merge_distributions(&l.row_size, &r.row_size)?; - Ok(Some(ColumnDistributionStatistics { + Ok(Some(Arc::new(ColumnDistributionStatistics { null_count, distribution, sum_value, distinct_count, row_size, - })) + }))) } _ => Ok(None), } @@ -1098,10 +1098,10 @@ mod tests { let stats = FileStatistics::default() .with_num_rows(Precision::Exact(100)) .with_total_byte_size(Precision::Exact(1024)) - .add_column_statistics(Some( + .add_column_statistics(Some(Arc::new( ColumnDistributionStatistics::new_unknown() .with_null_count(Precision::Exact(5)), - )); + ))); let inexact = stats.to_inexact(); @@ -1119,18 +1119,18 @@ mod tests { use datafusion_common::stats::Precision; let stats = FileStatistics::default() - .add_column_statistics(Some( + .add_column_statistics(Some(Arc::new( ColumnDistributionStatistics::new_unknown() .with_null_count(Precision::Exact(1)), - )) - .add_column_statistics(Some( + ))) + .add_column_statistics(Some(Arc::new( ColumnDistributionStatistics::new_unknown() .with_null_count(Precision::Exact(2)), - )) - .add_column_statistics(Some( + ))) + .add_column_statistics(Some(Arc::new( ColumnDistributionStatistics::new_unknown() .with_null_count(Precision::Exact(3)), - )); + ))); // Project to columns [2, 0] let projection = vec![2, 0]; @@ -1176,22 +1176,22 @@ mod tests { let stats1 = FileStatistics::default() .with_num_rows(Precision::Exact(100)) .with_total_byte_size(Precision::Exact(1000)) - .add_column_statistics(Some( + .add_column_statistics(Some(Arc::new( ColumnDistributionStatistics::new_unknown() .with_null_count(Precision::Exact(5)) .with_distribution(Precision::Exact(dist1)) .with_sum_value(Precision::Exact(ScalarValue::Int32(Some(500)))), - )); + ))); let stats2 = FileStatistics::default() .with_num_rows(Precision::Exact(200)) .with_total_byte_size(Precision::Inexact(2000)) - .add_column_statistics(Some( + .add_column_statistics(Some(Arc::new( ColumnDistributionStatistics::new_unknown() .with_null_count(Precision::Exact(10)) .with_distribution(Precision::Exact(dist2)) .with_sum_value(Precision::Exact(ScalarValue::Int32(Some(1000)))), - )); + ))); let merged = stats1.try_merge(&stats2).unwrap(); @@ -1224,10 +1224,10 @@ mod tests { let stats = FileStatistics::default() .with_num_rows(Precision::Exact(100)) .with_total_byte_size(Precision::Inexact(1024)) - .add_column_statistics(Some( + .add_column_statistics(Some(Arc::new( ColumnDistributionStatistics::new_unknown() .with_null_count(Precision::Exact(5)), - )); + ))); let display_str = format!("{}", stats); assert!(display_str.contains("Rows=Exact(100)"));