Skip to content

Commit 98aa1d8

Browse files
adriangbclaude
andcommitted
Infer ordering from Parquet sorting_columns metadata
This PR adds the ability for DataFusion to automatically infer the sort ordering of Parquet files from their embedded `sorting_columns` metadata. Key changes: - Add `FileMeta` struct and `infer_ordering`/`infer_stats_and_ordering` methods to `FileFormat` trait - Add `ordering_from_parquet_metadata` function to convert Parquet sorting_columns to LexOrdering - Implement `infer_ordering` in `ParquetFormat` - Add `derive_common_ordering_from_files` to find common ordering prefix across all files in a scan - Update `ListingTable` to derive output ordering from file orderings when user doesn't specify `file_sort_order` - Add `is_prefix` method to `LexOrdering` Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent b9a3b9f commit 98aa1d8

File tree

5 files changed

+479
-40
lines changed

5 files changed

+479
-40
lines changed

datafusion/catalog-listing/src/table.rs

Lines changed: 274 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -337,16 +337,103 @@ impl ListingTable {
337337
self.options.format.file_source(table_schema)
338338
}
339339

340-
/// If file_sort_order is specified, creates the appropriate physical expressions
340+
/// Creates output ordering from user-specified file_sort_order or derives
341+
/// from file orderings when user doesn't specify.
342+
///
343+
/// If user specified `file_sort_order`, that takes precedence.
344+
/// Otherwise, attempts to derive common ordering from file orderings in
345+
/// the provided file groups.
341346
pub fn try_create_output_ordering(
342347
&self,
343348
execution_props: &ExecutionProps,
349+
file_groups: &[FileGroup],
344350
) -> datafusion_common::Result<Vec<LexOrdering>> {
345-
create_lex_ordering(
346-
&self.table_schema,
347-
&self.options.file_sort_order,
348-
execution_props,
349-
)
351+
// If user specified sort order, use that
352+
if !self.options.file_sort_order.is_empty() {
353+
return create_lex_ordering(
354+
&self.table_schema,
355+
&self.options.file_sort_order,
356+
execution_props,
357+
);
358+
}
359+
if let Some(ordering) = derive_common_ordering_from_files(file_groups) {
360+
return Ok(vec![ordering]);
361+
}
362+
Ok(vec![])
363+
}
364+
}
365+
366+
/// Derives a common ordering from file orderings across all file groups.
367+
///
368+
/// Returns the common ordering if all files have compatible orderings,
369+
/// otherwise returns None.
370+
///
371+
/// The function finds the longest common prefix among all file orderings.
372+
/// For example, if files have orderings `[a, b, c]` and `[a, b]`, the common
373+
/// ordering is `[a, b]`.
374+
fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option<LexOrdering> {
375+
enum CurrentOrderingState {
376+
/// Initial state before processing any files
377+
FirstFile,
378+
/// Some common ordering found so far
379+
SomeOrdering(LexOrdering),
380+
/// No files have ordering
381+
NoOrdering,
382+
}
383+
let mut state = CurrentOrderingState::FirstFile;
384+
385+
// Collect file orderings and track counts
386+
for group in file_groups {
387+
for file in group.iter() {
388+
state = match (&state, &file.ordering) {
389+
// If this is the first file with ordering, set it as current
390+
(CurrentOrderingState::FirstFile, Some(ordering)) => {
391+
CurrentOrderingState::SomeOrdering(ordering.clone())
392+
}
393+
(CurrentOrderingState::FirstFile, None) => {
394+
CurrentOrderingState::NoOrdering
395+
}
396+
// If we have an existing ordering, find common prefix with new ordering
397+
(CurrentOrderingState::SomeOrdering(current), Some(ordering)) => {
398+
// Find common prefix between current and new ordering
399+
let prefix_len = current
400+
.as_ref()
401+
.iter()
402+
.zip(ordering.as_ref().iter())
403+
.take_while(|(a, b)| a == b)
404+
.count();
405+
if prefix_len == 0 {
406+
log::trace!(
407+
"Cannot derive common ordering: no common prefix between orderings {current:?} and {ordering:?}"
408+
);
409+
return None;
410+
} else {
411+
let ordering =
412+
LexOrdering::new(current.as_ref()[..prefix_len].to_vec())
413+
.expect("prefix_len > 0, so ordering must be valid");
414+
CurrentOrderingState::SomeOrdering(ordering)
415+
}
416+
}
417+
// If one file has ordering and another doesn't, no common ordering
418+
// Return None and log a trace message explaining why
419+
(CurrentOrderingState::SomeOrdering(ordering), None)
420+
| (CurrentOrderingState::NoOrdering, Some(ordering)) => {
421+
log::trace!(
422+
"Cannot derive common ordering: some files have ordering {ordering:?}, others don't"
423+
);
424+
return None;
425+
}
426+
// Both have no ordering, remain in NoOrdering state
427+
(CurrentOrderingState::NoOrdering, None) => {
428+
CurrentOrderingState::NoOrdering
429+
}
430+
};
431+
}
432+
}
433+
434+
match state {
435+
CurrentOrderingState::SomeOrdering(ordering) => Some(ordering),
436+
_ => None,
350437
}
351438
}
352439

