Skip to content

Commit a6c6682

Browse files
committed
Refactor cache APIs to support ordering information
1 parent: 102caeb · commit: a6c6682

File tree

11 files changed

+915
-744
lines changed

11 files changed

+915
-744
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-cli/src/functions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -811,14 +811,14 @@ impl TableFunctionImpl for ListFilesCacheFunc {
811811
.map(|t| t.duration_since(now).as_millis() as i64),
812812
);
813813

814-
for meta in entry.metas.iter() {
814+
for meta in entry.cached_file_list.files.iter() {
815815
file_path_arr.push(meta.location.to_string());
816816
file_modified_arr.push(meta.last_modified.timestamp_millis());
817817
file_size_bytes_arr.push(meta.size);
818818
etag_arr.push(meta.e_tag.clone());
819819
version_arr.push(meta.version.clone());
820820
}
821-
current_offset += entry.metas.len() as i32;
821+
current_offset += entry.cached_file_list.files.len() as i32;
822822
offsets.push(current_offset);
823823
}
824824
}

datafusion/catalog-listing/src/table.rs

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -705,31 +705,42 @@ impl ListingTable {
705705
store: &Arc<dyn ObjectStore>,
706706
part_file: &PartitionedFile,
707707
) -> datafusion_common::Result<Arc<Statistics>> {
708-
match self
708+
use datafusion_execution::cache::cache_manager::CachedFileMetadata;
709+
710+
// Check cache first
711+
if let Some(cached) = self
709712
.collected_statistics
710-
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
713+
.get(&part_file.object_meta.location)
711714
{
712-
Some(statistics) => Ok(statistics),
713-
None => {
714-
let statistics = self
715-
.options
716-
.format
717-
.infer_stats(
718-
ctx,
719-
store,
720-
Arc::clone(&self.file_schema),
721-
&part_file.object_meta,
722-
)
723-
.await?;
724-
let statistics = Arc::new(statistics);
725-
self.collected_statistics.put_with_extra(
726-
&part_file.object_meta.location,
727-
Arc::clone(&statistics),
728-
&part_file.object_meta,
729-
);
730-
Ok(statistics)
715+
// Validate that cached entry is still valid
716+
if cached.is_valid_for(&part_file.object_meta) {
717+
return Ok(cached.statistics);
731718
}
732719
}
720+
721+
// Cache miss or invalid - infer statistics
722+
let statistics = self
723+
.options
724+
.format
725+
.infer_stats(
726+
ctx,
727+
store,
728+
Arc::clone(&self.file_schema),
729+
&part_file.object_meta,
730+
)
731+
.await?;
732+
let statistics = Arc::new(statistics);
733+
734+
// Store in cache
735+
self.collected_statistics.put(
736+
&part_file.object_meta.location,
737+
CachedFileMetadata::new(
738+
part_file.object_meta.clone(),
739+
Arc::clone(&statistics),
740+
None, // No ordering information in this PR
741+
),
742+
);
743+
Ok(statistics)
733744
}
734745
}
735746

