Skip to content

Commit 0bd093b

Browse files
committed
Refactor cache APIs to support ordering information
1 parent e6049de commit 0bd093b

File tree

11 files changed

+939
-767
lines changed

11 files changed

+939
-767
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-cli/src/functions.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -831,14 +831,14 @@ impl TableFunctionImpl for ListFilesCacheFunc {
831831
.map(|t| t.duration_since(now).as_millis() as i64),
832832
);
833833

834-
for meta in entry.metas.iter() {
834+
for meta in entry.cached_file_list.files.iter() {
835835
file_path_arr.push(meta.location.to_string());
836836
file_modified_arr.push(meta.last_modified.timestamp_millis());
837837
file_size_bytes_arr.push(meta.size);
838838
etag_arr.push(meta.e_tag.clone());
839839
version_arr.push(meta.version.clone());
840840
}
841-
current_offset += entry.metas.len() as i32;
841+
current_offset += entry.cached_file_list.files.len() as i32;
842842
offsets.push(current_offset);
843843
}
844844
}

datafusion/catalog-listing/src/table.rs

Lines changed: 32 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -710,31 +710,42 @@ impl ListingTable {
710710
store: &Arc<dyn ObjectStore>,
711711
part_file: &PartitionedFile,
712712
) -> datafusion_common::Result<Arc<Statistics>> {
713-
match self
713+
use datafusion_execution::cache::cache_manager::CachedFileMetadata;
714+
715+
// Check cache first
716+
if let Some(cached) = self
714717
.collected_statistics
715-
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
718+
.get(&part_file.object_meta.location)
716719
{
717-
Some(statistics) => Ok(statistics),
718-
None => {
719-
let statistics = self
720-
.options
721-
.format
722-
.infer_stats(
723-
ctx,
724-
store,
725-
Arc::clone(&self.file_schema),
726-
&part_file.object_meta,
727-
)
728-
.await?;
729-
let statistics = Arc::new(statistics);
730-
self.collected_statistics.put_with_extra(
731-
&part_file.object_meta.location,
732-
Arc::clone(&statistics),
733-
&part_file.object_meta,
734-
);
735-
Ok(statistics)
720+
// Validate that cached entry is still valid
721+
if cached.is_valid_for(&part_file.object_meta) {
722+
return Ok(cached.statistics);
736723
}
737724
}
725+
726+
// Cache miss or invalid - infer statistics
727+
let statistics = self
728+
.options
729+
.format
730+
.infer_stats(
731+
ctx,
732+
store,
733+
Arc::clone(&self.file_schema),
734+
&part_file.object_meta,
735+
)
736+
.await?;
737+
let statistics = Arc::new(statistics);
738+
739+
// Store in cache
740+
self.collected_statistics.put(
741+
&part_file.object_meta.location,
742+
CachedFileMetadata::new(
743+
part_file.object_meta.clone(),
744+
Arc::clone(&statistics),
745+
None, // No ordering information in this PR
746+
),
747+
);
748+
Ok(statistics)
738749
}
739750
}
740751

datafusion/datasource-parquet/src/metadata.rs

