Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 274 additions & 40 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,16 +337,103 @@ impl ListingTable {
self.options.format.file_source(table_schema)
}

/// If file_sort_order is specified, creates the appropriate physical expressions
/// Creates output ordering from user-specified file_sort_order or derives
/// from file orderings when user doesn't specify.
///
/// If user specified `file_sort_order`, that takes precedence.
/// Otherwise, attempts to derive common ordering from file orderings in
/// the provided file groups.
///
/// Returns an empty list when no sort order was specified and no common
/// ordering could be derived from `file_groups` (which is always the case
/// when `file_groups` is empty).
pub fn try_create_output_ordering(
&self,
execution_props: &ExecutionProps,
file_groups: &[FileGroup],
) -> datafusion_common::Result<Vec<LexOrdering>> {
// NOTE(review): the unconditional `create_lex_ordering(...)` call directly
// below looks like the pre-change body shown by the diff view without its
// removal marker; the guarded logic after it is the new body. Confirm
// against the repository before treating this text as compilable code.
create_lex_ordering(
&self.table_schema,
&self.options.file_sort_order,
execution_props,
)
// If user specified sort order, use that
if !self.options.file_sort_order.is_empty() {
return create_lex_ordering(
&self.table_schema,
&self.options.file_sort_order,
execution_props,
);
}
// No user-specified order: fall back to the ordering shared by the files
// themselves, if one can be derived.
if let Some(ordering) = derive_common_ordering_from_files(file_groups) {
return Ok(vec![ordering]);
}
// Neither specified nor derivable: report no known output ordering.
Ok(vec![])
}
}

/// Computes the ordering shared by every file in `file_groups`, if any.
///
/// The result is the longest common prefix of the per-file orderings. For
/// example, files ordered by `[a, b, c]` and `[a, b]` share the ordering
/// `[a, b]`.
///
/// Returns `None` when:
/// - there are no files at all,
/// - no file declares an ordering,
/// - only some of the files declare an ordering, or
/// - two orderings already disagree on their first sort expression.
fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option<LexOrdering> {
    // Outer `None`: no file has been inspected yet.
    // `Some(None)`: every file inspected so far is unordered.
    // `Some(Some(o))`: `o` is the prefix shared by every file inspected so far.
    let mut shared: Option<Option<LexOrdering>> = None;

    for file in file_groups.iter().flat_map(|group| group.iter()) {
        let next = match (&shared, &file.ordering) {
            // The first file seeds the state with whatever it declares.
            (None, first) => first.clone(),
            // Still no file with an ordering.
            (Some(None), None) => None,
            // A mix of ordered and unordered files has no common ordering.
            (Some(Some(ordering)), None) | (Some(None), Some(ordering)) => {
                log::trace!(
                    "Cannot derive common ordering: some files have ordering {ordering:?}, others don't"
                );
                return None;
            }
            // Shrink the shared ordering to the prefix both sides agree on.
            (Some(Some(current)), Some(ordering)) => {
                let prefix_len = current
                    .as_ref()
                    .iter()
                    .zip(ordering.as_ref().iter())
                    .take_while(|(a, b)| a == b)
                    .count();
                if prefix_len == 0 {
                    log::trace!(
                        "Cannot derive common ordering: no common prefix between orderings {current:?} and {ordering:?}"
                    );
                    return None;
                }
                let prefix = current.as_ref()[..prefix_len].to_vec();
                Some(
                    LexOrdering::new(prefix)
                        .expect("prefix_len > 0, so ordering must be valid"),
                )
            }
        };
        shared = Some(next);
    }

    // Only a surviving, non-empty shared prefix counts as a common ordering.
    shared.flatten()
}

Expand Down Expand Up @@ -439,7 +526,10 @@ impl TableProvider for ListingTable {
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
}

let output_ordering = self.try_create_output_ordering(state.execution_props())?;
let output_ordering = self.try_create_output_ordering(
state.execution_props(),
&partitioned_file_lists,
)?;
match state
.config_options()
.execution
Expand Down Expand Up @@ -586,7 +676,8 @@ impl TableProvider for ListingTable {
file_extension: self.options().format.get_ext(),
};

let orderings = self.try_create_output_ordering(state.execution_props())?;
// For writes, we only use user-specified ordering (no file groups to derive from)
let orderings = self.try_create_output_ordering(state.execution_props(), &[])?;
// It is sufficient to pass only one of the equivalent orderings:
let order_requirements = orderings.into_iter().next().map(Into::into);

