diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index 31fb029fd57bd..38456944075fc 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -337,16 +337,103 @@ impl ListingTable {
         self.options.format.file_source(table_schema)
     }
 
-    /// If file_sort_order is specified, creates the appropriate physical expressions
+    /// Creates the output ordering from the user-specified `file_sort_order`, or
+    /// derives it from the file orderings when the user doesn't specify one.
+    ///
+    /// If the user specified `file_sort_order`, that takes precedence.
+    /// Otherwise, this attempts to derive a common ordering from the file
+    /// orderings in the provided file groups.
    pub fn try_create_output_ordering(
        &self,
        execution_props: &ExecutionProps,
+        file_groups: &[FileGroup],
    ) -> datafusion_common::Result<Vec<LexOrdering>> {
-        create_lex_ordering(
-            &self.table_schema,
-            &self.options.file_sort_order,
-            execution_props,
-        )
+        // If the user specified a sort order, use that
+        if !self.options.file_sort_order.is_empty() {
+            return create_lex_ordering(
+                &self.table_schema,
+                &self.options.file_sort_order,
+                execution_props,
+            );
+        }
+        if let Some(ordering) = derive_common_ordering_from_files(file_groups) {
+            return Ok(vec![ordering]);
+        }
+        Ok(vec![])
+    }
+}
+
+/// Derives a common ordering from the file orderings across all file groups.
+///
+/// Returns the common ordering if all files have compatible orderings,
+/// otherwise returns None.
+///
+/// The function finds the longest common prefix among all file orderings.
+/// For example, if files have orderings `[a, b, c]` and `[a, b]`, the common
+/// ordering is `[a, b]`.
+fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option<LexOrdering> {
+    enum CurrentOrderingState {
+        /// Initial state before processing any files
+        FirstFile,
+        /// Some common ordering found so far
+        SomeOrdering(LexOrdering),
+        /// No files have ordering
+        NoOrdering,
+    }
+    let mut state = CurrentOrderingState::FirstFile;
+
+    // Fold each file's ordering into the running state
+    for group in file_groups {
+        for file in group.iter() {
+            state = match (&state, &file.ordering) {
+                // The first file determines the initial state: its ordering, or NoOrdering
+                (CurrentOrderingState::FirstFile, Some(ordering)) => {
+                    CurrentOrderingState::SomeOrdering(ordering.clone())
+                }
+                (CurrentOrderingState::FirstFile, None) => {
+                    CurrentOrderingState::NoOrdering
+                }
+                // If we have an existing ordering, find the common prefix with the new ordering
+                (CurrentOrderingState::SomeOrdering(current), Some(ordering)) => {
+                    // Find common prefix between current and new ordering
+                    let prefix_len = current
+                        .as_ref()
+                        .iter()
+                        .zip(ordering.as_ref().iter())
+                        .take_while(|(a, b)| a == b)
+                        .count();
+                    if prefix_len == 0 {
+                        log::trace!(
+                            "Cannot derive common ordering: no common prefix between orderings {current:?} and {ordering:?}"
+                        );
+                        return None;
+                    } else {
+                        let ordering =
+                            LexOrdering::new(current.as_ref()[..prefix_len].to_vec())
+                                .expect("prefix_len > 0, so ordering must be valid");
+                        CurrentOrderingState::SomeOrdering(ordering)
+                    }
+                }
+                // If one file has an ordering and another doesn't, there is no common
+                // ordering: log a trace message explaining why and return None
+                (CurrentOrderingState::SomeOrdering(ordering), None)
+                | (CurrentOrderingState::NoOrdering, Some(ordering)) => {
+                    log::trace!(
+                        "Cannot derive common ordering: some files have ordering {ordering:?}, others don't"
+                    );
+                    return None;
+                }
+                // Both have no ordering, remain in NoOrdering state
+                (CurrentOrderingState::NoOrdering, None) => {
+                    CurrentOrderingState::NoOrdering
+                }
+            };
+        }
+    }
+
+    match state {
+        CurrentOrderingState::SomeOrdering(ordering) => Some(ordering),
+        _ => None,
+    }
+}
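To make the fold above concrete, here is a minimal, dependency-free model of the same longest-common-prefix logic, with plain column-name lists standing in for `LexOrdering`. The function name `common_prefix_ordering` and the `Vec<&str>` representation are illustrative only, not part of this change:

```rust
/// Simplified model of `derive_common_ordering_from_files`: each file reports
/// its sort columns as a list of names; the result is the longest prefix
/// shared by every file, or `None` if any file disagrees or lacks an ordering.
fn common_prefix_ordering<'a>(files: &[Option<Vec<&'a str>>]) -> Option<Vec<&'a str>> {
    let mut current: Option<Vec<&'a str>> = None;
    for file in files {
        let ordering = file.as_ref()?; // any unordered file -> no common ordering
        current = match current {
            None => Some(ordering.clone()),
            Some(cur) => {
                // Keep only the prefix shared by the running result and this file
                let prefix: Vec<&str> = cur
                    .iter()
                    .zip(ordering.iter())
                    .take_while(|(a, b)| a == b)
                    .map(|(a, _)| *a)
                    .collect();
                if prefix.is_empty() {
                    return None; // orderings diverge on the first column
                }
                Some(prefix)
            }
        };
    }
    current
}

fn main() {
    let files = vec![Some(vec!["a", "b", "c"]), Some(vec!["a", "b"])];
    // The shared prefix of [a, b, c] and [a, b] is [a, b]
    assert_eq!(common_prefix_ordering(&files), Some(vec!["a", "b"]));
}
```

The real function additionally distinguishes "no files seen yet" from "files seen, none ordered" so it can emit a trace message explaining why derivation failed.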
@@ -439,7 +526,10 @@ impl TableProvider for ListingTable {
             return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
         }
 
-        let output_ordering = self.try_create_output_ordering(state.execution_props())?;
+        let output_ordering = self.try_create_output_ordering(
+            state.execution_props(),
+            &partitioned_file_lists,
+        )?;
         match state
             .config_options()
             .execution
@@ -586,7 +676,8 @@ impl TableProvider for ListingTable {
             file_extension: self.options().format.get_ext(),
         };
 
-        let orderings = self.try_create_output_ordering(state.execution_props())?;
+        // For writes, we only use the user-specified ordering (there are no file groups to derive from)
+        let orderings = self.try_create_output_ordering(state.execution_props(), &[])?;
         // It is sufficient to pass only one of the equivalent orderings:
         let order_requirements = orderings.into_iter().next().map(Into::into);
 
@@ -635,16 +726,19 @@ impl ListingTable {
         let meta_fetch_concurrency = ctx.config_options().execution.meta_fetch_concurrency;
         let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
 
-        // collect the statistics if required by the config
+        // collect the statistics and ordering if required by the config
         let files = file_list
             .map(|part_file| async {
                 let part_file = part_file?;
-                let statistics = if self.options.collect_stat {
-                    self.do_collect_statistics(ctx, &store, &part_file).await?
+                let (statistics, ordering) = if self.options.collect_stat {
+                    self.do_collect_statistics_and_ordering(ctx, &store, &part_file)
+                        .await?
                 } else {
-                    Arc::new(Statistics::new_unknown(&self.file_schema))
+                    (Arc::new(Statistics::new_unknown(&self.file_schema)), None)
                 };
-                Ok(part_file.with_statistics(statistics))
+                Ok(part_file
+                    .with_statistics(statistics)
+                    .with_ordering(ordering))
             })
             .boxed()
             .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
@@ -699,53 +793,50 @@ impl ListingTable {
         })
     }
 
-    /// Collects statistics for a given partitioned file.
+    /// Collects statistics and ordering for a given partitioned file.
     ///
-    /// This method first checks if the statistics for the given file are already cached.
-    /// If they are, it returns the cached statistics.
-    /// If they are not, it infers the statistics from the file and stores them in the cache.
-    async fn do_collect_statistics(
+    /// This method first checks the cache. If a still-valid entry exists, it
+    /// returns the cached statistics and ordering. Otherwise it infers both
+    /// statistics and ordering in a single metadata read and caches the result.
+    async fn do_collect_statistics_and_ordering(
         &self,
         ctx: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         part_file: &PartitionedFile,
-    ) -> datafusion_common::Result<Arc<Statistics>> {
+    ) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
         use datafusion_execution::cache::cache_manager::CachedFileMetadata;
 
-        // Check cache first
-        if let Some(cached) = self
-            .collected_statistics
-            .get(&part_file.object_meta.location)
+        let path = &part_file.object_meta.location;
+        let meta = &part_file.object_meta;
+
+        // Check cache first - if we have valid cached statistics and ordering
+        if let Some(cached) = self.collected_statistics.get(path)
+            && cached.is_valid_for(meta)
         {
-            // Validate that cached entry is still valid
-            if cached.is_valid_for(&part_file.object_meta) {
-                return Ok(cached.statistics);
-            }
+            // Return cached statistics and ordering
+            return Ok((Arc::clone(&cached.statistics), cached.ordering.clone()));
         }
 
-        // Cache miss or invalid - infer statistics
-        let statistics = self
+        // Cache miss or invalid: fetch both statistics and ordering in a single metadata read
+        let file_meta = self
             .options
             .format
-            .infer_stats(
-                ctx,
-                store,
-                Arc::clone(&self.file_schema),
-                &part_file.object_meta,
-            )
+            .infer_stats_and_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
             .await?;
-        let statistics = Arc::new(statistics);
+
+        let statistics = Arc::new(file_meta.statistics);
 
         // Store in cache
         self.collected_statistics.put(
-            &part_file.object_meta.location,
+            path,
             CachedFileMetadata::new(
-                part_file.object_meta.clone(),
+                meta.clone(),
                 Arc::clone(&statistics),
-                None, // No ordering information in this PR
+                file_meta.ordering.clone(),
            ),
        );
-        Ok(statistics)
+
+        Ok((statistics, file_meta.ordering))
    }
 }
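For context on the cache-hit path: `CachedFileMetadata::is_valid_for` is what rejects stale entries. A rough sketch of that kind of check — the helper below is hypothetical; the real logic lives in `datafusion-execution`'s cache manager — compares the stored `ObjectMeta` against the file's current metadata:

```rust
use object_store::ObjectMeta;

/// Hypothetical stand-in for `CachedFileMetadata::is_valid_for`: an entry is
/// reusable only if the object looks unchanged since it was cached.
fn entry_is_valid(cached: &ObjectMeta, current: &ObjectMeta) -> bool {
    cached.location == current.location
        && cached.size == current.size
        && cached.last_modified == current.last_modified
        && cached.e_tag == current.e_tag
        && cached.version == current.version
}
```

This matters more now than before this change: a stale entry would not just report outdated statistics, it could claim an ordering the rewritten file no longer has (the `cache_unit.rs` test below exercises exactly this).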
@@ -821,3 +912,146 @@ async fn get_files_with_limit(
     let inexact_stats = all_files.next().await.is_some();
     Ok((file_group, inexact_stats))
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::compute::SortOptions;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+    use std::sync::Arc;
+
+    /// Helper to create a PhysicalSortExpr
+    fn sort_expr(
+        name: &str,
+        idx: usize,
+        descending: bool,
+        nulls_first: bool,
+    ) -> PhysicalSortExpr {
+        PhysicalSortExpr::new(
+            Arc::new(Column::new(name, idx)),
+            SortOptions {
+                descending,
+                nulls_first,
+            },
+        )
+    }
+
+    /// Helper to create a LexOrdering (unwraps the Option)
+    fn lex_ordering(exprs: Vec<PhysicalSortExpr>) -> LexOrdering {
+        LexOrdering::new(exprs).expect("expected non-empty ordering")
+    }
+
+    /// Helper to create a PartitionedFile with an optional ordering
+    fn create_file(name: &str, ordering: Option<LexOrdering>) -> PartitionedFile {
+        PartitionedFile::new(name.to_string(), 1024).with_ordering(ordering)
+    }
+
+    #[test]
+    fn test_derive_common_ordering_all_files_same_ordering() {
+        // All files have the same ordering -> returns that ordering
+        let ordering = lex_ordering(vec![
+            sort_expr("a", 0, false, true),
+            sort_expr("b", 1, true, false),
+        ]);
+
+        let file_groups = vec![
+            FileGroup::new(vec![
+                create_file("f1.parquet", Some(ordering.clone())),
+                create_file("f2.parquet", Some(ordering.clone())),
+            ]),
+            FileGroup::new(vec![create_file("f3.parquet", Some(ordering.clone()))]),
+        ];
+
+        let result = derive_common_ordering_from_files(&file_groups);
+        assert_eq!(result, Some(ordering));
+    }
+
+    #[test]
+    fn test_derive_common_ordering_common_prefix() {
+        // Files have different orderings but share a common prefix
+        let ordering_abc = lex_ordering(vec![
+            sort_expr("a", 0, false, true),
+            sort_expr("b", 1, false, true),
sort_expr("c", 2, false, true), + ]); + let ordering_ab = lex_ordering(vec![ + sort_expr("a", 0, false, true), + sort_expr("b", 1, false, true), + ]); + + let file_groups = vec![FileGroup::new(vec![ + create_file("f1.parquet", Some(ordering_abc)), + create_file("f2.parquet", Some(ordering_ab.clone())), + ])]; + + let result = derive_common_ordering_from_files(&file_groups); + assert_eq!(result, Some(ordering_ab)); + } + + #[test] + fn test_derive_common_ordering_no_common_prefix() { + // Files have completely different orderings -> returns None + let ordering_a = lex_ordering(vec![sort_expr("a", 0, false, true)]); + let ordering_b = lex_ordering(vec![sort_expr("b", 1, false, true)]); + + let file_groups = vec![FileGroup::new(vec![ + create_file("f1.parquet", Some(ordering_a)), + create_file("f2.parquet", Some(ordering_b)), + ])]; + + let result = derive_common_ordering_from_files(&file_groups); + assert_eq!(result, None); + } + + #[test] + fn test_derive_common_ordering_mixed_with_none() { + // Some files have ordering, some don't -> returns None + let ordering = lex_ordering(vec![sort_expr("a", 0, false, true)]); + + let file_groups = vec![FileGroup::new(vec![ + create_file("f1.parquet", Some(ordering)), + create_file("f2.parquet", None), + ])]; + + let result = derive_common_ordering_from_files(&file_groups); + assert_eq!(result, None); + } + + #[test] + fn test_derive_common_ordering_all_none() { + // No files have ordering -> returns None + let file_groups = vec![FileGroup::new(vec![ + create_file("f1.parquet", None), + create_file("f2.parquet", None), + ])]; + + let result = derive_common_ordering_from_files(&file_groups); + assert_eq!(result, None); + } + + #[test] + fn test_derive_common_ordering_empty_groups() { + // Empty file groups -> returns None + let file_groups: Vec = vec![]; + let result = derive_common_ordering_from_files(&file_groups); + assert_eq!(result, None); + } + + #[test] + fn test_derive_common_ordering_single_file() { + // Single file with ordering -> returns that ordering + let ordering = lex_ordering(vec![ + sort_expr("a", 0, false, true), + sort_expr("b", 1, true, false), + ]); + + let file_groups = vec![FileGroup::new(vec![create_file( + "f1.parquet", + Some(ordering.clone()), + )])]; + + let result = derive_common_ordering_from_files(&file_groups); + assert_eq!(result, Some(ordering)); + } +} diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 93d77e10ba23c..4e33f3cad51a4 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -347,7 +347,7 @@ mod tests { let table = ListingTable::try_new(config.clone()).expect("Creating the table"); let ordering_result = - table.try_create_output_ordering(state.execution_props()); + table.try_create_output_ordering(state.execution_props(), &[]); match (expected_result, ordering_result) { (Ok(expected), Ok(result)) => { diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index d12739658c400..f2d6607e3ca1b 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -1752,15 +1752,17 @@ STORED AS PARQUET; .unwrap(); // Create a TopK query that will use dynamic filter pushdown + // Note that we use t * t as the order by expression to avoid + // the order pushdown optimizer from optimizing away the TopK. 
     let df = ctx
-        .sql(r"EXPLAIN ANALYZE SELECT t FROM topk_pushdown ORDER BY t LIMIT 10;")
+        .sql(r"EXPLAIN ANALYZE SELECT t FROM topk_pushdown ORDER BY t * t LIMIT 10;")
         .await
         .unwrap();
     let batches = df.collect().await.unwrap();
     let explain = format!("{}", pretty_format_batches(&batches).unwrap());
 
     assert!(explain.contains("output_rows=128")); // Read 1 row group
-    assert!(explain.contains("t@0 < 1372708809")); // Dynamic filter was applied
+    assert!(explain.contains("t@0 < 1884329474306198481")); // Dynamic filter was applied
     assert!(
         explain.contains("pushdown_rows_matched=128, pushdown_rows_pruned=99.87 K"),
         "{explain}"
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 2109416d646fb..6635c9072dd97 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -449,6 +449,57 @@ impl FileFormat for ParquetFormat {
             .await
     }
 
+    async fn infer_ordering(
+        &self,
+        state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        table_schema: SchemaRef,
+        object: &ObjectMeta,
+    ) -> Result<Option<LexOrdering>> {
+        let file_decryption_properties =
+            get_file_decryption_properties(state, &self.options, &object.location)
+                .await?;
+        let file_metadata_cache =
+            state.runtime_env().cache_manager.get_file_metadata_cache();
+        let metadata = DFParquetMetadata::new(store, object)
+            .with_metadata_size_hint(self.metadata_size_hint())
+            .with_decryption_properties(file_decryption_properties)
+            .with_file_metadata_cache(Some(file_metadata_cache))
+            .fetch_metadata()
+            .await?;
+        crate::metadata::ordering_from_parquet_metadata(&metadata, &table_schema)
+    }
+
+    async fn infer_stats_and_ordering(
+        &self,
+        state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        table_schema: SchemaRef,
+        object: &ObjectMeta,
+    ) -> Result<datafusion_datasource::file_format::FileMeta> {
+        let file_decryption_properties =
+            get_file_decryption_properties(state, &self.options, &object.location)
+                .await?;
+        let file_metadata_cache =
+            state.runtime_env().cache_manager.get_file_metadata_cache();
+        let metadata = DFParquetMetadata::new(store, object)
+            .with_metadata_size_hint(self.metadata_size_hint())
+            .with_decryption_properties(file_decryption_properties)
+            .with_file_metadata_cache(Some(file_metadata_cache))
+            .fetch_metadata()
+            .await?;
+        let statistics = DFParquetMetadata::statistics_from_parquet_metadata(
+            &metadata,
+            &table_schema,
+        )?;
+        let ordering =
+            crate::metadata::ordering_from_parquet_metadata(&metadata, &table_schema)?;
+        Ok(
+            datafusion_datasource::file_format::FileMeta::new(statistics)
+                .with_ordering(ordering),
+        )
+    }
+
     async fn create_physical_plan(
         &self,
         state: &dyn Session,
diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs
index 7e768a7958a0f..b763f817a0268 100644
--- a/datafusion/datasource-parquet/src/metadata.rs
+++ b/datafusion/datasource-parquet/src/metadata.rs
@@ -658,6 +658,73 @@ pub(crate) fn lex_ordering_to_sorting_columns(
     ordering.iter().map(sort_expr_to_sorting_column).collect()
 }
 
+/// Extracts ordering information from Parquet metadata.
+///
+/// This function reads the sorting_columns from the first row group's metadata
+/// and converts them into a [`LexOrdering`] that can be used by the query engine.
+///
+/// # Arguments
+/// * `metadata` - The Parquet metadata containing sorting_columns information
+/// * `schema` - The Arrow schema to use for column lookup
+///
+/// # Returns
+/// * `Ok(Some(ordering))` if valid ordering information was found
+/// * `Ok(None)` if no sorting columns were specified or they couldn't be resolved
+pub fn ordering_from_parquet_metadata(
+    metadata: &ParquetMetaData,
+    schema: &SchemaRef,
+) -> Result<Option<LexOrdering>> {
+    // Get the sorting columns from the first row group metadata.
+    // If no row groups exist or no sorting columns are specified, return None.
+    let sorting_columns = metadata
+        .row_groups()
+        .first()
+        .and_then(|rg| rg.sorting_columns())
+        .filter(|cols| !cols.is_empty());
+
+    let Some(sorting_columns) = sorting_columns else {
+        return Ok(None);
+    };
+
+    let parquet_schema = metadata.file_metadata().schema_descr();
+
+    let sort_exprs =
+        sorting_columns_to_physical_exprs(sorting_columns, parquet_schema, schema);
+
+    if sort_exprs.is_empty() {
+        return Ok(None);
+    }
+
+    Ok(LexOrdering::new(sort_exprs))
+}
+
+/// Converts Parquet sorting columns to physical sort expressions.
+fn sorting_columns_to_physical_exprs(
+    sorting_columns: &[SortingColumn],
+    parquet_schema: &SchemaDescriptor,
+    arrow_schema: &SchemaRef,
+) -> Vec<PhysicalSortExpr> {
+    use arrow::compute::SortOptions;
+
+    sorting_columns
+        .iter()
+        .filter_map(|sc| {
+            let parquet_column = parquet_schema.column(sc.column_idx as usize);
+            let name = parquet_column.name();
+
+            // Find the column in the arrow schema
+            let (index, _) = arrow_schema.column_with_name(name)?;
+
+            let expr = Arc::new(Column::new(name, index));
+            let options = SortOptions {
+                descending: sc.descending,
+                nulls_first: sc.nulls_first,
+            };
+            Some(PhysicalSortExpr::new(expr, options))
+        })
+        .collect()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
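For `ordering_from_parquet_metadata` to find anything, files must have been written with `sorting_columns` recorded in their row group metadata. A minimal sketch using the `parquet` crate's writer API (file name, schema, and data are illustrative; the writer does not verify the rows are actually sorted, so the caller must guarantee it):

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use parquet::format::SortingColumn;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("t", DataType::Int64, false)]));
    let col: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![col])?;

    // Declare that row groups are sorted by column 0, ascending, nulls first.
    // This is the metadata that `ordering_from_parquet_metadata` reads back.
    let props = WriterProperties::builder()
        .set_sorting_columns(Some(vec![SortingColumn {
            column_idx: 0,
            descending: false,
            nulls_first: true,
        }]))
        .build();

    let file = File::create("sorted.parquet")?;
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```

Note that `sorting_columns` is declared per row group, which is why `ordering_from_parquet_metadata` reads it from the first row group only.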
diff --git a/datafusion/datasource/src/file_format.rs b/datafusion/datasource/src/file_format.rs
index 54389ecd214e5..9f8fa622d2587 100644
--- a/datafusion/datasource/src/file_format.rs
+++ b/datafusion/datasource/src/file_format.rs
@@ -32,6 +32,7 @@ use arrow::datatypes::SchemaRef;
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{GetExt, Result, Statistics, internal_err, not_impl_err};
 use datafusion_physical_expr::LexRequirement;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_session::Session;
 
@@ -41,6 +42,35 @@ use object_store::{ObjectMeta, ObjectStore};
 /// Default max records to scan to infer the schema
 pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
 
+/// Metadata fetched from a file, including statistics and ordering.
+///
+/// This struct is returned by [`FileFormat::infer_stats_and_ordering`] to
+/// provide all metadata in a single read, avoiding duplicate I/O operations.
+#[derive(Debug, Clone)]
+#[non_exhaustive]
+pub struct FileMeta {
+    /// Statistics for the file (row counts, byte sizes, column statistics).
+    pub statistics: Statistics,
+    /// The ordering (sort order) of the file, if known.
+    pub ordering: Option<LexOrdering>,
+}
+
+impl FileMeta {
+    /// Creates a new `FileMeta` with the given statistics and no ordering.
+    pub fn new(statistics: Statistics) -> Self {
+        Self {
+            statistics,
+            ordering: None,
+        }
+    }
+
+    /// Sets the ordering for this file metadata.
+    pub fn with_ordering(mut self, ordering: Option<LexOrdering>) -> Self {
+        self.ordering = ordering;
+        self
+    }
+}
+
 /// This trait abstracts all the file format specific implementations
 /// from the [`TableProvider`]. This helps code re-utilization across
 /// providers that support the same file formats.
@@ -90,6 +120,52 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         object: &ObjectMeta,
     ) -> Result<Statistics>;
 
+    /// Infer the ordering (sort order) for the provided object from file metadata.
+    ///
+    /// Returns `Ok(None)` if the file format does not support ordering inference
+    /// or if the file does not have ordering information.
+    ///
+    /// `table_schema` is the (combined) schema of the overall table
+    /// and may be a superset of the schema contained in this file.
+    ///
+    /// The default implementation returns `Ok(None)`.
+    async fn infer_ordering(
+        &self,
+        _state: &dyn Session,
+        _store: &Arc<dyn ObjectStore>,
+        _table_schema: SchemaRef,
+        _object: &ObjectMeta,
+    ) -> Result<Option<LexOrdering>> {
+        Ok(None)
+    }
+
+    /// Infer both statistics and ordering from a single metadata read.
+    ///
+    /// This is more efficient than calling [`Self::infer_stats`] and
+    /// [`Self::infer_ordering`] separately when both are needed, as it avoids
+    /// reading file metadata twice.
+    ///
+    /// The default implementation calls both methods separately. File formats
+    /// that can extract both from a single read should override this method.
+    async fn infer_stats_and_ordering(
+        &self,
+        state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        table_schema: SchemaRef,
+        object: &ObjectMeta,
+    ) -> Result<FileMeta> {
+        let statistics = self
+            .infer_stats(state, store, Arc::clone(&table_schema), object)
+            .await?;
+        let ordering = self
+            .infer_ordering(state, store, table_schema, object)
+            .await?;
+        Ok(FileMeta {
+            statistics,
+            ordering,
+        })
+    }
+
     /// Take a list of files and convert it to the appropriate executor
     /// according to this file format.
     async fn create_physical_plan(
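Because `FileMeta` is `#[non_exhaustive]`, downstream crates construct it with `new` plus `with_ordering` rather than a struct literal. A small sketch assuming this change (the unknown statistics and single-column schema are placeholders):

```rust
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Statistics;
use datafusion_datasource::file_format::FileMeta;
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// Build a `FileMeta` carrying whatever ordering a format could infer.
fn example(ordering: Option<LexOrdering>) -> FileMeta {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    FileMeta::new(Statistics::new_unknown(&schema)).with_ordering(ordering)
}
```

The `#[non_exhaustive]` attribute leaves room to add more per-file metadata later (for example, distinct counts) without breaking implementors.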
diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs
index 9362d078416ff..d98d23821ec7f 100644
--- a/datafusion/execution/src/cache/cache_unit.rs
+++ b/datafusion/execution/src/cache/cache_unit.rs
@@ -301,6 +301,68 @@ mod tests {
         assert_eq!(result.meta.size, 200);
     }
 
+    #[test]
+    fn test_ordering_cache_invalidation_on_file_modification() {
+        let cache = DefaultFileStatisticsCache::default();
+        let path = Path::from("test.parquet");
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+
+        // Cache with original metadata and ordering
+        let meta_v1 = ObjectMeta {
+            location: path.clone(),
+            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
+                .unwrap()
+                .into(),
+            size: 100,
+            e_tag: None,
+            version: None,
+        };
+        let ordering_v1 = ordering();
+        let cached_v1 = CachedFileMetadata::new(
+            meta_v1.clone(),
+            Arc::new(Statistics::new_unknown(&schema)),
+            Some(ordering_v1),
+        );
+        cache.put(&path, cached_v1);
+
+        // Verify the cached ordering is valid
+        let cached = cache.get(&path).unwrap();
+        assert!(cached.is_valid_for(&meta_v1));
+        assert!(cached.ordering.is_some());
+
+        // File modified (size changed)
+        let meta_v2 = ObjectMeta {
+            location: path.clone(),
+            last_modified: DateTime::parse_from_rfc3339("2022-09-28T10:00:00+02:00")
+                .unwrap()
+                .into(),
+            size: 200, // Changed
+            e_tag: None,
+            version: None,
+        };
+
+        // Cache entry exists but should be invalid for the new metadata
+        let cached = cache.get(&path).unwrap();
+        assert!(!cached.is_valid_for(&meta_v2));
+
+        // Cache the new version with a different ordering
+        let ordering_v2 = ordering(); // New ordering instance
+        let cached_v2 = CachedFileMetadata::new(
+            meta_v2.clone(),
+            Arc::new(Statistics::new_unknown(&schema)),
+            Some(ordering_v2),
+        );
+        cache.put(&path, cached_v2);
+
+        // The entry should no longer validate against the old metadata
+        let cached = cache.get(&path).unwrap();
+        assert!(!cached.is_valid_for(&meta_v1));
+
+        // It should validate against the new metadata
+        assert!(cached.is_valid_for(&meta_v2));
+        assert!(cached.ordering.is_some());
+    }
+
     #[test]
     fn test_list_entries() {
         let cache = DefaultFileStatisticsCache::default();
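Taken together, these changes let a listing table over Parquet files written with `sorting_columns` report an output ordering without an explicit `WITH ORDER` clause. A sketch of the intended end-to-end effect — the directory path is illustrative, and whether a sort is actually elided depends on the optimizer recognizing the derived ordering:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Files under ./sorted_data were written with Parquet `sorting_columns`
    // metadata (e.g. via WriterProperties::set_sorting_columns).
    ctx.register_parquet("t", "./sorted_data", ParquetReadOptions::default())
        .await?;

    // With this change, the listing table derives the common ordering from
    // the files' metadata, so the plan can avoid a full sort (or use TopK)
    // for an ORDER BY that matches the derived ordering.
    let df = ctx.sql("EXPLAIN SELECT * FROM t ORDER BY a").await?;
    df.show().await?;
    Ok(())
}
```

Note that statistics collection (`collect_stat`) must be enabled for the per-file orderings to be gathered at all, since ordering inference piggybacks on the same metadata read.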