Lines changed: 16 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,9 @@ use datafusion_common::stats::Precision;
3131
use datafusion_common::{
3232
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
3333
};
34-
use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
34+
use datafusion_execution::cache::cache_manager::{
35+
CachedFileMetadataEntry, FileMetadata, FileMetadataCache,
36+
};
3537
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
3638
use datafusion_physical_plan::Accumulator;
3739
use log::debug;
@@ -125,19 +127,15 @@ impl<'a> DFParquetMetadata<'a> {
125127
!cfg!(feature = "parquet_encryption") || decryption_properties.is_none();
126128

127129
if cache_metadata
128-
&& let Some(parquet_metadata) = file_metadata_cache
129-
.as_ref()
130-
.and_then(|file_metadata_cache| file_metadata_cache.get(object_meta))
131-
.and_then(|file_metadata| {
132-
file_metadata
133-
.as_any()
134-
.downcast_ref::<CachedParquetMetaData>()
135-
.map(|cached_parquet_metadata| {
136-
Arc::clone(cached_parquet_metadata.parquet_metadata())
137-
})
138-
})
130+
&& let Some(file_metadata_cache) = file_metadata_cache.as_ref()
131+
&& let Some(cached) = file_metadata_cache.get(&object_meta.location)
132+
&& cached.is_valid_for(object_meta)
133+
&& let Some(cached_parquet) = cached
134+
.file_metadata
135+
.as_any()
136+
.downcast_ref::<CachedParquetMetaData>()
139137
{
140-
return Ok(parquet_metadata);
138+
return Ok(Arc::clone(cached_parquet.parquet_metadata()));
141139
}
142140

143141
let mut reader =
@@ -163,8 +161,11 @@ impl<'a> DFParquetMetadata<'a> {
163161

164162
if cache_metadata && let Some(file_metadata_cache) = file_metadata_cache {
165163
file_metadata_cache.put(
166-
object_meta,
167-
Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
164+
&object_meta.location,
165+
CachedFileMetadataEntry::new(
166+
(*object_meta).clone(),
167+
Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
168+
),
168169
);
169170
}
170171

datafusion/datasource/src/url.rs

Lines changed: 15 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ use std::sync::Arc;
1919

2020
use datafusion_common::{DataFusionError, Result, TableReference};
2121
use datafusion_execution::cache::TableScopedPath;
22+
use datafusion_execution::cache::cache_manager::CachedFileList;
2223
use datafusion_execution::object_store::ObjectStoreUrl;
2324
use datafusion_session::Session;
2425

@@ -396,42 +397,34 @@ async fn list_with_cache<'b>(
396397
.map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
397398
.boxed()),
398399
Some(cache) => {
399-
// Convert prefix to Option<Path> for cache lookup
400-
let prefix_filter = prefix.cloned();
400+
// Build the filter prefix (only Some if prefix was requested)
401+
let filter_prefix = prefix.is_some().then(|| full_prefix.clone());
401402

402403
let table_scoped_base_path = TableScopedPath {
403404
table: table_ref.cloned(),
404405
path: table_base_path.clone(),
405406
};
406407

407-
// Try cache lookup with optional prefix filter
408-
let vec = if let Some(res) =
409-
cache.get_with_extra(&table_scoped_base_path, &prefix_filter)
410-
{
408+
// Try cache lookup - get returns CachedFileList
409+
let vec = if let Some(cached) = cache.get(&table_scoped_base_path) {
411410
debug!("Hit list files cache");
412-
res.as_ref().clone()
411+
cached.files_matching_prefix(&filter_prefix)
413412
} else {
414413
// Cache miss - always list and cache the full table
415414
// This ensures we have complete data for future prefix queries
416415
let vec = store
417416
.list(Some(table_base_path))
418417
.try_collect::<Vec<ObjectMeta>>()
419418
.await?;
420-
cache.put(&table_scoped_base_path, Arc::new(vec.clone()));
421-
422-
// If a prefix filter was requested, apply it to the results
423-
if prefix.is_some() {
424-
let full_prefix_str = full_prefix.as_ref();
425-
vec.into_iter()
426-
.filter(|meta| {
427-
meta.location.as_ref().starts_with(full_prefix_str)
428-
})
429-
.collect()
430-
} else {
431-
vec
432-
}
419+
let cached: CachedFileList = vec.into();
420+
let result = cached.files_matching_prefix(&filter_prefix);
421+
cache.put(&table_scoped_base_path, cached);
422+
result
433423
};
434-
Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
424+
Ok(
425+
futures::stream::iter(Arc::unwrap_or_clone(vec).into_iter().map(Ok))
426+
.boxed(),
427+
)
435428
}
436429
}
437430
}
@@ -531,6 +524,7 @@ mod tests {
531524
use std::any::Any;
532525
use std::collections::HashMap;
533526
use std::ops::Range;
527+
use std::sync::Arc;
534528
use tempfile::tempdir;
535529

536530
#[test]

datafusion/execution/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,7 @@ chrono = { workspace = true }
5555
dashmap = { workspace = true }
5656
datafusion-common = { workspace = true, default-features = false }
5757
datafusion-expr = { workspace = true, default-features = false }
58+
datafusion-physical-expr-common = { workspace = true, default-features = false }
5859
futures = { workspace = true }
5960
log = { workspace = true }
6061
object_store = { workspace = true, features = ["fs"] }

0 commit comments

Comments (0)