From 41e7ed67c221219ec58d4002ed8e02fec503c463 Mon Sep 17 00:00:00 2001 From: Jared Combs <9115033+jared-m-combs@users.noreply.github.com> Date: Tue, 17 Jun 2025 01:57:39 -0400 Subject: [PATCH] Allow customization of `list_all_files` function. We need to be able to support custom file listing logic (e.g. versioning). Introduced a new trait to allow the injection of custom logic. --- src/materialized/file_metadata.rs | 150 +++++++++++++++++++--------- tests/materialized_listing_table.rs | 7 +- 2 files changed, 110 insertions(+), 47 deletions(-) diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs index 02fcac1..d5a5c8e 100644 --- a/src/materialized/file_metadata.rs +++ b/src/materialized/file_metadata.rs @@ -53,12 +53,16 @@ use crate::materialized::cast_to_listing_table; pub struct FileMetadata { table_schema: SchemaRef, catalog_list: Arc, + metadata_provider: Arc, } impl FileMetadata { /// Construct a new [`FileMetadata`] table provider that lists files for all /// tables in the provided catalog list. - pub fn new(catalog_list: Arc) -> Self { + pub fn new( + catalog_list: Arc, + metadata_provider: Arc, + ) -> Self { Self { table_schema: Arc::new(Schema::new(vec![ Field::new("table_catalog", DataType::Utf8, false), @@ -73,6 +77,7 @@ impl FileMetadata { Field::new("size", DataType::UInt64, false), ])), catalog_list, + metadata_provider, } } } @@ -114,6 +119,7 @@ impl TableProvider for FileMetadata { filters, limit, self.catalog_list.clone(), + self.metadata_provider.clone(), )?; Ok(Arc::new(exec)) @@ -136,6 +142,7 @@ pub struct FileMetadataExec { limit: Option, metrics: ExecutionPlanMetricsSet, catalog_list: Arc, + metadata_provider: Arc, } impl FileMetadataExec { @@ -145,6 +152,7 @@ impl FileMetadataExec { filters: Vec>, limit: Option, catalog_list: Arc, + metadata_provider: Arc, ) -> Result { let projected_schema = match projection.as_ref() { Some(projection) => Arc::new(table_schema.project(projection)?), @@ -167,6 +175,7 @@ impl FileMetadataExec { limit, metrics: ExecutionPlanMetricsSet::new(), catalog_list, + metadata_provider, }; Ok(exec) @@ -319,6 +328,7 @@ impl FileMetadataExec { let table_schema = self.table_schema.clone(); let catalog_list = self.catalog_list.clone(); + let metadata_provider = self.metadata_provider.clone(); let record_batch = async move { // If we cannot determine the catalog, build from the entire catalog list. @@ -328,6 +338,7 @@ impl FileMetadataExec { debug!("No catalog filter exists, returning entire catalog list."); return FileMetadataBuilder::build_from_catalog_list( catalog_list, + metadata_provider, table_schema, context, ) @@ -352,6 +363,7 @@ impl FileMetadataExec { return FileMetadataBuilder::build_from_catalog( &catalog_name, catalog_provider, + metadata_provider, table_schema, context, ) @@ -379,6 +391,7 @@ impl FileMetadataExec { &catalog_name, &schema_name, schema_provider, + metadata_provider, table_schema, context, ) @@ -402,6 +415,7 @@ impl FileMetadataExec { &schema_name, &table_name, table_provider, + metadata_provider, table_schema, context, ) @@ -448,6 +462,7 @@ impl DisplayAs for FileMetadataExec { struct FileMetadataBuilder { schema: SchemaRef, + metadata_provider: Arc, catalog_names: StringBuilder, schema_names: StringBuilder, table_names: StringBuilder, @@ -457,9 +472,10 @@ struct FileMetadataBuilder { } impl FileMetadataBuilder { - fn new(schema: SchemaRef) -> Self { + fn new(schema: SchemaRef, metadata_provider: Arc) -> Self { Self { schema, + metadata_provider, catalog_names: StringBuilder::new(), schema_names: StringBuilder::new(), table_names: StringBuilder::new(), @@ -471,6 +487,7 @@ impl FileMetadataBuilder { async fn build_from_catalog_list( catalog_list: Arc, + metadata_provider: Arc, schema: SchemaRef, context: Arc, ) -> Result> { @@ -481,11 +498,19 @@ impl FileMetadataBuilder { Some(catalog_provider) => catalog_provider, None => continue, }; + let metadata_provider = metadata_provider.clone(); let schema = schema.clone(); let context = context.clone(); tasks.push(async move { - Self::build_from_catalog(&catalog_name, catalog_provider, schema, context).await + Self::build_from_catalog( + &catalog_name, + catalog_provider, + metadata_provider, + schema, + context, + ) + .await }); } @@ -504,6 +529,7 @@ impl FileMetadataBuilder { async fn build_from_catalog( catalog_name: &str, catalog_provider: Arc, + metadata_provider: Arc, schema: SchemaRef, context: Arc, ) -> Result> { @@ -514,6 +540,7 @@ impl FileMetadataBuilder { Some(schema_provider) => schema_provider, None => continue, }; + let metadata_provider = metadata_provider.clone(); let schema = schema.clone(); let context = context.clone(); @@ -522,6 +549,7 @@ impl FileMetadataBuilder { catalog_name, &schema_name, schema_provider, + metadata_provider, schema, context, ) @@ -545,6 +573,7 @@ impl FileMetadataBuilder { catalog_name: &str, schema_name: &str, schema_provider: Arc, + metadata_provider: Arc, schema: SchemaRef, context: Arc, ) -> Result> { @@ -555,6 +584,7 @@ impl FileMetadataBuilder { Some(table_provider) => table_provider, None => continue, }; + let metadata_provider = metadata_provider.clone(); let schema = schema.clone(); let context = context.clone(); @@ -564,6 +594,7 @@ impl FileMetadataBuilder { schema_name, &table_name, table_provider, + metadata_provider, schema, context, ) @@ -587,10 +618,11 @@ impl FileMetadataBuilder { schema_name: &str, table_name: &str, table_provider: Arc, + metadata_provider: Arc, schema: SchemaRef, context: Arc, ) -> Result> { - let mut builder = Self::new(schema.clone()); + let mut builder = Self::new(schema.clone(), metadata_provider.clone()); let listing_table_like = match cast_to_listing_table(table_provider.as_ref()) { None => return Ok(None), @@ -628,17 +660,19 @@ impl FileMetadataBuilder { let store_url = table_path.object_store(); let store = context.runtime_env().object_store(table_path)?; - let mut file_stream = list_all_files( - store.as_ref(), - table_path, - file_ext, - context - .session_config() - .options() - .execution - .listing_table_ignore_subdirectory, - ) - .await; + let mut file_stream = self + .metadata_provider + .list_all_files( + store.clone(), + table_path.clone(), + file_ext.to_string(), + context + .session_config() + .options() + .execution + .listing_table_ignore_subdirectory, + ) + .await; while let Some(file_meta) = file_stream.try_next().await? { self.append( @@ -687,38 +721,61 @@ impl FileMetadataBuilder { } } -// Mostly copied from ListingTableUrl::list_all_files, which is private to that crate -// Modified to handle empty tables -async fn list_all_files<'a>( - store: &'a dyn ObjectStore, - url: &'a ListingTableUrl, - file_extension: &'a str, - ignore_subdirectory: bool, -) -> BoxStream<'a, Result> { - // Check if the directory exists yet - if let Err(object_store::Error::NotFound { path, .. }) = - store.list_with_delimiter(Some(url.prefix())).await - { - debug!( +/// Provides [`ObjectMetadata`] data to the [`FileMetadata`] table provider. +#[async_trait] +pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync { + /// List all files in the store for the given `url` prefix. + async fn list_all_files( + &self, + store: Arc, + url: ListingTableUrl, + file_extension: String, + ignore_subdirectory: bool, + ) -> BoxStream<'static, Result>; +} + +/// Default implementation of the [`FileMetadataProvider`]. +#[derive(Debug)] +pub struct DefaultFileMetadataProvider; + +#[async_trait] +impl FileMetadataProvider for DefaultFileMetadataProvider { + // Mostly copied from ListingTableUrl::list_all_files, which is private to that crate + // Modified to handle empty tables + async fn list_all_files( + &self, + store: Arc, + url: ListingTableUrl, + file_extension: String, + ignore_subdirectory: bool, + ) -> BoxStream<'static, Result> { + // Check if the directory exists yet + if let Err(object_store::Error::NotFound { path, .. }) = + store.list_with_delimiter(Some(url.prefix())).await + { + debug!( "attempted to list empty table at {path} during file_metadata listing, returning empty list" ); - return Box::pin(stream::empty()); - } + return Box::pin(stream::empty()); + } - let is_dir = url.as_str().ends_with('/'); - let list = match is_dir { - true => store.list(Some(url.prefix())), - false => futures::stream::once(store.head(url.prefix())).boxed(), - }; + let is_dir = url.as_str().ends_with('/'); + let prefix = url.prefix().clone(); + + let list = match is_dir { + true => store.list(Some(&prefix)), + false => futures::stream::once(async move { store.head(&prefix).await }).boxed(), + }; - list.map_err(Into::into) - .try_filter(move |meta| { - let path = &meta.location; - let extension_match = path.as_ref().ends_with(file_extension); - let glob_match = url.contains(path, ignore_subdirectory); - futures::future::ready(extension_match && glob_match) - }) - .boxed() + list.map_err(Into::into) + .try_filter(move |meta| { + let path = &meta.location; + let extension_match = path.as_ref().ends_with(&file_extension); + let glob_match = url.contains(path, ignore_subdirectory); + futures::future::ready(extension_match && glob_match) + }) + .boxed() + } } #[cfg(test)] @@ -739,7 +796,7 @@ mod test { use tempfile::TempDir; use url::Url; - use super::FileMetadata; + use super::{DefaultFileMetadataProvider, FileMetadata}; struct TestContext { _dir: TempDir, @@ -862,7 +919,10 @@ mod test { ctx.register_table( "file_metadata", - Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list()))), + Arc::new(FileMetadata::new( + Arc::clone(ctx.state().catalog_list()), + Arc::new(DefaultFileMetadataProvider), + )), ) .context("register file metadata table")?; diff --git a/tests/materialized_listing_table.rs b/tests/materialized_listing_table.rs index 6ab2bf1..5ad9d25 100644 --- a/tests/materialized_listing_table.rs +++ b/tests/materialized_listing_table.rs @@ -39,7 +39,7 @@ use datafusion_expr::{ }; use datafusion_materialized_views::materialized::{ dependencies::{mv_dependencies, stale_files}, - file_metadata::FileMetadata, + file_metadata::{DefaultFileMetadataProvider, FileMetadata}, register_materialized, row_metadata::RowMetadataRegistry, ListingTableLike, Materialized, @@ -146,7 +146,10 @@ async fn setup() -> Result { // Now register the `mv_dependencies` and `stale_files` UDTFs // They have `FileMetadata` and `RowMetadataRegistry` as dependencies. - let file_metadata = Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list()))); + let file_metadata = Arc::new(FileMetadata::new( + Arc::clone(ctx.state().catalog_list()), + Arc::new(DefaultFileMetadataProvider), + )); let row_metadata_registry = Arc::new(RowMetadataRegistry::new(Arc::clone(&file_metadata))); ctx.register_udtf(