@@ -439,7 +526,10 @@ impl TableProvider for ListingTable {
439526
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
440527
}
441528

442-
let output_ordering = self.try_create_output_ordering(state.execution_props())?;
529+
let output_ordering = self.try_create_output_ordering(
530+
state.execution_props(),
531+
&partitioned_file_lists,
532+
)?;
443533
match state
444534
.config_options()
445535
.execution
@@ -586,7 +676,8 @@ impl TableProvider for ListingTable {
586676
file_extension: self.options().format.get_ext(),
587677
};
588678

589-
let orderings = self.try_create_output_ordering(state.execution_props())?;
679+
// For writes, we only use user-specified ordering (no file groups to derive from)
680+
let orderings = self.try_create_output_ordering(state.execution_props(), &[])?;
590681
// It is sufficient to pass only one of the equivalent orderings:
591682
let order_requirements = orderings.into_iter().next().map(Into::into);
592683

@@ -635,16 +726,19 @@ impl ListingTable {
635726
let meta_fetch_concurrency =
636727
ctx.config_options().execution.meta_fetch_concurrency;
637728
let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
638-
// collect the statistics if required by the config
729+
// collect the statistics and ordering if required by the config
639730
let files = file_list
640731
.map(|part_file| async {
641732
let part_file = part_file?;
642-
let statistics = if self.options.collect_stat {
643-
self.do_collect_statistics(ctx, &store, &part_file).await?
733+
let (statistics, ordering) = if self.options.collect_stat {
734+
self.do_collect_statistics_and_ordering(ctx, &store, &part_file)
735+
.await?
644736
} else {
645-
Arc::new(Statistics::new_unknown(&self.file_schema))
737+
(Arc::new(Statistics::new_unknown(&self.file_schema)), None)
646738
};
647-
Ok(part_file.with_statistics(statistics))
739+
Ok(part_file
740+
.with_statistics(statistics)
741+
.with_ordering(ordering))
648742
})
649743
.boxed()
650744
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
@@ -699,53 +793,50 @@ impl ListingTable {
699793
})
700794
}
701795

