Commit 3697e42

adriangb and claude committed
Refactor cache APIs to support ordering information
Refactor the cache system to support storing both statistics and ordering information together, in preparation for ordering inference from Parquet metadata.

Changes to cache_manager.rs:
- Add `CachedFileMetadata` struct with `meta`, `statistics`, and `ordering` fields
- Refactor `FileStatisticsCache` trait to use `CachedFileMetadata` and `Path` keys
- Add `has_ordering` field to `FileStatisticsCacheEntry`
- Add `CachedFileList` for the list files cache
- Refactor `FileMetadataCache` trait to use `CachedFileMetadataEntry` and `Path` keys

Changes to cache implementations:
- Update `DefaultFileStatisticsCache` to use the new trait methods
- Update `DefaultFilesMetadataCache` to use the new trait methods
- Simplify the list files cache implementation

Changes to callsites:
- Update `ListingTable::do_collect_statistics` to use the new cache API
- Update `DFParquetMetadata::fetch_metadata` to use the new cache API
- Update `ListingTableUrl` to use the new cache API

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent cd12d51 commit 3697e42
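
For orientation, here is a minimal sketch of the cache types the commit message describes. It is not the actual DataFusion code: the field types, method bodies, and the `is_valid_for` validation rule are assumptions inferred from how the call sites below use them, and `Statistics` / `LexOrdering` are placeholders for the real `datafusion_common` and `datafusion_physical_expr_common` types.

```rust
use std::sync::Arc;
use object_store::{path::Path, ObjectMeta};

// Placeholders for the real DataFusion types; see the note above.
pub struct Statistics;
pub struct LexOrdering;

/// Statistics plus optional ordering for one file, per the commit message.
pub struct CachedFileMetadata {
    /// The ObjectMeta captured when the entry was created.
    pub meta: ObjectMeta,
    pub statistics: Arc<Statistics>,
    /// Ordering inferred for the file; always `None` in this commit.
    pub ordering: Option<LexOrdering>,
}

impl CachedFileMetadata {
    pub fn new(
        meta: ObjectMeta,
        statistics: Arc<Statistics>,
        ordering: Option<LexOrdering>,
    ) -> Self {
        Self { meta, statistics, ordering }
    }

    /// Assumed validation rule: the entry is trusted only while the file's
    /// size and modification time still match what was cached.
    pub fn is_valid_for(&self, current: &ObjectMeta) -> bool {
        self.meta.size == current.size
            && self.meta.last_modified == current.last_modified
    }
}

/// The refactored statistics cache is keyed by object-store `Path` alone;
/// validation against `ObjectMeta` now happens at the call site.
pub trait FileStatisticsCache: Send + Sync {
    fn get(&self, path: &Path) -> Option<CachedFileMetadata>;
    fn put(&self, path: &Path, entry: CachedFileMetadata);
}
```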

File tree

10 files changed: +1007 −712 lines changed


Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

datafusion/catalog-listing/src/table.rs

Lines changed: 32 additions & 21 deletions
```diff
@@ -705,31 +705,42 @@ impl ListingTable {
         store: &Arc<dyn ObjectStore>,
         part_file: &PartitionedFile,
     ) -> datafusion_common::Result<Arc<Statistics>> {
-        match self
+        use datafusion_execution::cache::cache_manager::CachedFileMetadata;
+
+        // Check cache first
+        if let Some(cached) = self
             .collected_statistics
-            .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
+            .get(&part_file.object_meta.location)
         {
-            Some(statistics) => Ok(statistics),
-            None => {
-                let statistics = self
-                    .options
-                    .format
-                    .infer_stats(
-                        ctx,
-                        store,
-                        Arc::clone(&self.file_schema),
-                        &part_file.object_meta,
-                    )
-                    .await?;
-                let statistics = Arc::new(statistics);
-                self.collected_statistics.put_with_extra(
-                    &part_file.object_meta.location,
-                    Arc::clone(&statistics),
-                    &part_file.object_meta,
-                );
-                Ok(statistics)
+            // Validate that cached entry is still valid
+            if cached.is_valid_for(&part_file.object_meta) {
+                return Ok(cached.statistics);
             }
         }
+
+        // Cache miss or invalid - infer statistics
+        let statistics = self
+            .options
+            .format
+            .infer_stats(
+                ctx,
+                store,
+                Arc::clone(&self.file_schema),
+                &part_file.object_meta,
+            )
+            .await?;
+        let statistics = Arc::new(statistics);
+
+        // Store in cache
+        self.collected_statistics.put(
+            &part_file.object_meta.location,
+            CachedFileMetadata::new(
+                part_file.object_meta.clone(),
+                Arc::clone(&statistics),
+                None, // No ordering information in this PR
+            ),
+        );
+        Ok(statistics)
     }
 }
```
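
The behavioral shift in this diff: the old `get_with_extra`/`put_with_extra` API passed `ObjectMeta` alongside the key and validated inside the cache, while the new API stores the `ObjectMeta` in the entry and makes staleness checking explicit at the call site. A condensed sketch of the new read path, reusing the hypothetical types from the earlier sketch:

```rust
// Sketch only: `FileStatisticsCache`, `CachedFileMetadata`, and
// `is_valid_for` are the assumed types from the sketch above.
fn lookup_stats(
    cache: &dyn FileStatisticsCache,
    meta: &ObjectMeta,
) -> Option<Arc<Statistics>> {
    let cached = cache.get(&meta.location)?;
    // A stale entry (file rewritten since caching) counts as a miss,
    // so the caller falls through to infer_stats and re-puts the entry.
    cached
        .is_valid_for(meta)
        .then(|| Arc::clone(&cached.statistics))
}
```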

datafusion/datasource-parquet/src/metadata.rs

Lines changed: 14 additions & 14 deletions
```diff
@@ -125,19 +125,15 @@ impl<'a> DFParquetMetadata<'a> {
             !cfg!(feature = "parquet_encryption") || decryption_properties.is_none();

         if cache_metadata
-            && let Some(parquet_metadata) = file_metadata_cache
-                .as_ref()
-                .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta))
-                .and_then(|file_metadata| {
-                    file_metadata
-                        .as_any()
-                        .downcast_ref::<CachedParquetMetaData>()
-                        .map(|cached_parquet_metadata| {
-                            Arc::clone(cached_parquet_metadata.parquet_metadata())
-                        })
-                })
+            && let Some(file_metadata_cache) = file_metadata_cache.as_ref()
+            && let Some(cached) = file_metadata_cache.get(&object_meta.location)
+            && cached.is_valid_for(object_meta)
+            && let Some(cached_parquet) = cached
+                .file_metadata
+                .as_any()
+                .downcast_ref::<CachedParquetMetaData>()
         {
-            return Ok(parquet_metadata);
+            return Ok(Arc::clone(cached_parquet.parquet_metadata()));
         }

         let mut reader =
@@ -162,9 +158,13 @@ impl<'a> DFParquetMetadata<'a> {
         );

         if cache_metadata && let Some(file_metadata_cache) = file_metadata_cache {
+            use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
             file_metadata_cache.put(
-                object_meta,
-                Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
+                &object_meta.location,
+                CachedFileMetadataEntry::new(
+                    (*object_meta).clone(),
+                    Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
+                ),
             );
         }
```
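
Two things are worth noting here. First, the rewritten condition relies on `let` chains (chaining `let` bindings with `&&` in a single `if`), which flattens the old nested `and_then` closures; this syntax requires a recent toolchain (stabilized for the 2024 edition). Second, the metadata cache stores format-agnostic trait objects, and Parquet code recovers its concrete type via an `Any` downcast. A self-contained sketch of that pattern follows, with stand-in types (`FileMetadata`, and a string payload in place of parquet's `ParquetMetaData`) that only approximate the real ones:

```rust
use std::any::Any;
use std::sync::Arc;

// Stand-in for the cached-payload trait; the real trait lives in DataFusion.
trait FileMetadata: Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

// Stand-in payload: the real type wraps parquet's ParquetMetaData.
struct CachedParquetMetaData(Arc<String>);

impl CachedParquetMetaData {
    fn parquet_metadata(&self) -> &Arc<String> {
        &self.0
    }
}

impl FileMetadata for CachedParquetMetaData {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// The cache hands back `Arc<dyn FileMetadata>`; each file format checks
/// whether the entry is actually its own payload with a fallible downcast.
fn parquet_from_cache(entry: &Arc<dyn FileMetadata>) -> Option<Arc<String>> {
    entry
        .as_any()
        .downcast_ref::<CachedParquetMetaData>()
        .map(|p| Arc::clone(p.parquet_metadata()))
}
```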

datafusion/datasource/src/url.rs

Lines changed: 19 additions & 11 deletions
```diff
@@ -15,9 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.

-use std::sync::Arc;
-
 use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::cache::cache_manager::CachedFileList;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_session::Session;

@@ -364,23 +363,31 @@ async fn list_with_cache<'b>(
             .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
             .boxed()),
         Some(cache) => {
-            // Convert prefix to Option<Path> for cache lookup
-            let prefix_filter = prefix.cloned();
-
-            // Try cache lookup with optional prefix filter
-            let vec = if let Some(res) =
-                cache.get_with_extra(table_base_path, &prefix_filter)
-            {
+            // Try cache lookup
+            let vec = if let Some(cached) = cache.get(table_base_path) {
                 debug!("Hit list files cache");
-                res.as_ref().clone()
+                // Cache hit - apply prefix filter if needed
+                if prefix.is_some() {
+                    let full_prefix_str = full_prefix.as_ref();
+                    cached
+                        .files
+                        .iter()
+                        .filter(|meta| {
+                            meta.location.as_ref().starts_with(full_prefix_str)
+                        })
+                        .cloned()
+                        .collect()
+                } else {
+                    cached.files.as_ref().clone()
+                }
             } else {
                 // Cache miss - always list and cache the full table
                 // This ensures we have complete data for future prefix queries
                 let vec = store
                     .list(Some(table_base_path))
                     .try_collect::<Vec<ObjectMeta>>()
                     .await?;
-                cache.put(table_base_path, Arc::new(vec.clone()));
+                cache.put(table_base_path, CachedFileList::new(vec.clone()));

                 // If a prefix filter was requested, apply it to the results
                 if prefix.is_some() {
@@ -494,6 +501,7 @@ mod tests {
     use std::any::Any;
     use std::collections::HashMap;
     use std::ops::Range;
+    use std::sync::Arc;
     use tempfile::tempdir;

     #[test]
```
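
The design choice in `list_with_cache` is that a miss always lists and caches the entire table, and prefix queries are then answered by filtering the cached listing rather than re-listing the object store. A sketch of that filter step, assuming a simplified `CachedFileList` that holds a plain `Vec<ObjectMeta>` (the real one appears to wrap the vector in an `Arc`):

```rust
use object_store::{path::Path, ObjectMeta};

// Simplified stand-in for the commit's CachedFileList.
struct CachedFileList {
    files: Vec<ObjectMeta>,
}

/// Serve a prefix query from the cached full-table listing: keep only the
/// entries whose location starts with the requested prefix.
fn filter_by_prefix(cached: &CachedFileList, full_prefix: &Path) -> Vec<ObjectMeta> {
    let prefix_str: &str = full_prefix.as_ref();
    cached
        .files
        .iter()
        .filter(|meta| meta.location.as_ref().starts_with(prefix_str))
        .cloned()
        .collect()
}
```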

datafusion/execution/Cargo.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -55,6 +55,7 @@ chrono = { workspace = true }
 dashmap = { workspace = true }
 datafusion-common = { workspace = true, default-features = false }
 datafusion-expr = { workspace = true, default-features = false }
+datafusion-physical-expr-common = { workspace = true, default-features = false }
 futures = { workspace = true }
 log = { workspace = true }
 object_store = { workspace = true, features = ["fs"] }
```
