infer parquet file order from metadata and use it to optimize scans #19433
Conversation
@zhuqi-lucas you may be interested in this

Great work @adriangb, I will review it next week!
```rust
// Check that all orderings are identical
let first = all_orderings[0];
for ordering in &all_orderings[1..] {
    if *ordering != first {
```
Do we want to support more relaxed cases, for example using something similar to:

```rust
pub fn ordering_satisfy(
```
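For intuition, here is a hypothetical standalone sketch (not DataFusion's actual code) of the strict check being discussed: a common ordering is derived only when every file reports the identical ordering, and any file without ordering metadata defeats it. `SortKey` is a simplified stand-in for a real sort expression.

```rust
// Simplified stand-in for a sort expression: (column, descending).
type SortKey = (&'static str, bool);

// Hypothetical sketch of deriving a scan-wide ordering: succeed only if all
// files carry the exact same ordering metadata.
fn derive_common_ordering(files: &[Option<Vec<SortKey>>]) -> Option<Vec<SortKey>> {
    let mut iter = files.iter();
    // No files, or the first file has no ordering: nothing to derive.
    let first = iter.next()?.clone()?;
    for ordering in iter {
        match ordering {
            Some(o) if *o == first => {}
            // Any missing or differing ordering defeats the common ordering.
            _ => return None,
        }
    }
    Some(first)
}

fn main() {
    let sorted = Some(vec![("t", false)]);
    // Identical orderings across files: the common ordering survives.
    assert_eq!(derive_common_ordering(&[sorted.clone(), sorted.clone()]), sorted);
    // One file without ordering metadata: no common ordering.
    assert_eq!(derive_common_ordering(&[sorted, None]), None);
}
```

A relaxed variant, as suggested, could instead accept the longest common prefix or defer to an `ordering_satisfy`-style equivalence check.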
```diff
 SELECT 1372708800 + value AS t
 FROM generate_series(0, 99999)
-ORDER BY t
+ORDER BY t + 1
```
Why did this change to `t + 1`?
Because it was picking up the ordering and causing even less data to be read 😆
I'll note I had to go with `ORDER BY t * t` in the end; even `ORDER BY t + 1` is optimized away now!
```
statement ok
DROP TABLE t;
```
It seems this only tests ASC NULLS FIRST. Better to add test cases for:
- DESC ordering
- Multi-column ordering
- Files with no ordering metadata
```rust
}

/// Checks if `other` is a prefix of this `LexOrdering`.
pub fn is_prefix(&self, other: &LexOrdering) -> bool {
```
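For intuition, a minimal standalone sketch of the prefix semantics, using simplified `(column, descending, nulls_first)` tuples rather than real `LexOrdering` sort expressions:

```rust
// Simplified stand-in for a sort expression: (column, descending, nulls_first).
type SortKey = (&'static str, bool, bool);

/// `other` is a prefix of `ordering` if each of its elements matches the
/// corresponding element of `ordering`, in order, from the start.
fn is_prefix(ordering: &[SortKey], other: &[SortKey]) -> bool {
    other.len() <= ordering.len() && ordering.iter().zip(other).all(|(a, b)| a == b)
}

fn main() {
    let abc = [("a", false, true), ("b", false, true), ("c", false, true)];
    let ab = [("a", false, true), ("b", false, true)];
    assert!(is_prefix(&abc, &ab)); // [a, b] is a prefix of [a, b, c]
    assert!(!is_prefix(&ab, &abc)); // but not the other way around
    // A direction mismatch anywhere breaks the prefix relation.
    assert!(!is_prefix(&abc, &[("a", false, true), ("b", true, true)]));
}
```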
I ran into a similar topic for sort pushdown and submitted a PR. I prefer to use `eq_properties` for this kind of check; after trying it in my internal project, it handled more cases. `ordering_satisfy` will handle more cases than a prefix check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I gave it a review - looks nice! I can sequence this behind your PR if needed.
Great work @adriangb, I left some initial review comments; I will review again for more details.

Force-pushed 8fb624f to 1a52a45.

@zhuqi-lucas ping to take a look at this when you get a chance. Thanks!
zhuqi-lucas left a comment
LGTM, thank you @adriangb for the great work, left some comments and question.
```rust
if let Some(ordering) = &file.ordering {
    all_orderings.push(ordering);
} else {
    // If any file has no ordering, we can't derive a common ordering
```
We may add some debug or tracing log here, e.g.:

```rust
// If any file has no ordering, we can't derive a common ordering
tracing::debug!(
    "Cannot derive common ordering: file {} has no ordering metadata",
    file.object_meta.location
);
```
```rust
CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>
{
    /// Retrieves the information about the entries currently cached.
    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
```
Would it be useful to add the `has_ordering` info to `FileStatisticsCacheEntry`?

```rust
FileStatisticsCacheEntry {
    object_meta: object_meta.clone(),
    num_rows: stats.num_rows,
    num_columns: stats.column_statistics.len(),
    table_size_bytes: stats.total_byte_size,
    statistics_size_bytes: 0,
    has_ordering, // NEW: indicates if ordering is cached
},
```
```rust
// Check that all orderings are identical
let first = all_orderings[0];
for ordering in &all_orderings[1..] {
    if *ordering != first {
```
Is it possible to use `ordering_satisfy`?
```rust
// may not be compatible with Parquet sorting columns (e.g. ordering on `random()`).
// So if we cannot create a Parquet sorting column from the ordering requirement,
// we skip setting sorting columns on the Parquet sink.
lex_ordering_to_sorting_columns(&ordering).ok()
```
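A hedged sketch of that fallible conversion, with illustrative stand-in types (the real code maps DataFusion sort expressions to parquet-rs `SortingColumn`s; the names and shapes below are assumptions, not the actual APIs):

```rust
// Illustrative stand-in for Parquet's row-group sorting metadata.
#[derive(Debug, PartialEq)]
struct SortingColumn {
    column_idx: i32,
    descending: bool,
    nulls_first: bool,
}

// Illustrative stand-in for a physical sort expression.
enum SortExpr {
    Column { idx: i32, descending: bool, nulls_first: bool },
    Other, // e.g. ordering on `random()`: has no Parquet representation
}

/// Returns None if any expression is not a plain column reference, in which
/// case the sink would skip writing sorting_columns metadata entirely.
fn to_sorting_columns(ordering: &[SortExpr]) -> Option<Vec<SortingColumn>> {
    ordering
        .iter()
        .map(|e| match e {
            SortExpr::Column { idx, descending, nulls_first } => Some(SortingColumn {
                column_idx: *idx,
                descending: *descending,
                nulls_first: *nulls_first,
            }),
            SortExpr::Other => None,
        })
        .collect() // Option<Vec<_>>: one None aborts the whole conversion
}

fn main() {
    // A plain column ordering converts cleanly...
    let ok = to_sorting_columns(&[SortExpr::Column { idx: 0, descending: true, nulls_first: false }]);
    assert_eq!(ok, Some(vec![SortingColumn { column_idx: 0, descending: true, nulls_first: false }]));
    // ...but any non-column expression aborts the whole conversion.
    let bad = to_sorting_columns(&[
        SortExpr::Column { idx: 0, descending: false, nulls_first: true },
        SortExpr::Other,
    ]);
    assert_eq!(bad, None);
}
```

The all-or-nothing `collect()` into `Option<Vec<_>>` mirrors the behavior described in the comment: a single unconvertible expression means no sorting columns are written at all.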
Great point!
```rust
/// Cached file orderings, keyed by file path.
/// Stored separately from statistics to maintain backwards compatibility
/// with the FileStatisticsCache trait interface.
orderings: DashMap<Path, (ObjectMeta, Option<LexOrdering>)>,
```
Is it possible to just use one map?

```rust
pub struct DefaultFileStatisticsCache {
    // Store both stats and ordering together
    cache: DashMap<Path, CacheEntry>,
}

struct CacheEntry {
    meta: ObjectMeta,
    statistics: Arc<Statistics>,
    ordering: Option<LexOrdering>, // Initially None until fetched
}
```
I went down a rabbit hole of reworking the cache... there was a lot of weird stuff going on (made worse by this PR). Each value in the cache essentially comprised 3 parts:
- Statistics
- Ordering
- "Extra" i.e. cache invalidation metadata
I tried to unify it all into 1 value... but it's a decent amount of code churn. I'll split this PR up.
```rust
    // [a ASC, b DESC] is NOT a prefix of [a ASC, b ASC, c ASC] (mismatch in middle)
    assert!(!ordering_abc.is_prefix(&ordering_ab));
}
```
Maybe add some tests for file modification:

```rust
#[tokio::test]
async fn test_ordering_cache_invalidation_on_file_modification() {
    let cache = DefaultFileStatisticsCache::default();
    let path = Path::from("test.parquet");

    // Cache with original metadata
    let meta_v1 = ObjectMeta {
        location: path.clone(),
        last_modified: Utc.timestamp_nanos(1000),
        size: 100,
        e_tag: None,
        version: None,
    };
    let ordering_v1 = Some(LexOrdering::new(vec![...]).unwrap());
    cache.put_ordering(&path, ordering_v1.clone(), &meta_v1);

    // Verify cached
    assert_eq!(cache.get_ordering(&path, &meta_v1), Some(ordering_v1.clone()));

    // File modified (size changed)
    let meta_v2 = ObjectMeta {
        last_modified: Utc.timestamp_nanos(2000),
        size: 200, // ← changed
        ..meta_v1.clone()
    };

    // Should return None (cache miss due to size mismatch)
    assert_eq!(cache.get_ordering(&path, &meta_v2), None);

    // Cache new version
    let ordering_v2 = Some(LexOrdering::new(vec![...]).unwrap());
    cache.put_ordering(&path, ordering_v2.clone(), &meta_v2);

    // Old metadata should still be invalid
    assert_eq!(cache.get_ordering(&path, &meta_v1), None);

    // New metadata should work
    assert_eq!(cache.get_ordering(&path, &meta_v2), Some(ordering_v2));
}
```
added in 25a5aa0
```rust
let column_idx = sorting_col.column_idx as usize;

// Get the column path from the Parquet schema
// The column_idx in SortingColumn refers to leaf columns
```
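To illustrate why this mapping needs care with nested types, here is a toy sketch with simplified stand-in types (not the Arrow or Parquet schema APIs): a struct field expands into several leaf columns, so `column_idx` indexes the flattened leaf list, not the top-level field list.

```rust
// Simplified stand-in for a schema field: either a leaf or a struct of fields.
enum Field {
    Leaf(&'static str),
    Struct(&'static str, Vec<Field>),
}

/// Flatten a schema to dotted leaf column paths in depth-first order; the
/// position in the returned Vec is the leaf column index Parquet would use.
fn leaf_paths(fields: &[Field], prefix: &str, out: &mut Vec<String>) {
    for f in fields {
        match f {
            Field::Leaf(name) => out.push(format!("{prefix}{name}")),
            Field::Struct(name, children) => {
                leaf_paths(children, &format!("{prefix}{name}."), out)
            }
        }
    }
}

fn main() {
    let schema = vec![
        Field::Leaf("a"),
        Field::Struct("s", vec![Field::Leaf("x"), Field::Leaf("y")]),
        Field::Leaf("b"),
    ];
    let mut leaves = Vec::new();
    leaf_paths(&schema, "", &mut leaves);
    // "b" is the third top-level field but the fourth leaf column.
    assert_eq!(leaves, vec!["a", "s.x", "s.y", "b"]);
}
```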
Should we add a test for nested types, since they can also be sorted? I am not sure if it's possible.
```rust
let ordering = self
    .options
    .format
    .infer_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
```
Here we get the cached statistics, but we still need to compute the ordering info. Would it be possible to have only one entry per file, cached or not, with the cache including the computed ordering info?
FYI @adriangb, I will be out for the next 3 days, feel free to merge.
alamb left a comment
Thanks @zhuqi-lucas and @adriangb -- I took a quick skim of this and it looks like a great idea. Let me know if there is anything else that needs a review -- otherwise 🚀
```rust
    }
}

/// Creates a test Parquet file with sorting_columns metadata.
```
I didn't realize this was part of the parquet metadata, but it appears to have been that way for 13 years 👍
Force-pushed 53aab8b to bcc1db0.
…uctor (#19596)

## Which issue does this PR close?

Part of #19433

## Rationale for this change

In preparation for ordering inference from Parquet metadata, we need to be able to store per-file ordering information on `PartitionedFile`. This PR adds the necessary infrastructure.

## What changes are included in this PR?

- Add `ordering: Option<LexOrdering>` field to `PartitionedFile` struct
- Add `new_from_meta(ObjectMeta)` constructor for creating files from metadata (cleaner than manually constructing)
- Add `with_ordering()` builder method to set ordering information
- Add `with_partition_values()` builder method for consistency
- Update all `PartitionedFile` constructors to initialize `ordering: None`
- Update callsites in test_util, proto, and substrait to use `new_from_meta`

## Are these changes tested?

Yes, existing tests pass. The new field is currently always `None`, so no new tests are needed yet. Tests for ordering inference will come in a follow-up PR.

## Are there any user-facing changes?

No user-facing API changes. The `ordering` field is public, but users typically construct `PartitionedFile` via the provided constructors, which handle this automatically.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.5 <[email protected]>
Co-authored-by: Copilot <[email protected]>
## Which issue does this PR close?

Part of #19433

## Rationale for this change

In preparation for ordering inference from Parquet metadata, the cache system needs refactoring to:

1. Support storing ordering information alongside statistics
2. Simplify the `CacheAccessor` trait by removing the `Extra` associated type and `*_with_extra` methods
3. Move validation logic into typed wrapper structs with explicit `is_valid_for()` methods

## What changes are included in this PR?

### Simplify `CacheAccessor` trait

**Before:**

```rust
pub trait CacheAccessor<K, V>: Send + Sync {
    type Extra: Clone;
    fn get(&self, k: &K) -> Option<V>;
    fn get_with_extra(&self, k: &K, e: &Self::Extra) -> Option<V>;
    fn put(&self, key: &K, value: V) -> Option<V>;
    fn put_with_extra(&self, key: &K, value: V, e: &Self::Extra) -> Option<V>;
    // ... other methods
}
```

**After:**

```rust
pub trait CacheAccessor<K, V>: Send + Sync {
    fn get(&self, key: &K) -> Option<V>;
    fn put(&self, key: &K, value: V) -> Option<V>;
    // ... other methods (no Extra type, no *_with_extra methods)
}
```

### Introduce typed wrapper structs for cached values

Instead of passing validation metadata separately via `Extra`, embed it in the cached value type:

- **`CachedFileMetadata`** - contains `meta: ObjectMeta`, `statistics: Arc<Statistics>`, `ordering: Option<LexOrdering>`
- **`CachedFileList`** - contains `files: Arc<Vec<ObjectMeta>>` with `filter_by_prefix()` helper
- **`CachedFileMetadataEntry`** - contains `meta: ObjectMeta`, `file_metadata: Arc<dyn FileMetadata>`

Each wrapper has an `is_valid_for(&ObjectMeta)` method that checks if the cached entry is still valid (size and last_modified match).

### New validation pattern

The typical usage pattern changes from:

```rust
// Before: validation hidden in get_with_extra
if let Some(stats) = cache.get_with_extra(&path, &object_meta) {
    // use stats
}
```

To:

```rust
// After: explicit validation
if let Some(cached) = cache.get(&path) {
    if cached.is_valid_for(&object_meta) {
        // use cached.statistics
    }
}
```

### Add ordering support

- `CachedFileMetadata` has new `ordering: Option<LexOrdering>` field
- `FileStatisticsCacheEntry` has new `has_ordering: bool` field for introspection

## Are these changes tested?

Yes, existing cache tests pass, plus new tests for ordering support.

## Are there any user-facing changes?

Breaking change to cache traits. Users with custom cache implementations will need to:

1. Update `CacheAccessor` impl to remove `Extra` type and `*_with_extra` methods
2. Update cached value types to the new wrappers (`CachedFileMetadata`, etc.)
3. Update callsites to use the new validation pattern with `is_valid_for()`

🤖 Generated with [Claude Code](https://claude.com/claude-code)
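A minimal sketch of the `is_valid_for` idea, assuming simplified stand-ins for `ObjectMeta` and the cached wrapper (the real types carry more fields and live in DataFusion's cache module):

```rust
// Simplified stand-in for object_store's ObjectMeta.
#[derive(Clone, PartialEq)]
struct ObjectMeta {
    size: u64,
    last_modified: i64, // nanoseconds, stand-in for a real timestamp type
}

// Simplified stand-in for the cached wrapper; statistics/ordering omitted.
struct CachedFileMetadata {
    meta: ObjectMeta,
}

impl CachedFileMetadata {
    /// A cached entry is valid only if the file still has the same size and
    /// modification time as when the entry was created.
    fn is_valid_for(&self, current: &ObjectMeta) -> bool {
        self.meta.size == current.size && self.meta.last_modified == current.last_modified
    }
}

fn main() {
    let v1 = ObjectMeta { size: 100, last_modified: 1000 };
    let v2 = ObjectMeta { size: 200, last_modified: 2000 };
    let entry = CachedFileMetadata { meta: v1.clone() };
    assert!(entry.is_valid_for(&v1));   // unchanged file: cache hit
    assert!(!entry.is_valid_for(&v2));  // modified file: treated as a miss
}
```

Keeping the validity check on the value itself, rather than hidden in `get_with_extra`, is what makes the new pattern explicit at each call site.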
## Which issue does this PR close?

Part of #19433

## Rationale for this change

When writing data to a table created with `CREATE EXTERNAL TABLE ... WITH ORDER`, the sorting columns should be recorded in the Parquet file's row group metadata. This allows downstream readers to know the data is sorted and potentially skip sorting operations.

## What changes are included in this PR?

- Add `sort_expr_to_sorting_column()` and `lex_ordering_to_sorting_columns()` functions in `metadata.rs` to convert DataFusion ordering to Parquet `SortingColumn`
- Add `sorting_columns` field to `ParquetSink` with `with_sorting_columns()` builder method
- Update `create_writer_physical_plan()` to pass order requirements to `ParquetSink`
- Update `create_writer_props()` to set sorting columns on `WriterProperties`
- Add test verifying `sorting_columns` metadata is written correctly

## Are these changes tested?

Yes, added `test_create_table_with_order_writes_sorting_columns` that:

1. Creates an external table with `WITH ORDER (a ASC NULLS FIRST, b DESC NULLS LAST)`
2. Inserts data
3. Reads the Parquet file and verifies the `sorting_columns` metadata matches the expected order

## Are there any user-facing changes?

No user-facing API changes. Parquet files written via `INSERT INTO` or `COPY` for tables with `WITH ORDER` will now contain `sorting_columns` metadata in the row group.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.5 <[email protected]>
Co-authored-by: Copilot <[email protected]>
Force-pushed bcc1db0 to 98aa1d8.
@zhuqi-lucas I think this is ready for another review now; I've incorporated the changes from the broken-out PRs, rebased on main, and addressed your previous feedback.
zhuqi-lucas left a comment
Great work @adriangb, LGTM now!
This PR adds the ability for DataFusion to automatically infer the sort ordering of Parquet files from their embedded `sorting_columns` metadata.

Key changes:

- Add `FileMeta` struct and `infer_ordering`/`infer_stats_and_ordering` methods to `FileFormat` trait
- Add `ordering_from_parquet_metadata` function to convert Parquet sorting_columns to LexOrdering
- Implement `infer_ordering` in `ParquetFormat`
- Add `derive_common_ordering_from_files` to find common ordering prefix across all files in a scan
- Update `ListingTable` to derive output ordering from file orderings when user doesn't specify `file_sort_order`
- Add `is_prefix` method to `LexOrdering`

Co-authored-by: Claude Opus 4.5 <[email protected]>
Co-authored-by: Qi Zhu <[email protected]>
Force-pushed 25a5aa0 to ebd3c52.
The idea here is to use the metadata in Parquet files to infer sort orders, so users are not required to specify them manually.
This should probably be split into multiple PRs:

- `WITH ORDER`
- `PartitionedFile` construction