Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 105 additions & 45 deletions src/materialized/file_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,16 @@ use crate::materialized::cast_to_listing_table;
pub struct FileMetadata {
/// Schema of the rows this table exposes; assembled once in `new` and
/// includes columns such as `table_catalog` and `size`.
table_schema: SchemaRef,
/// Catalog list whose tables have their files listed by this provider.
catalog_list: Arc<dyn CatalogProviderList>,
/// Pluggable file lister; cloned into the execution plan built by this
/// provider and ultimately asked to `list_all_files` for each table path.
metadata_provider: Arc<dyn FileMetadataProvider>,
}

impl FileMetadata {
/// Construct a new [`FileMetadata`] table provider that lists files for all
/// tables in the provided catalog list.
pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
pub fn new(
catalog_list: Arc<dyn CatalogProviderList>,
metadata_provider: Arc<dyn FileMetadataProvider>,
) -> Self {
Self {
table_schema: Arc::new(Schema::new(vec![
Field::new("table_catalog", DataType::Utf8, false),
Expand All @@ -73,6 +77,7 @@ impl FileMetadata {
Field::new("size", DataType::UInt64, false),
])),
catalog_list,
metadata_provider,
}
}
}
Expand Down Expand Up @@ -114,6 +119,7 @@ impl TableProvider for FileMetadata {
filters,
limit,
self.catalog_list.clone(),
self.metadata_provider.clone(),
)?;

Ok(Arc::new(exec))
Expand All @@ -136,6 +142,7 @@ pub struct FileMetadataExec {
limit: Option<usize>,
metrics: ExecutionPlanMetricsSet,
catalog_list: Arc<dyn CatalogProviderList>,
metadata_provider: Arc<dyn FileMetadataProvider>,
}

impl FileMetadataExec {
Expand All @@ -145,6 +152,7 @@ impl FileMetadataExec {
filters: Vec<Arc<dyn PhysicalExpr>>,
limit: Option<usize>,
catalog_list: Arc<dyn CatalogProviderList>,
metadata_provider: Arc<dyn FileMetadataProvider>,
) -> Result<Self> {
let projected_schema = match projection.as_ref() {
Some(projection) => Arc::new(table_schema.project(projection)?),
Expand All @@ -167,6 +175,7 @@ impl FileMetadataExec {
limit,
metrics: ExecutionPlanMetricsSet::new(),
catalog_list,
metadata_provider,
};

Ok(exec)
Expand Down Expand Up @@ -319,6 +328,7 @@ impl FileMetadataExec {

let table_schema = self.table_schema.clone();
let catalog_list = self.catalog_list.clone();
let metadata_provider = self.metadata_provider.clone();

let record_batch = async move {
// If we cannot determine the catalog, build from the entire catalog list.
Expand All @@ -328,6 +338,7 @@ impl FileMetadataExec {
debug!("No catalog filter exists, returning entire catalog list.");
return FileMetadataBuilder::build_from_catalog_list(
catalog_list,
metadata_provider,
table_schema,
context,
)
Expand All @@ -352,6 +363,7 @@ impl FileMetadataExec {
return FileMetadataBuilder::build_from_catalog(
&catalog_name,
catalog_provider,
metadata_provider,
table_schema,
context,
)
Expand Down Expand Up @@ -379,6 +391,7 @@ impl FileMetadataExec {
&catalog_name,
&schema_name,
schema_provider,
metadata_provider,
table_schema,
context,
)
Expand All @@ -402,6 +415,7 @@ impl FileMetadataExec {
&schema_name,
&table_name,
table_provider,
metadata_provider,
table_schema,
context,
)
Expand Down Expand Up @@ -448,6 +462,7 @@ impl DisplayAs for FileMetadataExec {

struct FileMetadataBuilder {
schema: SchemaRef,
metadata_provider: Arc<dyn FileMetadataProvider>,
catalog_names: StringBuilder,
schema_names: StringBuilder,
table_names: StringBuilder,
Expand All @@ -457,9 +472,10 @@ struct FileMetadataBuilder {
}

impl FileMetadataBuilder {
fn new(schema: SchemaRef) -> Self {
fn new(schema: SchemaRef, metadata_provider: Arc<dyn FileMetadataProvider>) -> Self {
Self {
schema,
metadata_provider,
catalog_names: StringBuilder::new(),
schema_names: StringBuilder::new(),
table_names: StringBuilder::new(),
Expand All @@ -471,6 +487,7 @@ impl FileMetadataBuilder {

async fn build_from_catalog_list(
catalog_list: Arc<dyn CatalogProviderList>,
metadata_provider: Arc<dyn FileMetadataProvider>,
schema: SchemaRef,
context: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
Expand All @@ -481,11 +498,19 @@ impl FileMetadataBuilder {
Some(catalog_provider) => catalog_provider,
None => continue,
};
let metadata_provider = metadata_provider.clone();
let schema = schema.clone();
let context = context.clone();

tasks.push(async move {
Self::build_from_catalog(&catalog_name, catalog_provider, schema, context).await
Self::build_from_catalog(
&catalog_name,
catalog_provider,
metadata_provider,
schema,
context,
)
.await
});
}

Expand All @@ -504,6 +529,7 @@ impl FileMetadataBuilder {
async fn build_from_catalog(
catalog_name: &str,
catalog_provider: Arc<dyn CatalogProvider>,
metadata_provider: Arc<dyn FileMetadataProvider>,
schema: SchemaRef,
context: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
Expand All @@ -514,6 +540,7 @@ impl FileMetadataBuilder {
Some(schema_provider) => schema_provider,
None => continue,
};
let metadata_provider = metadata_provider.clone();
let schema = schema.clone();
let context = context.clone();

Expand All @@ -522,6 +549,7 @@ impl FileMetadataBuilder {
catalog_name,
&schema_name,
schema_provider,
metadata_provider,
schema,
context,
)
Expand All @@ -545,6 +573,7 @@ impl FileMetadataBuilder {
catalog_name: &str,
schema_name: &str,
schema_provider: Arc<dyn SchemaProvider>,
metadata_provider: Arc<dyn FileMetadataProvider>,
schema: SchemaRef,
context: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
Expand All @@ -555,6 +584,7 @@ impl FileMetadataBuilder {
Some(table_provider) => table_provider,
None => continue,
};
let metadata_provider = metadata_provider.clone();
let schema = schema.clone();
let context = context.clone();

Expand All @@ -564,6 +594,7 @@ impl FileMetadataBuilder {
schema_name,
&table_name,
table_provider,
metadata_provider,
schema,
context,
)
Expand All @@ -587,10 +618,11 @@ impl FileMetadataBuilder {
schema_name: &str,
table_name: &str,
table_provider: Arc<dyn TableProvider>,
metadata_provider: Arc<dyn FileMetadataProvider>,
schema: SchemaRef,
context: Arc<TaskContext>,
) -> Result<Option<RecordBatch>> {
let mut builder = Self::new(schema.clone());
let mut builder = Self::new(schema.clone(), metadata_provider.clone());

let listing_table_like = match cast_to_listing_table(table_provider.as_ref()) {
None => return Ok(None),
Expand Down Expand Up @@ -628,17 +660,19 @@ impl FileMetadataBuilder {
let store_url = table_path.object_store();
let store = context.runtime_env().object_store(table_path)?;

let mut file_stream = list_all_files(
store.as_ref(),
table_path,
file_ext,
context
.session_config()
.options()
.execution
.listing_table_ignore_subdirectory,
)
.await;
let mut file_stream = self
.metadata_provider
.list_all_files(
store.clone(),
table_path.clone(),
file_ext.to_string(),
context
.session_config()
.options()
.execution
.listing_table_ignore_subdirectory,
)
.await;

while let Some(file_meta) = file_stream.try_next().await? {
self.append(
Expand Down Expand Up @@ -687,38 +721,61 @@ impl FileMetadataBuilder {
}
}

// Mostly copied from ListingTableUrl::list_all_files, which is private to that crate
// Modified to handle empty tables
async fn list_all_files<'a>(
store: &'a dyn ObjectStore,
url: &'a ListingTableUrl,
file_extension: &'a str,
ignore_subdirectory: bool,
) -> BoxStream<'a, Result<ObjectMeta>> {
// Check if the directory exists yet
if let Err(object_store::Error::NotFound { path, .. }) =
store.list_with_delimiter(Some(url.prefix())).await
{
debug!(
/// Provides [`ObjectMeta`] data to the [`FileMetadata`] table provider.
///
/// Implementors control how the files backing a table are enumerated, which
/// lets callers substitute their own listing strategy for the default one.
#[async_trait]
pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync {
/// List all files in the store for the given `url` prefix.
///
/// All arguments are taken by value so that the returned stream can be
/// `'static` and does not borrow from the caller.
///
/// * `store` - object store to list from.
/// * `url` - table location whose prefix is listed.
/// * `file_extension` - the default implementation keeps only objects whose
///   location ends with this string.
/// * `ignore_subdirectory` - the default implementation forwards this flag to
///   `ListingTableUrl::contains` when filtering listed objects.
///
/// Returns a boxed stream that yields one `Result<ObjectMeta>` per object.
async fn list_all_files(
&self,
store: Arc<dyn ObjectStore>,
url: ListingTableUrl,
file_extension: String,
ignore_subdirectory: bool,
) -> BoxStream<'static, Result<ObjectMeta>>;
}

/// Default implementation of the [`FileMetadataProvider`].
///
/// This is a stateless unit type, so it also derives `Clone`, `Copy` and
/// `Default`. That lets callers build it with `Default::default()` and embed
/// it in their own `Clone`/`Default` types without wrapper code.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultFileMetadataProvider;

#[async_trait]
impl FileMetadataProvider for DefaultFileMetadataProvider {
// Mostly copied from `ListingTableUrl::list_all_files`, which is private to its crate.
// Modified so that a table whose directory does not exist yet yields an empty stream.
async fn list_all_files(
&self,
store: Arc<dyn ObjectStore>,
url: ListingTableUrl,
file_extension: String,
ignore_subdirectory: bool,
) -> BoxStream<'static, Result<ObjectMeta>> {
// Check if the directory exists yet
if let Err(object_store::Error::NotFound { path, .. }) =
store.list_with_delimiter(Some(url.prefix())).await
{
debug!(
"attempted to list empty table at {path} during file_metadata listing, returning empty list"
);
return Box::pin(stream::empty());
}
return Box::pin(stream::empty());
}

let is_dir = url.as_str().ends_with('/');
let list = match is_dir {
true => store.list(Some(url.prefix())),
false => futures::stream::once(store.head(url.prefix())).boxed(),
};
let is_dir = url.as_str().ends_with('/');
let prefix = url.prefix().clone();

let list = match is_dir {
true => store.list(Some(&prefix)),
false => futures::stream::once(async move { store.head(&prefix).await }).boxed(),
};

list.map_err(Into::into)
.try_filter(move |meta| {
let path = &meta.location;
let extension_match = path.as_ref().ends_with(file_extension);
let glob_match = url.contains(path, ignore_subdirectory);
futures::future::ready(extension_match && glob_match)
})
.boxed()
list.map_err(Into::into)
.try_filter(move |meta| {
let path = &meta.location;
let extension_match = path.as_ref().ends_with(&file_extension);
let glob_match = url.contains(path, ignore_subdirectory);
futures::future::ready(extension_match && glob_match)
})
.boxed()
}
}

#[cfg(test)]
Expand All @@ -739,7 +796,7 @@ mod test {
use tempfile::TempDir;
use url::Url;

use super::FileMetadata;
use super::{DefaultFileMetadataProvider, FileMetadata};

struct TestContext {
_dir: TempDir,
Expand Down Expand Up @@ -862,7 +919,10 @@ mod test {

ctx.register_table(
"file_metadata",
Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list()))),
Arc::new(FileMetadata::new(
Arc::clone(ctx.state().catalog_list()),
Arc::new(DefaultFileMetadataProvider),
)),
)
.context("register file metadata table")?;

Expand Down
7 changes: 5 additions & 2 deletions tests/materialized_listing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use datafusion_expr::{
};
use datafusion_materialized_views::materialized::{
dependencies::{mv_dependencies, stale_files},
file_metadata::FileMetadata,
file_metadata::{DefaultFileMetadataProvider, FileMetadata},
register_materialized,
row_metadata::RowMetadataRegistry,
ListingTableLike, Materialized,
Expand Down Expand Up @@ -146,7 +146,10 @@ async fn setup() -> Result<TestContext> {
// Now register the `mv_dependencies` and `stale_files` UDTFs.
// Both depend on `FileMetadata` and `RowMetadataRegistry`.

let file_metadata = Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list())));
let file_metadata = Arc::new(FileMetadata::new(
Arc::clone(ctx.state().catalog_list()),
Arc::new(DefaultFileMetadataProvider),
));
let row_metadata_registry = Arc::new(RowMetadataRegistry::new(Arc::clone(&file_metadata)));

ctx.register_udtf(
Expand Down