702-
/// Collects statistics for a given partitioned file.
796+
/// Collects statistics and ordering for a given partitioned file.
703797
///
704-
/// This method first checks if the statistics for the given file are already cached.
705-
/// If they are, it returns the cached statistics.
706-
/// If they are not, it infers the statistics from the file and stores them in the cache.
707-
async fn do_collect_statistics(
798+
/// This method checks if statistics are cached. If cached, it returns the
799+
/// cached statistics and infers ordering separately. If not cached, it infers
800+
/// both statistics and ordering in a single metadata read for efficiency.
801+
async fn do_collect_statistics_and_ordering(
708802
&self,
709803
ctx: &dyn Session,
710804
store: &Arc<dyn ObjectStore>,
711805
part_file: &PartitionedFile,
712-
) -> datafusion_common::Result<Arc<Statistics>> {
806+
) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
713807
use datafusion_execution::cache::cache_manager::CachedFileMetadata;
714808

715-
// Check cache first
716-
if let Some(cached) = self
717-
.collected_statistics
718-
.get(&part_file.object_meta.location)
809+
let path = &part_file.object_meta.location;
810+
let meta = &part_file.object_meta;
811+
812+
// Check cache first - if we have valid cached statistics and ordering
813+
if let Some(cached) = self.collected_statistics.get(path)
814+
&& cached.is_valid_for(meta)
719815
{
720-
// Validate that cached entry is still valid
721-
if cached.is_valid_for(&part_file.object_meta) {
722-
return Ok(cached.statistics);
723-
}
816+
// Return cached statistics and ordering
817+
return Ok((cached.statistics.clone(), cached.ordering.clone()));
724818
}
725819

726-
// Cache miss or invalid - infer statistics
727-
let statistics = self
820+
// Cache miss or invalid: fetch both statistics and ordering in a single metadata read
821+
let file_meta = self
728822
.options
729823
.format
730-
.infer_stats(
731-
ctx,
732-
store,
733-
Arc::clone(&self.file_schema),
734-
&part_file.object_meta,
735-
)
824+
.infer_stats_and_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
736825
.await?;
737-
let statistics = Arc::new(statistics);
826+
827+
let statistics = Arc::new(file_meta.statistics);
738828

739829
// Store in cache
740830
self.collected_statistics.put(
741-
&part_file.object_meta.location,
831+
path,
742832
CachedFileMetadata::new(
743-
part_file.object_meta.clone(),
833+
meta.clone(),
744834
Arc::clone(&statistics),
745-
None, // No ordering information in this PR
835+
file_meta.ordering.clone(),
746836
),
747837
);
748-
Ok(statistics)
838+
839+
Ok((statistics, file_meta.ordering))
749840
}
750841
}
751842

