Skip to content

Commit 1037f0a

Browse files
authored
feat: add list_files_cache table function for datafusion-cli (#19388)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #19055. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ``` > CREATE EXTERNAL TABLE nyc_taxi_rides STORED AS PARQUET LOCATION 's3://altinity-clickhouse-data/nyc_taxi_rides/data/tripdata_parquet/' ; 0 row(s) fetched. Elapsed 10.061 seconds. > SELECT metadata_size_bytes, expires_in, unnest(metadata_list) FROM list_files_cache(); +---------------------+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | metadata_size_bytes | expires_in | UNNEST(list_files_cache().metadata_list) | +---------------------+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200901.parquet, file_modified: 2025-05-30T09:44:23, file_size_bytes: 222192983, e_tag: "e8d016c3c7af80bf911d96387febe2c1-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200902.parquet, file_modified: 2025-05-30T09:46:00, file_size_bytes: 211023080, e_tag: "1021626ff5ef606422aa7121edd69f3b-12", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200903.parquet, file_modified: 2025-05-30T09:47:20, file_size_bytes: 229202874, e_tag: "96e7494b217099c6a07e9c4298cbe783-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200904.parquet, file_modified: 2025-05-30T09:44:37, file_size_bytes: 225659965, e_tag: "728c45fabdcd8e40bdef4dfc28df9b0f-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200905.parquet, file_modified: 2025-05-30T09:46:12, file_size_bytes: 232847306, e_tag: "f59e45bd8bd1d77cd7ae8ab6ab468bcc-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200906.parquet, file_modified: 2025-05-30T09:47:26, file_size_bytes: 224226575, e_tag: "8ebb698eea85f9af87065ac333efc449-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200907.parquet, file_modified: 2025-05-30T09:44:52, file_size_bytes: 217168413, e_tag: "7d7ee77f6cac4adc18aa3a9e74600dd3-12", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200908.parquet, file_modified: 2025-05-30T09:46:23, file_size_bytes: 217303109, e_tag: "e9883055d92a33b941aab971423e681b-12", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200909.parquet, file_modified: 2025-05-30T09:47:28, file_size_bytes: 223333499, e_tag: "6f0917e6003b38df9060d71c004eb961-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200910.parquet, file_modified: 2025-05-30T09:44:54, file_size_bytes: 246300471, e_tag: "8928b29da44e041021e10077683b7817-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200911.parquet, file_modified: 2025-05-30T09:46:37, file_size_bytes: 227920860, e_tag: "4cd26a1a7f82af080c33e890dc1fef27-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200912.parquet, file_modified: 2025-05-30T09:44:24, file_size_bytes: 233873308, e_tag: "23f4584e494e3c065c777c270c9eedbc-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201001.parquet, file_modified: 2025-05-30T09:45:18, file_size_bytes: 235166925, e_tag: "effcc8cc41b40cf7ac466f911d7b9459-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201002.parquet, file_modified: 2025-05-30T09:46:59, file_size_bytes: 177367931, e_tag: "ce8b7817ecc47da86ccbfa6b51ffa06b-10", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201003.parquet, file_modified: 2025-05-30T09:44:26, file_size_bytes: 205857224, e_tag: "94a078b61e3b652387e6f2a673dc3f4e-12", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201004.parquet, file_modified: 2025-05-30T09:45:04, file_size_bytes: 243024246, e_tag: "a1efbebfdabc204e0041d8714aaec01a-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201005.parquet, file_modified: 2025-05-30T09:46:47, file_size_bytes: 248130090, e_tag: "d3cf585e00ce627a807348c84a42d0a6-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201006.parquet, file_modified: 2025-05-30T09:44:25, file_size_bytes: 237068130, e_tag: "831db33281a5c017f8ffc466bd47546b-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201007.parquet, file_modified: 2025-05-30T09:45:35, file_size_bytes: 234826090, e_tag: "790e05983e6592e4920c88fbd2bfe774-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201008.parquet, file_modified: 2025-05-30T09:47:14, file_size_bytes: 197990272, e_tag: "d87ddb446e5cbc0f6831fafd95cfd027-11", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201009.parquet, file_modified: 2025-05-30T09:44:27, file_size_bytes: 243408943, e_tag: "abfbe3b29942bcd68d131d95540278d3-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201010.parquet, file_modified: 2025-05-30T09:45:47, file_size_bytes: 225277041, e_tag: "f768c7b77497b2bf3efd5cb2a4362977-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201011.parquet, file_modified: 2025-05-30T09:47:23, file_size_bytes: 220010577, e_tag: "c6830cbe1f3ae918f9280db3aa847b03-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201012.parquet, file_modified: 2025-05-30T09:44:24, file_size_bytes: 219773352, e_tag: "264f7ea433076690a3bbe5566168e5c5-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201101.parquet, file_modified: 2025-05-30T09:45:52, file_size_bytes: 212535107, e_tag: "ca3bdc2707b29667c78c39517781eac4-12", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201102.parquet, file_modified: 2025-05-30T09:47:23, file_size_bytes: 223138164, e_tag: "e2b3c0fd0c0d66ac6363600de0c8b2ad-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201103.parquet, file_modified: 2025-05-30T09:44:26, file_size_bytes: 252843261, e_tag: "fd5d4e01568cd6e7ef1e00de76441e5b-15", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201104.parquet, file_modified: 2025-05-30T09:46:10, file_size_bytes: 233123935, e_tag: "2b510cc2c0c73d9ec7374c9e6d56c388-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201105.parquet, file_modified: 2025-05-30T09:44:24, file_size_bytes: 246843111, e_tag: "abc2f58bd520b2013aa1a333d317c70c-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201106.parquet, file_modified: 2025-05-30T09:44:58, file_size_bytes: 238786647, e_tag: "0e456698dc42a850ff7b764506cb511d-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201107.parquet, file_modified: 2025-05-30T09:46:40, file_size_bytes: 233249259, e_tag: "28177227cbff94a6a819a0568a14e9b2-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201108.parquet, file_modified: 2025-05-30T09:44:25, file_size_bytes: 212681184, e_tag: "fdcb442e1010630c0553a7018762a8ba-12", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201109.parquet, file_modified: 2025-05-30T09:45:13, file_size_bytes: 232399266, e_tag: "ccca37be5a3579a8bc644490226ed29a-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201110.parquet, file_modified: 2025-05-30T09:46:52, file_size_bytes: 248471033, e_tag: "eebe34c1bb74f63433eb607810969553-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201111.parquet, file_modified: 2025-05-30T09:44:26, file_size_bytes: 231103826, e_tag: "7c76b9fc111462b76336d63bce3253c7-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201112.parquet, file_modified: 2025-05-30T09:45:40, file_size_bytes: 236102882, e_tag: "26c10d1d85c4565cbb9e8fc6a7bc745c-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201201.parquet, file_modified: 2025-05-30T09:47:21, file_size_bytes: 236184052, e_tag: "8cdc15a22462579dcf90d669cea0f04b-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201202.parquet, file_modified: 2025-05-30T09:44:27, file_size_bytes: 238377570, e_tag: "4e6734c5c2e77c68dde5155a45dac81c-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201203.parquet, file_modified: 2025-05-30T09:46:06, file_size_bytes: 258226172, e_tag: "b7b07fa0f4fefcf0ba0fc69ba344b5c8-15", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201204.parquet, file_modified: 2025-05-30T09:44:25, file_size_bytes: 248190698, e_tag: "968c13850fa9a7cb46337bc8fc9d13fa-14", version: NULL} | | . | | . | | . | +---------------------+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 96 row(s) fetched. (First 40 displayed. Use --maxrows to adjust) Elapsed 0.057 seconds. ``` ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> This will enable a new user-facing table function to datafusion cli. <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent c3e1c36 commit 1037f0a

File tree

5 files changed

+446
-31
lines changed

5 files changed

+446
-31
lines changed

datafusion-cli/src/functions.rs

Lines changed: 162 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717

1818
//! Functions that are query-able and searchable via the `\h` command
1919
20+
use datafusion_common::instant::Instant;
2021
use std::fmt;
2122
use std::fs::File;
2223
use std::str::FromStr;
2324
use std::sync::Arc;
2425

25-
use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
26-
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
26+
use arrow::array::{
27+
DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray,
28+
TimestampMillisecondArray, UInt64Array,
29+
};
30+
use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
31+
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
2732
use arrow::record_batch::RecordBatch;
2833
use arrow::util::pretty::pretty_format_batches;
2934
use datafusion::catalog::{Session, TableFunctionImpl};
@@ -697,3 +702,158 @@ impl TableFunctionImpl for StatisticsCacheFunc {
697702
Ok(Arc::new(statistics_cache))
698703
}
699704
}
705+
706+
#[derive(Debug)]
707+
struct ListFilesCacheTable {
708+
schema: SchemaRef,
709+
batch: RecordBatch,
710+
}
711+
712+
#[async_trait]
713+
impl TableProvider for ListFilesCacheTable {
714+
fn as_any(&self) -> &dyn std::any::Any {
715+
self
716+
}
717+
718+
fn schema(&self) -> arrow::datatypes::SchemaRef {
719+
self.schema.clone()
720+
}
721+
722+
fn table_type(&self) -> datafusion::logical_expr::TableType {
723+
datafusion::logical_expr::TableType::Base
724+
}
725+
726+
async fn scan(
727+
&self,
728+
_state: &dyn Session,
729+
projection: Option<&Vec<usize>>,
730+
_filters: &[Expr],
731+
_limit: Option<usize>,
732+
) -> Result<Arc<dyn ExecutionPlan>> {
733+
Ok(MemorySourceConfig::try_new_exec(
734+
&[vec![self.batch.clone()]],
735+
TableProvider::schema(self),
736+
projection.cloned(),
737+
)?)
738+
}
739+
}
740+
741+
#[derive(Debug)]
742+
pub struct ListFilesCacheFunc {
743+
cache_manager: Arc<CacheManager>,
744+
}
745+
746+
impl ListFilesCacheFunc {
747+
pub fn new(cache_manager: Arc<CacheManager>) -> Self {
748+
Self { cache_manager }
749+
}
750+
}
751+
752+
impl TableFunctionImpl for ListFilesCacheFunc {
753+
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
754+
if !exprs.is_empty() {
755+
return plan_err!("list_files_cache should have no arguments");
756+
}
757+
758+
let nested_fields = Fields::from(vec![
759+
Field::new("file_path", DataType::Utf8, false),
760+
Field::new(
761+
"file_modified",
762+
DataType::Timestamp(TimeUnit::Millisecond, None),
763+
false,
764+
),
765+
Field::new("file_size_bytes", DataType::UInt64, false),
766+
Field::new("e_tag", DataType::Utf8, true),
767+
Field::new("version", DataType::Utf8, true),
768+
]);
769+
770+
let metadata_field =
771+
Field::new("metadata", DataType::Struct(nested_fields.clone()), true);
772+
773+
let schema = Arc::new(Schema::new(vec![
774+
Field::new("path", DataType::Utf8, false),
775+
Field::new("metadata_size_bytes", DataType::UInt64, false),
776+
// expires field in ListFilesEntry has type Instant when set, from which we cannot get "the number of seconds", hence using Duration instead of Timestamp as data type.
777+
Field::new(
778+
"expires_in",
779+
DataType::Duration(TimeUnit::Millisecond),
780+
true,
781+
),
782+
Field::new(
783+
"metadata_list",
784+
DataType::List(Arc::new(metadata_field.clone())),
785+
true,
786+
),
787+
]));
788+
789+
let mut path_arr = vec![];
790+
let mut metadata_size_bytes_arr = vec![];
791+
let mut expires_arr = vec![];
792+
793+
let mut file_path_arr = vec![];
794+
let mut file_modified_arr = vec![];
795+
let mut file_size_bytes_arr = vec![];
796+
let mut etag_arr = vec![];
797+
let mut version_arr = vec![];
798+
let mut offsets: Vec<i32> = vec![0];
799+
800+
if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() {
801+
let now = Instant::now();
802+
let mut current_offset: i32 = 0;
803+
804+
for (path, entry) in list_files_cache.list_entries() {
805+
path_arr.push(path.to_string());
806+
metadata_size_bytes_arr.push(entry.size_bytes as u64);
807+
// calculates time left before entry expires
808+
expires_arr.push(
809+
entry
810+
.expires
811+
.map(|t| t.duration_since(now).as_millis() as i64),
812+
);
813+
814+
for meta in entry.metas.iter() {
815+
file_path_arr.push(meta.location.to_string());
816+
file_modified_arr.push(meta.last_modified.timestamp_millis());
817+
file_size_bytes_arr.push(meta.size);
818+
etag_arr.push(meta.e_tag.clone());
819+
version_arr.push(meta.version.clone());
820+
}
821+
current_offset += entry.metas.len() as i32;
822+
offsets.push(current_offset);
823+
}
824+
}
825+
826+
let struct_arr = StructArray::new(
827+
nested_fields,
828+
vec![
829+
Arc::new(StringArray::from(file_path_arr)),
830+
Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
831+
Arc::new(UInt64Array::from(file_size_bytes_arr)),
832+
Arc::new(StringArray::from(etag_arr)),
833+
Arc::new(StringArray::from(version_arr)),
834+
],
835+
None,
836+
);
837+
838+
let offsets_buffer: OffsetBuffer<i32> =
839+
OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets)));
840+
841+
let batch = RecordBatch::try_new(
842+
schema.clone(),
843+
vec![
844+
Arc::new(StringArray::from(path_arr)),
845+
Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
846+
Arc::new(DurationMillisecondArray::from(expires_arr)),
847+
Arc::new(GenericListArray::new(
848+
Arc::new(metadata_field),
849+
offsets_buffer,
850+
Arc::new(struct_arr),
851+
None,
852+
)),
853+
],
854+
)?;
855+
856+
let list_files_cache = ListFilesCacheTable { schema, batch };
857+
Ok(Arc::new(list_files_cache))
858+
}
859+
}