datafusion/datasource-parquet/src/metadata.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ use datafusion_common::stats::Precision;
3131
use datafusion_common::{
3232
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
3333
};
34-
use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
34+
use datafusion_execution::cache::cache_manager::{
35+
CachedFileMetadataEntry, FileMetadata, FileMetadataCache,
36+
};
3537
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
3638
use datafusion_physical_plan::Accumulator;
3739
use log::debug;
@@ -125,19 +127,15 @@ impl<'a> DFParquetMetadata<'a> {
125127
!cfg!(feature = "parquet_encryption") || decryption_properties.is_none();
126128

127129
if cache_metadata
128-
&& let Some(parquet_metadata) = file_metadata_cache
129-
.as_ref()
130-
.and_then(|file_metadata_cache| file_metadata_cache.get(object_meta))
131-
.and_then(|file_metadata| {
132-
file_metadata
133-
.as_any()
134-
.downcast_ref::<CachedParquetMetaData>()
135-
.map(|cached_parquet_metadata| {
136-
Arc::clone(cached_parquet_metadata.parquet_metadata())
137-
})
138-
})
130+
&& let Some(file_metadata_cache) = file_metadata_cache.as_ref()
131+
&& let Some(cached) = file_metadata_cache.get(&object_meta.location)
132+
&& cached.is_valid_for(object_meta)
133+
&& let Some(cached_parquet) = cached
134+
.file_metadata
135+
.as_any()
136+
.downcast_ref::<CachedParquetMetaData>()
139137
{
140-
return Ok(parquet_metadata);
138+
return Ok(Arc::clone(cached_parquet.parquet_metadata()));
141139
}
142140

143141
let mut reader =
@@ -163,8 +161,11 @@ impl<'a> DFParquetMetadata<'a> {
163161

164162
if cache_metadata && let Some(file_metadata_cache) = file_metadata_cache {
165163
file_metadata_cache.put(
166-
object_meta,
167-
Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
164+
&object_meta.location,
165+
CachedFileMetadataEntry::new(
166+
(*object_meta).clone(),
167+
Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
168+
),
168169
);
169170
}
170171

datafusion/datasource/src/url.rs

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use std::sync::Arc;
1919

2020
use datafusion_common::{DataFusionError, Result};
21+
use datafusion_execution::cache::cache_manager::CachedFileList;
2122
use datafusion_execution::object_store::ObjectStoreUrl;
2223
use datafusion_session::Session;
2324

@@ -364,37 +365,29 @@ async fn list_with_cache<'b>(
364365
.map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
365366
.boxed()),
366367
Some(cache) => {
367-
// Convert prefix to Option<Path> for cache lookup
368-
let prefix_filter = prefix.cloned();
368+
// Build the filter prefix (only Some if prefix was requested)
369+
let filter_prefix = prefix.is_some().then(|| full_prefix.clone());
369370

370-
// Try cache lookup with optional prefix filter
371-
let vec = if let Some(res) =
372-
cache.get_with_extra(table_base_path, &prefix_filter)
373-
{
371+
// Try cache lookup
372+
let vec = if let Some(cached) = cache.get(table_base_path) {
374373
debug!("Hit list files cache");
375-
res.as_ref().clone()
374+
cached.files_matching_prefix(&filter_prefix)
376375
} else {
377376
// Cache miss - always list and cache the full table
378377
// This ensures we have complete data for future prefix queries
379378
let vec = store
380379
.list(Some(table_base_path))
381380
.try_collect::<Vec<ObjectMeta>>()
382381
.await?;
383-
cache.put(table_base_path, Arc::new(vec.clone()));
384-
385-
// If a prefix filter was requested, apply it to the results
386-
if prefix.is_some() {
387-
let full_prefix_str = full_prefix.as_ref();
388-
vec.into_iter()
389-
.filter(|meta| {
390-
meta.location.as_ref().starts_with(full_prefix_str)
391-
})
392-
.collect()
393-
} else {
394-
vec
395-
}
382+
let cached = CachedFileList::new(vec);
383+
let result = cached.files_matching_prefix(&filter_prefix);
384+
cache.put(table_base_path, cached);
385+
result
396386
};
397-
Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
387+
Ok(
388+
futures::stream::iter(Arc::unwrap_or_clone(vec).into_iter().map(Ok))
389+
.boxed(),
390+
)
398391
}
399392
}
400393
}
@@ -494,6 +487,7 @@ mod tests {
494487
use std::any::Any;
495488
use std::collections::HashMap;
496489
use std::ops::Range;
490+
use std::sync::Arc;
497491
use tempfile::tempdir;
498492

499493
#[test]

datafusion/execution/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ chrono = { workspace = true }
5555
dashmap = { workspace = true }
5656
datafusion-common = { workspace = true, default-features = false }
5757
datafusion-expr = { workspace = true, default-features = false }
58+
datafusion-physical-expr-common = { workspace = true, default-features = false }
5859
futures = { workspace = true }
5960
log = { workspace = true }
6061
object_store = { workspace = true, features = ["fs"] }

0 commit comments

Comments (0)