Expand Down Expand Up @@ -635,16 +726,19 @@ impl ListingTable {
let meta_fetch_concurrency =
ctx.config_options().execution.meta_fetch_concurrency;
let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
// collect the statistics if required by the config
// collect the statistics and ordering if required by the config
let files = file_list
.map(|part_file| async {
let part_file = part_file?;
let statistics = if self.options.collect_stat {
self.do_collect_statistics(ctx, &store, &part_file).await?
let (statistics, ordering) = if self.options.collect_stat {
self.do_collect_statistics_and_ordering(ctx, &store, &part_file)
.await?
} else {
Arc::new(Statistics::new_unknown(&self.file_schema))
(Arc::new(Statistics::new_unknown(&self.file_schema)), None)
};
Ok(part_file.with_statistics(statistics))
Ok(part_file
.with_statistics(statistics)
.with_ordering(ordering))
})
.boxed()
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
Expand Down Expand Up @@ -699,53 +793,50 @@ impl ListingTable {
})
}

// NOTE(review): this span looks like a diff hunk rendered without +/- markers,
// so superseded lines (the old doc comment, the old `do_collect_statistics`
// signature and the old `infer_stats` call) sit next to their replacements.
// Confirm against the repository before treating it as compilable code.
/// Collects statistics for a given partitioned file.
/// Collects statistics and ordering for a given partitioned file.
///
/// This method first checks if the statistics for the given file are already cached.
/// If they are, it returns the cached statistics.
/// If they are not, it infers the statistics from the file and stores them in the cache.
async fn do_collect_statistics(
/// This method checks whether a cache entry that is still valid for the file
/// exists. If so, it returns the cached statistics together with the cached
/// ordering. Otherwise it obtains both statistics and ordering from a single
/// `infer_stats_and_ordering` call, stores them in the cache, and returns them.
async fn do_collect_statistics_and_ordering(
&self,
ctx: &dyn Session,
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
) -> datafusion_common::Result<Arc<Statistics>> {
) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
use datafusion_execution::cache::cache_manager::CachedFileMetadata;

// Check cache first
if let Some(cached) = self
.collected_statistics
.get(&part_file.object_meta.location)
// The file's location is the cache key; its object metadata is what a
// cached entry is validated against.
let path = &part_file.object_meta.location;
let meta = &part_file.object_meta;

// Check cache first - if we have valid cached statistics and ordering
if let Some(cached) = self.collected_statistics.get(path)
&& cached.is_valid_for(meta)
{
// Validate that cached entry is still valid
if cached.is_valid_for(&part_file.object_meta) {
return Ok(cached.statistics);
}
// Return cached statistics and ordering
return Ok((Arc::clone(&cached.statistics), cached.ordering.clone()));
}

// Cache miss or invalid - infer statistics
let statistics = self
// Cache miss or invalid: fetch both statistics and ordering in a single metadata read
let file_meta = self
.options
.format
.infer_stats(
ctx,
store,
Arc::clone(&self.file_schema),
&part_file.object_meta,
)
.infer_stats_and_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
.await?;
let statistics = Arc::new(statistics);

// Wrap in `Arc` so the same statistics can be shared by the cache entry
// and the returned value.
let statistics = Arc::new(file_meta.statistics);

// Store in cache
self.collected_statistics.put(
&part_file.object_meta.location,
path,
CachedFileMetadata::new(
part_file.object_meta.clone(),
meta.clone(),
Arc::clone(&statistics),
None, // No ordering information in this PR
file_meta.ordering.clone(),
),
);
Ok(statistics)

Ok((statistics, file_meta.ordering))
}
}

Expand Down Expand Up @@ -821,3 +912,146 @@ async fn get_files_with_limit(
let inexact_stats = all_files.next().await.is_some();
Ok((file_group, inexact_stats))
}

#[cfg(test)]
mod tests {
    //! Unit tests for `derive_common_ordering_from_files`.
    //!
    //! Each test builds `PartitionedFile`s with an optional declared ordering,
    //! groups them into `FileGroup`s and checks which common ordering (if any)
    //! is derived.

    use super::*;
    use arrow::compute::SortOptions;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
    use std::sync::Arc;