@@ -821,3 +912,146 @@ async fn get_files_with_limit(
821912
let inexact_stats = all_files.next().await.is_some();
822913
Ok((file_group, inexact_stats))
823914
}
915+
916+
#[cfg(test)]
917+
mod tests {
918+
use super::*;
919+
use arrow::compute::SortOptions;
920+
use datafusion_physical_expr::expressions::Column;
921+
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
922+
use std::sync::Arc;
923+
924+
/// Helper to create a PhysicalSortExpr
925+
fn sort_expr(
926+
name: &str,
927+
idx: usize,
928+
descending: bool,
929+
nulls_first: bool,
930+
) -> PhysicalSortExpr {
931+
PhysicalSortExpr::new(
932+
Arc::new(Column::new(name, idx)),
933+
SortOptions {
934+
descending,
935+
nulls_first,
936+
},
937+
)
938+
}
939+
940+
/// Helper to create a LexOrdering (unwraps the Option)
941+
fn lex_ordering(exprs: Vec<PhysicalSortExpr>) -> LexOrdering {
942+
LexOrdering::new(exprs).expect("expected non-empty ordering")
943+
}
944+
945+
/// Helper to create a PartitionedFile with optional ordering
946+
fn create_file(name: &str, ordering: Option<LexOrdering>) -> PartitionedFile {
947+
PartitionedFile::new(name.to_string(), 1024).with_ordering(ordering)
948+
}
949+
950+
#[test]
951+
fn test_derive_common_ordering_all_files_same_ordering() {
952+
// All files have the same ordering -> returns that ordering
953+
let ordering = lex_ordering(vec![
954+
sort_expr("a", 0, false, true),
955+
sort_expr("b", 1, true, false),
956+
]);
957+
958+
let file_groups = vec![
959+
FileGroup::new(vec![
960+
create_file("f1.parquet", Some(ordering.clone())),
961+
create_file("f2.parquet", Some(ordering.clone())),
962+
]),
963+
FileGroup::new(vec![create_file("f3.parquet", Some(ordering.clone()))]),
964+
];
965+
966+
let result = derive_common_ordering_from_files(&file_groups);
967+
assert_eq!(result, Some(ordering));
968+
}
969+
970+
#[test]
971+
fn test_derive_common_ordering_common_prefix() {
972+
// Files have different orderings but share a common prefix
973+
let ordering_abc = lex_ordering(vec![
974+
sort_expr("a", 0, false, true),
975+
sort_expr("b", 1, false, true),
976+
sort_expr("c", 2, false, true),
977+
]);
978+
let ordering_ab = lex_ordering(vec![
979+
sort_expr("a", 0, false, true),
980+
sort_expr("b", 1, false, true),
981+
]);
982+
983+
let file_groups = vec![FileGroup::new(vec![
984+
create_file("f1.parquet", Some(ordering_abc)),
985+
create_file("f2.parquet", Some(ordering_ab.clone())),
986+
])];
987+
988+
let result = derive_common_ordering_from_files(&file_groups);
989+
assert_eq!(result, Some(ordering_ab));
990+
}
991+
992+
#[test]
993+
fn test_derive_common_ordering_no_common_prefix() {
994+
// Files have completely different orderings -> returns None
995+
let ordering_a = lex_ordering(vec![sort_expr("a", 0, false, true)]);
996+
let ordering_b = lex_ordering(vec![sort_expr("b", 1, false, true)]);
997+
998+
let file_groups = vec![FileGroup::new(vec![
999+
create_file("f1.parquet", Some(ordering_a)),
1000+
create_file("f2.parquet", Some(ordering_b)),
1001+
])];
1002+
1003+
let result = derive_common_ordering_from_files(&file_groups);
1004+
assert_eq!(result, None);
1005+
}
1006+
1007+
#[test]
1008+
fn test_derive_common_ordering_mixed_with_none() {
1009+
// Some files have ordering, some don't -> returns None
1010+
let ordering = lex_ordering(vec![sort_expr("a", 0, false, true)]);
1011+
1012+
let file_groups = vec![FileGroup::new(vec![
1013+
create_file("f1.parquet", Some(ordering)),
1014+
create_file("f2.parquet", None),
1015+
])];
1016+
1017+
let result = derive_common_ordering_from_files(&file_groups);
1018+
assert_eq!(result, None);
1019+
}
1020+
1021+
#[test]
1022+
fn test_derive_common_ordering_all_none() {
1023+
// No files have ordering -> returns None
1024+
let file_groups = vec![FileGroup::new(vec![
1025+
create_file("f1.parquet", None),
1026+
create_file("f2.parquet", None),
1027+
])];
1028+
1029+
let result = derive_common_ordering_from_files(&file_groups);
1030+
assert_eq!(result, None);
1031+
}
1032+
1033+
#[test]
1034+
fn test_derive_common_ordering_empty_groups() {
1035+
// Empty file groups -> returns None
1036+
let file_groups: Vec<FileGroup> = vec![];
1037+
let result = derive_common_ordering_from_files(&file_groups);
1038+
assert_eq!(result, None);
1039+
}
1040+
1041+
#[test]
1042+
fn test_derive_common_ordering_single_file() {
1043+
// Single file with ordering -> returns that ordering
1044+
let ordering = lex_ordering(vec![
1045+
sort_expr("a", 0, false, true),
1046+
sort_expr("b", 1, true, false),
1047+
]);
1048+
1049+
let file_groups = vec![FileGroup::new(vec![create_file(
1050+
"f1.parquet",
1051+
Some(ordering.clone()),
1052+
)])];
1053+
1054+
let result = derive_common_ordering_from_files(&file_groups);
1055+
assert_eq!(result, Some(ordering));
1056+
}
1057+
}

0 commit comments

Comments
 (0)