datafusion-cli/src/main.rs

Lines changed: 141 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use datafusion::logical_expr::ExplainFormat;
3232
use datafusion::prelude::SessionContext;
3333
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
3434
use datafusion_cli::functions::{
35-
MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
35+
ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
3636
};
3737
use datafusion_cli::object_storage::instrumented::{
3838
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
@@ -253,6 +253,13 @@ async fn main_inner() -> Result<()> {
253253
)),
254254
);
255255

256+
ctx.register_udtf(
257+
"list_files_cache",
258+
Arc::new(ListFilesCacheFunc::new(
259+
ctx.task_ctx().runtime_env().cache_manager.clone(),
260+
)),
261+
);
262+
256263
let mut print_options = PrintOptions {
257264
format: args.format,
258265
quiet: args.quiet,
@@ -431,15 +438,20 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
431438

432439
#[cfg(test)]
433440
mod tests {
441+
use std::time::Duration;
442+
434443
use super::*;
435444
use datafusion::{
436445
common::test_util::batches_to_string,
437446
execution::cache::{
438-
cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache,
447+
DefaultListFilesCache, cache_manager::CacheManagerConfig,
448+
cache_unit::DefaultFileStatisticsCache,
439449
},
440-
prelude::ParquetReadOptions,
450+
prelude::{ParquetReadOptions, col, lit, split_part},
441451
};
442452
use insta::assert_snapshot;
453+
use object_store::memory::InMemory;
454+
use url::Url;
443455

444456
fn assert_conversion(input: &str, expected: Result<usize, String>) {
445457
let result = extract_memory_pool_size(input);
@@ -741,4 +753,130 @@ mod tests {
741753

742754
Ok(())
743755
}
756+
757+
#[tokio::test]
758+
async fn test_list_files_cache() -> Result<(), DataFusionError> {
759+
let list_files_cache = Arc::new(DefaultListFilesCache::new(
760+
1024,
761+
Some(Duration::from_secs(1)),
762+
));
763+
764+
let rt = RuntimeEnvBuilder::new()
765+
.with_cache_manager(
766+
CacheManagerConfig::default()
767+
.with_list_files_cache(Some(list_files_cache)),
768+
)
769+
.build_arc()
770+
.unwrap();
771+
772+
let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
773+
774+
ctx.register_object_store(
775+
&Url::parse("mem://test_table").unwrap(),
776+
Arc::new(InMemory::new()),
777+
);
778+
779+
ctx.register_udtf(
780+
"list_files_cache",
781+
Arc::new(ListFilesCacheFunc::new(
782+
ctx.task_ctx().runtime_env().cache_manager.clone(),
783+
)),
784+
);
785+
786+
ctx.sql(
787+
"CREATE EXTERNAL TABLE src_table
788+
STORED AS PARQUET
789+
LOCATION '../parquet-testing/data/alltypes_plain.parquet'",
790+
)
791+
.await?
792+
.collect()
793+
.await?;
794+
795+
ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/0.parquet' STORED AS PARQUET").await?.collect().await?;
796+
797+
ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/1.parquet' STORED AS PARQUET").await?.collect().await?;
798+
799+
ctx.sql(
800+
"CREATE EXTERNAL TABLE test_table
801+
STORED AS PARQUET
802+
LOCATION 'mem://test_table/'
803+
",
804+
)
805+
.await?
806+
.collect()
807+
.await?;
808+
809+
let sql = "SELECT metadata_size_bytes, expires_in, metadata_list FROM list_files_cache()";
810+
let df = ctx
811+
.sql(sql)
812+
.await?
813+
.unnest_columns(&["metadata_list"])?
814+
.with_column_renamed("metadata_list", "metadata")?
815+
.unnest_columns(&["metadata"])?;
816+
817+
assert_eq!(
818+
2,
819+
df.clone()
820+
.filter(col("expires_in").is_not_null())?
821+
.count()
822+
.await?
823+
);
824+
825+
let df = df
826+
.with_column_renamed(r#""metadata.file_size_bytes""#, "file_size_bytes")?
827+
.with_column_renamed(r#""metadata.e_tag""#, "etag")?
828+
.with_column(
829+
"filename",
830+
split_part(col(r#""metadata.file_path""#), lit("/"), lit(-1)),
831+
)?
832+
.select_columns(&[
833+
"metadata_size_bytes",
834+
"filename",
835+
"file_size_bytes",
836+
"etag",
837+
])?
838+
.sort(vec![col("filename").sort(true, false)])?;
839+
let rbs = df.collect().await?;
840+
assert_snapshot!(batches_to_string(&rbs),@r"
841+
+---------------------+-----------+-----------------+------+
842+
| metadata_size_bytes | filename | file_size_bytes | etag |
843+
+---------------------+-----------+-----------------+------+
844+
| 212 | 0.parquet | 3645 | 0 |
845+
| 212 | 1.parquet | 3645 | 1 |
846+
+---------------------+-----------+-----------------+------+
847+
");
848+
849+
Ok(())
850+
}
851+
852+
#[tokio::test]
853+
async fn test_list_files_cache_not_set() -> Result<(), DataFusionError> {
854+
let rt = RuntimeEnvBuilder::new()
855+
.with_cache_manager(CacheManagerConfig::default().with_list_files_cache(None))
856+
.build_arc()
857+
.unwrap();
858+
859+
let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
860+
861+
ctx.register_udtf(
862+
"list_files_cache",
863+
Arc::new(ListFilesCacheFunc::new(
864+
ctx.task_ctx().runtime_env().cache_manager.clone(),
865+
)),
866+
);
867+
868+
let rbs = ctx
869+
.sql("SELECT * FROM list_files_cache()")
870+
.await?
871+
.collect()
872+
.await?;
873+
assert_snapshot!(batches_to_string(&rbs),@r"
874+
+------+---------------------+------------+---------------+
875+
| path | metadata_size_bytes | expires_in | metadata_list |
876+
+------+---------------------+------------+---------------+
877+
+------+---------------------+------------+---------------+
878+
");
879+
880+
Ok(())
881+
}
744882
}

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use crate::cache::cache_unit::DefaultFilesMetadataCache;
19+
use crate::cache::list_files_cache::ListFilesEntry;
1920
use crate::cache::{CacheAccessor, DefaultListFilesCache};
2021
use datafusion_common::stats::Precision;
2122
use datafusion_common::{Result, Statistics};
@@ -93,6 +94,9 @@ pub trait ListFilesCache:
9394

9495
/// Updates the cache with a new TTL (time-to-live).
9596
fn update_cache_ttl(&self, ttl: Option<Duration>);
97+
98+
/// Retrieves the information about the entries currently cached.
99+
fn list_entries(&self) -> HashMap<Path, ListFilesEntry>;
96100
}
97101

98102
/// Generic file-embedded metadata used with [`FileMetadataCache`].

0 commit comments

Comments
 (0)