diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 00e9c71df348..6262979cc489 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -478,6 +478,7 @@ pub async fn pruned_partition_list<'a>( statistics: None, extensions: None, metadata_size_hint: None, + distribution_statistics: None, }) })); diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index e165707c2eb0..a2419bfd9b7d 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -75,6 +75,7 @@ pub(crate) mod test_util { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }] diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 94d651ddadd5..72f9230a4666 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -115,6 +115,7 @@ mod tests { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }; diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index d0774e57174e..43abdc775b3f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1532,6 +1532,7 @@ mod tests { partition_values: vec![], range: Some(FileRange { start, end }), statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, } @@ -1629,6 +1630,7 @@ mod tests { ], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }; @@ -1719,6 +1721,7 @@ mod tests { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, 
extensions: None, metadata_size_hint: None, }; @@ -2266,6 +2269,7 @@ mod tests { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, } @@ -2282,6 +2286,7 @@ mod tests { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }) diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index eb4c61c02524..9b15c8481924 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -167,6 +167,7 @@ impl TestParquetFile { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }); diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 3a1f06656236..fdee017a843b 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -74,6 +74,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))), metadata_size_hint: None, }) diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 27bee10234b5..9e652bfa9d56 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -72,6 +72,7 @@ async fn get_parquet_exec( partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }; diff --git a/datafusion/datasource/src/display.rs b/datafusion/datasource/src/display.rs index c9e979535963..c6b409a0076c 100644 --- a/datafusion/datasource/src/display.rs +++ b/datafusion/datasource/src/display.rs @@ -294,6 +294,7 @@ 
mod tests { statistics: None, extensions: None, metadata_size_hint: None, + distribution_statistics: None, } } } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index e67e1f827372..5235fc296b93 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -2091,6 +2091,7 @@ mod tests { })), extensions: None, metadata_size_hint: None, + distribution_statistics: None, } } } diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 1f47c0983ea1..8783128bbcd1 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -54,6 +54,7 @@ use chrono::TimeZone; use datafusion_common::stats::Precision; use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result}; use datafusion_common::{ScalarValue, Statistics}; +use datafusion_expr::statistics::Distribution; use futures::{Stream, StreamExt}; use object_store::{path::Path, ObjectMeta}; use object_store::{GetOptions, GetRange, ObjectStore}; @@ -87,6 +88,384 @@ impl FileRange { } } +/// Statistics for a column within a relation +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ColumnDistributionStatistics { + /// Number of null values on column + pub null_count: Precision<usize>, + /// Distribution of values within the column + pub distribution: Precision<Distribution>, + /// Sum value of a column + pub sum_value: Precision<ScalarValue>, + /// Number of distinct values + pub distinct_count: Precision<usize>, + /// Size of each value in the column, in bytes. + pub row_size: Precision<Distribution>, +} + +impl ColumnDistributionStatistics { + /// Returns a [`ColumnDistributionStatistics`] instance having all [`Precision::Absent`] parameters.
+ pub fn new_unknown() -> Self { + Self { + null_count: Precision::Absent, + distribution: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + row_size: Precision::Absent, + } + } + + /// Set the null count + pub fn with_null_count(mut self, null_count: Precision<usize>) -> Self { + self.null_count = null_count; + self + } + + /// Set the distribution + pub fn with_distribution(mut self, distribution: Precision<Distribution>) -> Self { + self.distribution = distribution; + self + } + + /// Set the sum value + pub fn with_sum_value(mut self, sum_value: Precision<ScalarValue>) -> Self { + self.sum_value = sum_value; + self + } + + /// Set the distinct count + pub fn with_distinct_count(mut self, distinct_count: Precision<usize>) -> Self { + self.distinct_count = distinct_count; + self + } + + /// Set the row size + pub fn with_row_size(mut self, row_size: Precision<Distribution>) -> Self { + self.row_size = row_size; + self + } + + /// If the exactness of a [`ColumnDistributionStatistics`] instance is lost, this + /// function relaxes the exactness of all information by converting them to + /// [`Precision::Inexact`]. + pub fn to_inexact(mut self) -> Self { + self.null_count = self.null_count.to_inexact(); + self.distribution = self.distribution.to_inexact(); + self.sum_value = self.sum_value.to_inexact(); + self.distinct_count = self.distinct_count.to_inexact(); + self.row_size = self.row_size.to_inexact(); + self + } + + /// Check if the distribution represents a single value (singleton). + /// This is true when the distribution's range is a point (lower == upper). + pub fn is_singleton(&self) -> bool { + match &self.distribution { + Precision::Exact(dist) | Precision::Inexact(dist) => { + if let Ok(range) = dist.range() { + let (lower, upper) = (range.lower(), range.upper()); + !lower.is_null() && !upper.is_null() && lower == upper + } else { + false + } + } + Precision::Absent => false, + } + } +} + +/// Statistics for a file or a group of files.
+/// Fields are optional and can be inexact because the sources +/// sometimes provide approximate estimates for performance reasons +/// and the transformations output are not always predictable. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FileStatistics { + /// The number of table rows. + pub num_rows: Precision<usize>, + /// Total bytes of the table rows. + pub total_byte_size: Precision<usize>, + /// Statistics on a column level. + /// Each entry in the vector corresponds to a column in the source schema. + /// None entries are possible if statistics are not available for a column. + pub column_statistics: Vec<Option<Arc<ColumnDistributionStatistics>>>, +} + +impl Default for FileStatistics { + /// Returns a new [`FileStatistics`] instance with all fields set to unknown + /// and no columns. + fn default() -> Self { + Self { + num_rows: Precision::Absent, + total_byte_size: Precision::Absent, + column_statistics: vec![], + } + } +} + +impl FileStatistics { + /// Returns a [`FileStatistics`] instance for the given schema by assigning + /// unknown statistics to each column in the schema.
+ pub fn new_unknown(schema: &arrow::datatypes::Schema) -> Self { + Self { + num_rows: Precision::Absent, + total_byte_size: Precision::Absent, + column_statistics: schema + .fields() + .iter() + .map(|_| Some(Arc::new(ColumnDistributionStatistics::new_unknown()))) + .collect(), + } + } + + /// Set the number of rows + pub fn with_num_rows(mut self, num_rows: Precision<usize>) -> Self { + self.num_rows = num_rows; + self + } + + /// Set the total size, in bytes + pub fn with_total_byte_size(mut self, total_byte_size: Precision<usize>) -> Self { + self.total_byte_size = total_byte_size; + self + } + + /// Add a column to the column statistics + pub fn add_column_statistics( + mut self, + column_stats: Option<Arc<ColumnDistributionStatistics>>, + ) -> Self { + self.column_statistics.push(column_stats); + self + } + + /// If the exactness of a [`FileStatistics`] instance is lost, this function relaxes + /// the exactness of all information by converting them to [`Precision::Inexact`]. + pub fn to_inexact(mut self) -> Self { + self.num_rows = self.num_rows.to_inexact(); + self.total_byte_size = self.total_byte_size.to_inexact(); + self.column_statistics = self + .column_statistics + .into_iter() + .map(|s| s.map(|arc_stats| Arc::new(arc_stats.as_ref().clone().to_inexact()))) + .collect(); + self + } + + /// Project the statistics to the given column indices. + /// + /// For example, if we had statistics for columns `{"a", "b", "c"}`, + /// projecting to `vec![2, 1]` would return statistics for columns `{"c", + /// "b"}`.
+ pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self { + let Some(projection) = projection else { + return self; + }; + + #[allow(clippy::large_enum_variant)] + enum Slot { + /// The column is taken and put into the specified statistics location + Taken(usize), + /// The original column is present + Present(Option<Arc<ColumnDistributionStatistics>>), + } + + // Convert to Vec so we can avoid copying the statistics + let mut columns: Vec<_> = std::mem::take(&mut self.column_statistics) + .into_iter() + .map(Slot::Present) + .collect(); + + for idx in projection { + let next_idx = self.column_statistics.len(); + let slot = std::mem::replace( + columns.get_mut(*idx).expect("projection out of bounds"), + Slot::Taken(next_idx), + ); + match slot { + // The column was there, so just move it + Slot::Present(col) => self.column_statistics.push(col), + // The column was taken, so copy from the previous location + Slot::Taken(prev_idx) => self + .column_statistics + .push(self.column_statistics[prev_idx].clone()), + } + } + + self + } + + /// Summarize zero or more statistics into a single `FileStatistics` instance. + /// + /// The method assumes that all statistics are for the same schema. + /// + /// Returns an error if the statistics do not match the specified schema. + pub fn try_merge_iter<'a, I>( + items: I, + schema: &arrow::datatypes::Schema, + ) -> Result<FileStatistics> + where + I: IntoIterator<Item = &'a FileStatistics>, + { + let mut items = items.into_iter(); + + let Some(init) = items.next() else { + return Ok(FileStatistics::new_unknown(schema)); + }; + items.try_fold( + init.clone(), + |acc: FileStatistics, item_stats: &FileStatistics| acc.try_merge(item_stats), + ) + } + + /// Merge this FileStatistics value with another FileStatistics value. + /// + /// Returns an error if the statistics do not match (different schemas).
+ pub fn try_merge(self, other: &FileStatistics) -> Result<FileStatistics> { + let Self { + mut num_rows, + mut total_byte_size, + mut column_statistics, + } = self; + + // Accumulate statistics + num_rows = num_rows.add(&other.num_rows); + total_byte_size = total_byte_size.add(&other.total_byte_size); + + if column_statistics.len() != other.column_statistics.len() { + return Err(exec_datafusion_err!( + "Cannot merge statistics with different number of columns: {} vs {}", + column_statistics.len(), + other.column_statistics.len() + )); + } + + for (item_col_stats, col_stats) in other + .column_statistics + .iter() + .zip(column_statistics.iter_mut()) + { + *col_stats = + Self::merge_column_distribution_stats(col_stats, item_col_stats)?; + } + + Ok(FileStatistics { + num_rows, + total_byte_size, + column_statistics, + }) + } + + /// Merge two optional column distribution statistics. + /// Returns None if either input is None. + fn merge_column_distribution_stats( + left: &Option<Arc<ColumnDistributionStatistics>>, + right: &Option<Arc<ColumnDistributionStatistics>>, + ) -> Result<Option<Arc<ColumnDistributionStatistics>>> { + match (left, right) { + (Some(l), Some(r)) => { + let null_count = l.null_count.add(&r.null_count); + let distribution = + Self::merge_distributions(&l.distribution, &r.distribution)?; + let sum_value = l.sum_value.add(&r.sum_value); + // distinct_count cannot be accurately merged without additional info + let distinct_count = Precision::Absent; + let row_size = Self::merge_distributions(&l.row_size, &r.row_size)?; + + Ok(Some(Arc::new(ColumnDistributionStatistics { + null_count, + distribution, + sum_value, + distinct_count, + row_size, + }))) + } + _ => Ok(None), + } + } + + /// Merge two distributions by taking the union of their ranges.
+ fn merge_distributions( + left: &Precision<Distribution>, + right: &Precision<Distribution>, + ) -> Result<Precision<Distribution>> { + match (left, right) { + (Precision::Exact(l), Precision::Exact(r)) => { + let l_range = l.range()?; + let r_range = r.range()?; + let merged_range = l_range.union(&r_range)?; + Ok(Precision::Exact(Distribution::new_from_interval( + merged_range, + )?)) + } + (Precision::Inexact(l), Precision::Exact(r)) + | (Precision::Exact(l), Precision::Inexact(r)) + | (Precision::Inexact(l), Precision::Inexact(r)) => { + let l_range = l.range()?; + let r_range = r.range()?; + let merged_range = l_range.union(&r_range)?; + Ok(Precision::Inexact(Distribution::new_from_interval( + merged_range, + )?)) + } + _ => Ok(Precision::Absent), + } + } +} + +impl std::fmt::Display for FileStatistics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // String of column statistics + let column_stats = self + .column_statistics + .iter() + .enumerate() + .map(|(i, cs)| { + let s = format!("(Col[{i}]:"); + match cs { + Some(stats) => { + let s = if stats.distribution != Precision::Absent { + format!("{} Dist={}", s, stats.distribution) + } else { + s + }; + let s = if stats.sum_value != Precision::Absent { + format!("{} Sum={}", s, stats.sum_value) + } else { + s + }; + let s = if stats.null_count != Precision::Absent { + format!("{} Null={}", s, stats.null_count) + } else { + s + }; + let s = if stats.distinct_count != Precision::Absent { + format!("{} Distinct={}", s, stats.distinct_count) + } else { + s + }; + let s = if stats.row_size != Precision::Absent { + format!("{} RowSize={}", s, stats.row_size) + } else { + s + }; + s + ")" + } + None => format!("{s} None)"), + } + }) + .collect::<Vec<_>>() + .join(","); + + write!( + f, + "Rows={}, Bytes={}, [{}]", + self.num_rows, self.total_byte_size, column_stats + )?; + + Ok(()) + } +} + #[derive(Debug, Clone)] /// A single file or part of a file that should be read, along with its schema, statistics /// and partition column values that need
to be appended to each row. @@ -111,6 +490,11 @@ pub struct PartitionedFile { /// DataFusion relies on these statistics for planning (in particular to sort file groups), /// so if they are incorrect, incorrect answers may result. pub statistics: Option<Arc<Statistics>>, + /// Optional distribution based statistics that describe the data in this file if known. + /// + /// These statistics are used for optimization purposes, such as join planning. + /// If marked as exact these may also be used for correctness purposes, such as in pruning files based on filter predicates. + pub distribution_statistics: Option<Arc<FileStatistics>>, /// An optional field for user defined per object metadata pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>, /// The estimated size of the parquet metadata, in bytes @@ -133,6 +517,7 @@ impl PartitionedFile { statistics: None, extensions: None, metadata_size_hint: None, + distribution_statistics: None, } } @@ -151,6 +536,7 @@ impl PartitionedFile { statistics: None, extensions: None, metadata_size_hint: None, + distribution_statistics: None, } .with_range(start, end) } @@ -191,12 +577,21 @@ impl PartitionedFile { self } - // Update the statistics for this file. + /// Update the statistics for this file. pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self { + self.statistics = Some(statistics); + self + } + + /// Update distribution based statistics for this file. + pub fn with_distribution_statistics( + mut self, + statistics: Arc<FileStatistics>, + ) -> Self { + self.distribution_statistics = Some(statistics); + self + } + /// Check if this file has any statistics. /// This returns `true` if the file has any Exact or Inexact statistics /// and `false` if all statistics are `Precision::Absent`.
@@ -224,6 +619,7 @@ impl From for PartitionedFile { statistics: None, extensions: None, metadata_size_hint: None, + distribution_statistics: None, } } } @@ -414,6 +810,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec /// -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd)] pub struct UniformDistribution { interval: Interval, } @@ -233,7 +233,7 @@ pub struct UniformDistribution { /// For more information, see: /// /// -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd)] pub struct ExponentialDistribution { rate: ScalarValue, offset: ScalarValue, @@ -246,7 +246,7 @@ pub struct ExponentialDistribution { /// For a more in-depth discussion, see: /// /// -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd)] pub struct GaussianDistribution { mean: ScalarValue, variance: ScalarValue, @@ -256,7 +256,7 @@ pub struct GaussianDistribution { /// the success probability is unknown. For a more in-depth discussion, see: /// /// -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd)] pub struct BernoulliDistribution { p: ScalarValue, } @@ -265,7 +265,7 @@ pub struct BernoulliDistribution { /// approximated via some summary statistics. 
For a more in-depth discussion, see: /// /// -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd)] pub struct GenericDistribution { mean: ScalarValue, median: ScalarValue, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 693adc6da03a..b23a00ad7960 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -585,6 +585,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile { .as_ref() .map(|v| v.try_into().map(Arc::new)) .transpose()?, + distribution_statistics: None, extensions: None, metadata_size_hint: None, }) diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index ecf465dd3f18..aab60971b88e 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -130,6 +130,7 @@ pub async fn from_substrait_rel( partition_values: vec![], range: None, statistics: None, + distribution_statistics: None, extensions: None, metadata_size_hint: None, };