diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index a45d57e8e952d..aa83fec1118ed 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -17,13 +17,18 @@
 //! Functions that are query-able and searchable via the `\h` command
 
+use datafusion_common::instant::Instant;
 use std::fmt;
 use std::fs::File;
 use std::str::FromStr;
 use std::sync::Arc;
 
-use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
+use arrow::array::{
+    DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray,
+    TimestampMillisecondArray, UInt64Array,
+};
+use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
 use datafusion::catalog::{Session, TableFunctionImpl};
@@ -697,3 +702,158 @@ impl TableFunctionImpl for StatisticsCacheFunc {
         Ok(Arc::new(statistics_cache))
     }
 }
+
+#[derive(Debug)]
+struct ListFilesCacheTable {
+    schema: SchemaRef,
+    batch: RecordBatch,
+}
+
+#[async_trait]
+impl TableProvider for ListFilesCacheTable {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> arrow::datatypes::SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> datafusion::logical_expr::TableType {
+        datafusion::logical_expr::TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![self.batch.clone()]],
+            TableProvider::schema(self),
+            projection.cloned(),
+        )?)
+    }
+}
+
+#[derive(Debug)]
+pub struct ListFilesCacheFunc {
+    cache_manager: Arc<CacheManager>,
+}
+
+impl ListFilesCacheFunc {
+    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
+        Self { cache_manager }
+    }
+}
+
+impl TableFunctionImpl for ListFilesCacheFunc {
+    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+        if !exprs.is_empty() {
+            return plan_err!("list_files_cache should have no arguments");
+        }
+
+        let nested_fields = Fields::from(vec![
+            Field::new("file_path", DataType::Utf8, false),
+            Field::new(
+                "file_modified",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new("file_size_bytes", DataType::UInt64, false),
+            Field::new("e_tag", DataType::Utf8, true),
+            Field::new("version", DataType::Utf8, true),
+        ]);
+
+        let metadata_field =
+            Field::new("metadata", DataType::Struct(nested_fields.clone()), true);
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("path", DataType::Utf8, false),
+            Field::new("metadata_size_bytes", DataType::UInt64, false),
+            // The `expires` field in ListFilesEntry is an Instant when set, from which we cannot recover a wall-clock time, hence `expires_in` uses Duration rather than Timestamp as its data type.
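+            // Note: Instant is a monotonic clock reading, not a wall-clock timestamp,
+            // so the remaining lifetime is computed relative to "now" at query time.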
+            Field::new(
+                "expires_in",
+                DataType::Duration(TimeUnit::Millisecond),
+                true,
+            ),
+            Field::new(
+                "metadata_list",
+                DataType::List(Arc::new(metadata_field.clone())),
+                true,
+            ),
+        ]));
+
+        let mut path_arr = vec![];
+        let mut metadata_size_bytes_arr = vec![];
+        let mut expires_arr = vec![];
+
+        let mut file_path_arr = vec![];
+        let mut file_modified_arr = vec![];
+        let mut file_size_bytes_arr = vec![];
+        let mut etag_arr = vec![];
+        let mut version_arr = vec![];
+        // list offsets: one window into the flattened per-file arrays per cached path
+        let mut offsets: Vec<i32> = vec![0];
+
+        if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() {
+            let now = Instant::now();
+            let mut current_offset: i32 = 0;
+
+            for (path, entry) in list_files_cache.list_entries() {
+                path_arr.push(path.to_string());
+                metadata_size_bytes_arr.push(entry.size_bytes as u64);
+                // calculate the time left before the entry expires
+                expires_arr.push(
+                    entry
+                        .expires
+                        .map(|t| t.duration_since(now).as_millis() as i64),
+                );
+
+                for meta in entry.metas.iter() {
+                    file_path_arr.push(meta.location.to_string());
+                    file_modified_arr.push(meta.last_modified.timestamp_millis());
+                    file_size_bytes_arr.push(meta.size);
+                    etag_arr.push(meta.e_tag.clone());
+                    version_arr.push(meta.version.clone());
+                }
+                current_offset += entry.metas.len() as i32;
+                offsets.push(current_offset);
+            }
+        }
+
+        let struct_arr = StructArray::new(
+            nested_fields,
+            vec![
+                Arc::new(StringArray::from(file_path_arr)),
+                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
+                Arc::new(UInt64Array::from(file_size_bytes_arr)),
+                Arc::new(StringArray::from(etag_arr)),
+                Arc::new(StringArray::from(version_arr)),
+            ],
+            None,
+        );
+
+        let offsets_buffer: OffsetBuffer<i32> =
+            OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets)));
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(StringArray::from(path_arr)),
+                Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
+                Arc::new(DurationMillisecondArray::from(expires_arr)),
+                Arc::new(GenericListArray::new(
+                    Arc::new(metadata_field),
+                    offsets_buffer,
+                    Arc::new(struct_arr),
+                    None,
+                )),
+            ],
+        )?;
+
+        let list_files_cache = ListFilesCacheTable { schema, batch };
+        Ok(Arc::new(list_files_cache))
+    }
+}
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 8f69ae477904c..46d88152fac12 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -32,7 +32,7 @@ use datafusion::logical_expr::ExplainFormat;
 use datafusion::prelude::SessionContext;
 use datafusion_cli::catalog::DynamicObjectStoreCatalog;
 use datafusion_cli::functions::{
-    MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
+    ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
 };
 use datafusion_cli::object_storage::instrumented::{
     InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
@@ -253,6 +253,13 @@ async fn main_inner() -> Result<()> {
         )),
     );
 
+    ctx.register_udtf(
+        "list_files_cache",
+        Arc::new(ListFilesCacheFunc::new(
+            ctx.task_ctx().runtime_env().cache_manager.clone(),
+        )),
+    );
+
     let mut print_options = PrintOptions {
         format: args.format,
         quiet: args.quiet,
@@ -431,15 +438,20 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
 #[cfg(test)]
 mod tests {
+    use std::time::Duration;
+
     use super::*;
     use datafusion::{
         common::test_util::batches_to_string,
         execution::cache::{
-            cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache,
+            DefaultListFilesCache, cache_manager::CacheManagerConfig,
+            cache_unit::DefaultFileStatisticsCache,
         },
-        prelude::ParquetReadOptions,
+        prelude::{ParquetReadOptions, col, lit, split_part},
     };
     use insta::assert_snapshot;
+    use object_store::memory::InMemory;
+    use url::Url;
 
     fn assert_conversion(input: &str, expected: Result<usize, String>) {
         let result = extract_memory_pool_size(input);
@@ -741,4 +753,130 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_list_files_cache() -> Result<(), DataFusionError> {
+        let list_files_cache = Arc::new(DefaultListFilesCache::new(
+            1024,
+            Some(Duration::from_secs(1)),
+        ));
+
+        let rt = RuntimeEnvBuilder::new()
+            .with_cache_manager(
+                CacheManagerConfig::default()
+                    .with_list_files_cache(Some(list_files_cache)),
+            )
+            .build_arc()
+            .unwrap();
+
+        let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
+
+        ctx.register_object_store(
+            &Url::parse("mem://test_table").unwrap(),
+            Arc::new(InMemory::new()),
+        );
+
+        ctx.register_udtf(
+            "list_files_cache",
+            Arc::new(ListFilesCacheFunc::new(
+                ctx.task_ctx().runtime_env().cache_manager.clone(),
+            )),
+        );
+
+        ctx.sql(
+            "CREATE EXTERNAL TABLE src_table
+            STORED AS PARQUET
+            LOCATION '../parquet-testing/data/alltypes_plain.parquet'",
+        )
+        .await?
+        .collect()
+        .await?;
+
+        ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/0.parquet' STORED AS PARQUET").await?.collect().await?;
+
+        ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/1.parquet' STORED AS PARQUET").await?.collect().await?;
+
+        ctx.sql(
+            "CREATE EXTERNAL TABLE test_table
+            STORED AS PARQUET
+            LOCATION 'mem://test_table/'
+            ",
+        )
+        .await?
+        .collect()
+        .await?;
+
+        let sql = "SELECT metadata_size_bytes, expires_in, metadata_list FROM list_files_cache()";
+        let df = ctx
+            .sql(sql)
+            .await?
+            .unnest_columns(&["metadata_list"])?
+            .with_column_renamed("metadata_list", "metadata")?
+            .unnest_columns(&["metadata"])?;
+
+        assert_eq!(
+            2,
+            df.clone()
+                .filter(col("expires_in").is_not_null())?
+                .count()
+                .await?
+        );
+
+        let df = df
+            .with_column_renamed(r#""metadata.file_size_bytes""#, "file_size_bytes")?
+            .with_column_renamed(r#""metadata.e_tag""#, "etag")?
+            .with_column(
+                "filename",
+                split_part(col(r#""metadata.file_path""#), lit("/"), lit(-1)),
+            )?
+            .select_columns(&[
+                "metadata_size_bytes",
+                "filename",
+                "file_size_bytes",
+                "etag",
+            ])?
+            .sort(vec![col("filename").sort(true, false)])?;
+        let rbs = df.collect().await?;
+        assert_snapshot!(batches_to_string(&rbs),@r"
+        +---------------------+-----------+-----------------+------+
+        | metadata_size_bytes | filename  | file_size_bytes | etag |
+        +---------------------+-----------+-----------------+------+
+        | 212                 | 0.parquet | 3645            | 0    |
+        | 212                 | 1.parquet | 3645            | 1    |
+        +---------------------+-----------+-----------------+------+
+        ");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_list_files_cache_not_set() -> Result<(), DataFusionError> {
+        let rt = RuntimeEnvBuilder::new()
+            .with_cache_manager(CacheManagerConfig::default().with_list_files_cache(None))
+            .build_arc()
+            .unwrap();
+
+        let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
+
+        ctx.register_udtf(
+            "list_files_cache",
+            Arc::new(ListFilesCacheFunc::new(
+                ctx.task_ctx().runtime_env().cache_manager.clone(),
+            )),
+        );
+
+        let rbs = ctx
+            .sql("SELECT * FROM list_files_cache()")
+            .await?
+            .collect()
+            .await?;
+        assert_snapshot!(batches_to_string(&rbs),@r"
+        +------+---------------------+------------+---------------+
+        | path | metadata_size_bytes | expires_in | metadata_list |
+        +------+---------------------+------------+---------------+
+        +------+---------------------+------------+---------------+
+        ");
+
+        Ok(())
+    }
 }
diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs
index c76a68c651eb0..31a2323524dd4 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::cache::cache_unit::DefaultFilesMetadataCache;
+use crate::cache::list_files_cache::ListFilesEntry;
 use crate::cache::{CacheAccessor, DefaultListFilesCache};
 use datafusion_common::stats::Precision;
 use datafusion_common::{Result, Statistics};
@@ -93,6 +94,9 @@ pub trait ListFilesCache:
 
     /// Updates the cache with a new TTL (time-to-live).
     fn update_cache_ttl(&self, ttl: Option<Duration>);
+
+    /// Retrieves information about the entries currently cached.
+    fn list_entries(&self) -> HashMap<Path, ListFilesEntry>;
 }
 
 /// Generic file-embedded metadata used with [`FileMetadataCache`].
diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs
index 661bc47b5468a..c4a92c49478d7 100644
--- a/datafusion/execution/src/cache/list_files_cache.rs
+++ b/datafusion/execution/src/cache/list_files_cache.rs
@@ -17,6 +17,7 @@
 
 use std::mem::size_of;
 use std::{
+    collections::HashMap,
     sync::{Arc, Mutex},
     time::Duration,
 };
@@ -103,10 +104,11 @@ impl DefaultListFilesCache {
     }
 }
 
-struct ListFilesEntry {
-    metas: Arc<Vec<ObjectMeta>>,
-    size_bytes: usize,
-    expires: Option<Instant>,
+#[derive(Clone, PartialEq, Debug)]
+pub struct ListFilesEntry {
+    pub metas: Arc<Vec<ObjectMeta>>,
+    pub size_bytes: usize,
+    pub expires: Option<Instant>,
 }
 
 impl ListFilesEntry {
@@ -347,6 +349,15 @@ impl ListFilesCache for DefaultListFilesCache {
         state.ttl = ttl;
         state.evict_entries();
     }
+
+    fn list_entries(&self) -> HashMap<Path, ListFilesEntry> {
+        let state = self.state.lock().unwrap();
+        let mut entries = HashMap::<Path, ListFilesEntry>::new();
+        for (path, entry) in state.lru_queue.list_entries() {
+            entries.insert(path.clone(), entry.clone());
+        }
+        entries
+    }
 }
 
 impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
@@ -431,7 +442,6 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
 mod tests {
     use super::*;
     use chrono::DateTime;
-    use std::thread;
 
     struct MockTimeProvider {
         base: Instant,
@@ -525,12 +535,35 @@
         assert_eq!(cache.len(), 0);
 
         // Put multiple entries
-        let (path1, value1, _) = create_test_list_files_entry("path1", 2, 50);
-        let (path2, value2, _) = create_test_list_files_entry("path2", 3, 50);
-        cache.put(&path1, value1);
-        cache.put(&path2, value2);
+        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
+        let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50);
+        cache.put(&path1, Arc::clone(&value1));
+        cache.put(&path2, Arc::clone(&value2));
         assert_eq!(cache.len(), 2);
 
+        // List cache entries
+        assert_eq!(
+            cache.list_entries(),
+            HashMap::from([
+                (
+                    path1.clone(),
+                    ListFilesEntry {
+                        metas: value1,
+                        size_bytes: size1,
+                        expires: None,
+                    }
+                ),
+                (
+                    path2.clone(),
+                    ListFilesEntry {
+                        metas: value2,
+                        size_bytes: size2,
+                        expires: None,
+                    }
+                )
+            ])
+        );
+
         // Clear all entries
         cache.clear();
         assert_eq!(cache.len(), 0);
@@ -673,14 +706,14 @@
     #[test]
     fn test_entry_update_with_size_change() {
         let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
-        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
+        let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100);
 
         let cache = DefaultListFilesCache::new(size * 3, None);
 
         // Add three entries
         cache.put(&path1, value1);
-        cache.put(&path2, value2);
+        cache.put(&path2, Arc::clone(&value2));
         cache.put(&path3, value3_v1);
         assert_eq!(cache.len(), 3);
 
@@ -694,35 +727,77 @@
         assert!(cache.contains_key(&path3));
 
         // Update path3 with larger size that requires evicting path1 (LRU)
-        let (_, value3_v3, _) = create_test_list_files_entry("path3", 1, 200);
-        cache.put(&path3, value3_v3);
+        let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200);
+        cache.put(&path3, Arc::clone(&value3_v3));
 
         assert_eq!(cache.len(), 2);
-        assert!(!cache.contains_key(&path1)); // Evicted (was LRU)
-        assert!(cache.contains_key(&path2));
-        assert!(cache.contains_key(&path3));
+        assert!(!cache.contains_key(&path1));
+
+        // List cache entries
+        assert_eq!(
+            cache.list_entries(),
+            HashMap::from([
+                (
+                    path2,
+                    ListFilesEntry {
+                        metas: value2,
+                        size_bytes: size2,
+                        expires: None,
+                    }
+                ),
+                (
+                    path3,
+                    ListFilesEntry {
+                        metas: value3_v3,
+                        size_bytes: size3_v3,
+                        expires: None,
+                    }
+                )
+            ])
+        );
     }
 
     #[test]
     fn test_cache_with_ttl() {
         let ttl = Duration::from_millis(100);
-        let cache = DefaultListFilesCache::new(10000, Some(ttl));
 
-        let (path1, value1, _) = create_test_list_files_entry("path1", 2, 50);
-        let (path2, value2, _) = create_test_list_files_entry("path2", 2, 50);
+        let mock_time = Arc::new(MockTimeProvider::new());
+        let cache = DefaultListFilesCache::new(10000, Some(ttl))
+            .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
 
-        cache.put(&path1, value1);
-        cache.put(&path2, value2);
+        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
+        let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);
+
+        cache.put(&path1, Arc::clone(&value1));
+        cache.put(&path2, Arc::clone(&value2));
 
         // Entries should be accessible immediately
         assert!(cache.get(&path1).is_some());
         assert!(cache.get(&path2).is_some());
-        assert!(cache.contains_key(&path1));
-        assert!(cache.contains_key(&path2));
-        assert_eq!(cache.len(), 2);
-
+        // List cache entries
+        assert_eq!(
+            cache.list_entries(),
+            HashMap::from([
+                (
+                    path1.clone(),
+                    ListFilesEntry {
+                        metas: value1,
+                        size_bytes: size1,
+                        expires: mock_time.now().checked_add(ttl),
+                    }
+                ),
+                (
+                    path2.clone(),
+                    ListFilesEntry {
+                        metas: value2,
+                        size_bytes: size2,
+                        expires: mock_time.now().checked_add(ttl),
+                    }
+                )
+            ])
+        );
         // Wait for TTL to expire
-        thread::sleep(Duration::from_millis(150));
+        mock_time.inc(Duration::from_millis(150));
 
         // Entries should now return None and be removed when observed through get or contains_key
         assert!(cache.get(&path1).is_none());
diff --git a/docs/source/user-guide/cli/functions.md b/docs/source/user-guide/cli/functions.md
index f3b0163534c41..11f61297ac8df 100644
--- a/docs/source/user-guide/cli/functions.md
+++ b/docs/source/user-guide/cli/functions.md
@@ -170,5 +170,43 @@ The columns of the returned table are:
 | table_size_bytes      | Utf8      | Size of the table, in bytes             |
 | statistics_size_bytes | UInt64    | Size of the cached statistics in memory |
 
+## `list_files_cache`
+
+The `list_files_cache` function shows information about the `ListFilesCache` that is used by the [`ListingTable`] implementation in DataFusion. When creating a [`ListingTable`], DataFusion lists the files in the table's location and caches the results in the `ListFilesCache`. Subsequent queries against the same table can reuse this cached information instead of re-listing the files.
+
+You can inspect the cache by querying the `list_files_cache` function. For example,
+
+```sql
+> select split_part(path, '/', -1) as folder, metadata_size_bytes, expires_in, unnest(metadata_list)['file_size_bytes'] as file_size_bytes, unnest(metadata_list)['e_tag'] as e_tag from list_files_cache();
++----------+---------------------+-----------------------------------+-----------------+-------------------------------+
+| folder   | metadata_size_bytes | expires_in                        | file_size_bytes | e_tag                         |
++----------+---------------------+-----------------------------------+-----------------+-------------------------------+
+| customer | 1592                | 0 days 0 hours 0 mins 18.488 secs | 1233969         | 7041136-643a7bfeeec9b-12d431  |
+| customer | 1592                | 0 days 0 hours 0 mins 18.488 secs | 1234756         | 7041137-643a7bfeef2df-12d744  |
+| customer | 1592                | 0 days 0 hours 0 mins 18.488 secs | 1232554         | 7041139-643a7bfeef86a-12ceaa  |
+| customer | 1592                | 0 days 0 hours 0 mins 18.488 secs | 1238676         | 704113a-643a7bfeef914-12e694  |
+| customer | 1592                | 0 days 0 hours 0 mins 18.488 secs | 1232186         | 704113b-643a7bfeefb22-12cd3a  |
+| customer | 1592                | 0 days 0 hours 0 mins 18.488 secs | 1237506         | 7041138-643a7bfeef775-12e202  |
+| customer | 1592                | 0 days 0 hours 0 mins 18.488 secs | 1228756         | 7041134-643a7bfeec2d8-12bfd4  |
+| customer | 1592                | 0 days 0 hours 0 mins 18.488 secs | 1228509         | 7041135-643a7bfeed599-12bedd  |
+| lineitem | 1600                | 0 days 0 hours 0 mins 16.758 secs | 20124715        | 704114a-643a7c00bb560-133142b |
+| lineitem | 1600                | 0 days 0 hours 0 mins 16.758 secs | 20131024        | 7041149-643a7c00b90b7-1332cd0 |
+| lineitem | 1600                | 0 days 0 hours 0 mins 16.758 secs | 20179217        | 704114b-643a7c00bb93e-133e911 |
+| lineitem | 1600                | 0 days 0 hours 0 mins 16.758 secs | 20296819        | 704114f-643a7c00ccefd-135b473 |
+| lineitem | 1600                | 0 days 0 hours 0 mins 16.758 secs | 20110730        | 7041148-643a7c00b9832-132dd8a |
+| lineitem | 1600                | 0 days 0 hours 0 mins 16.758 secs | 20128346        | 704114c-643a7c00bc00a-133225a |
+| lineitem | 1600                | 0 days 0 hours 0 mins 16.758 secs | 20130133        | 7041147-643a7c00b3901-1332955 |
+| lineitem | 1600                | 0 days 0 hours 0 mins 16.758 secs | 20139830        | 7041146-643a7c00abbe8-1334f36 |
++----------+---------------------+-----------------------------------+-----------------+-------------------------------+
+```
+
+The columns of the returned table are:
+| column_name         | data_type    | Description                                                                   |
+| ------------------- | ------------ | ----------------------------------------------------------------------------- |
+| path                | Utf8         | Listing path (table location) relative to the object store / filesystem root  |
+| metadata_size_bytes | UInt64       | Size of the cached file listing in memory                                      |
+| expires_in          | Duration(ms) | Time remaining until the entry expires; `NULL` if no TTL is set                |
+| metadata_list       | List(Struct) | List of metadata entries, one for each file under the path                     |
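+
+If the cache was created with a TTL, the `expires_in` column shows how much time each entry has left. For example, the following query (a sketch; it assumes a TTL is configured, so `expires_in` is non-NULL) lists the cached paths that will be re-listed soonest:
+
+```sql
+> select path, expires_in from list_files_cache() order by expires_in asc nulls last;
+```
+
 [`listingtable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
 [entity tag]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag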