    /// Helper to create a PhysicalSortExpr on column `name` at index `idx`
    /// with the given sort options.
    fn sort_expr(
        name: &str,
        idx: usize,
        descending: bool,
        nulls_first: bool,
    ) -> PhysicalSortExpr {
        PhysicalSortExpr::new(
            Arc::new(Column::new(name, idx)),
            SortOptions {
                descending,
                nulls_first,
            },
        )
    }

    /// Helper to create a LexOrdering (unwraps the Option)
    fn lex_ordering(exprs: Vec<PhysicalSortExpr>) -> LexOrdering {
        LexOrdering::new(exprs).expect("expected non-empty ordering")
    }

    /// Helper to create a PartitionedFile with optional ordering
    fn create_file(name: &str, ordering: Option<LexOrdering>) -> PartitionedFile {
        PartitionedFile::new(name.to_string(), 1024).with_ordering(ordering)
    }

    #[test]
    fn test_derive_common_ordering_all_files_same_ordering() {
        // All files have the same ordering -> returns that ordering
        let ordering = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, true, false),
        ]);

        let file_groups = vec![
            FileGroup::new(vec![
                create_file("f1.parquet", Some(ordering.clone())),
                create_file("f2.parquet", Some(ordering.clone())),
            ]),
            FileGroup::new(vec![create_file("f3.parquet", Some(ordering.clone()))]),
        ];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, Some(ordering));
    }

    #[test]
    fn test_derive_common_ordering_common_prefix() {
        // Files have different orderings but share a common prefix
        let ordering_abc = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, false, true),
            sort_expr("c", 2, false, true),
        ]);
        let ordering_ab = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, false, true),
        ]);

        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", Some(ordering_abc)),
            create_file("f2.parquet", Some(ordering_ab.clone())),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, Some(ordering_ab));
    }

    #[test]
    fn test_derive_common_ordering_common_prefix_across_groups() {
        // The common prefix is computed over all files, not per group
        let ordering_abc = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, false, true),
            sort_expr("c", 2, false, true),
        ]);
        let ordering_ab = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, false, true),
        ]);

        let file_groups = vec![
            FileGroup::new(vec![create_file("f1.parquet", Some(ordering_abc))]),
            FileGroup::new(vec![create_file(
                "f2.parquet",
                Some(ordering_ab.clone()),
            )]),
        ];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, Some(ordering_ab));
    }

    #[test]
    fn test_derive_common_ordering_no_common_prefix() {
        // Files have completely different orderings -> returns None
        let ordering_a = lex_ordering(vec![sort_expr("a", 0, false, true)]);
        let ordering_b = lex_ordering(vec![sort_expr("b", 1, false, true)]);

        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", Some(ordering_a)),
            create_file("f2.parquet", Some(ordering_b)),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    #[test]
    fn test_derive_common_ordering_mixed_with_none() {
        // Some files have ordering, some don't -> returns None
        let ordering = lex_ordering(vec![sort_expr("a", 0, false, true)]);

        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", Some(ordering)),
            create_file("f2.parquet", None),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    #[test]
    fn test_derive_common_ordering_unordered_file_first() {
        // Same mix as above, but the unordered file is seen first -> returns None
        let ordering = lex_ordering(vec![sort_expr("a", 0, false, true)]);

        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", None),
            create_file("f2.parquet", Some(ordering)),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    #[test]
    fn test_derive_common_ordering_all_none() {
        // No files have ordering -> returns None
        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", None),
            create_file("f2.parquet", None),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    #[test]
    fn test_derive_common_ordering_empty_groups() {
        // Empty file groups -> returns None
        let file_groups: Vec<FileGroup> = vec![];
        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    #[test]
    fn test_derive_common_ordering_single_file() {
        // Single file with ordering -> returns that ordering
        let ordering = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, true, false),
        ]);

        let file_groups = vec![FileGroup::new(vec![create_file(
            "f1.parquet",
            Some(ordering.clone()),
        )])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, Some(ordering));
    }
}
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ mod tests {
let table =
ListingTable::try_new(config.clone()).expect("Creating the table");
let ordering_result =
table.try_create_output_ordering(state.execution_props());
table.try_create_output_ordering(state.execution_props(), &[]);

match (expected_result, ordering_result) {
(Ok(expected), Ok(result)) => {
Expand Down
Loading