Skip to content

Commit b733a12

Browse files
Allow customization of list_all_files function. (#69)
We need to be able to support custom file listing logic (e.g. versioning). Introduced a new trait to allow the injection of custom logic. Co-authored-by: Matthew Turner <[email protected]>
1 parent 819843c commit b733a12

File tree

2 files changed

+110
-47
lines changed

2 files changed

+110
-47
lines changed

src/materialized/file_metadata.rs

Lines changed: 105 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,16 @@ use crate::materialized::cast_to_listing_table;
5353
pub struct FileMetadata {
5454
table_schema: SchemaRef,
5555
catalog_list: Arc<dyn CatalogProviderList>,
56+
metadata_provider: Arc<dyn FileMetadataProvider>,
5657
}
5758

5859
impl FileMetadata {
5960
/// Construct a new [`FileMetadata`] table provider that lists files for all
6061
/// tables in the provided catalog list.
61-
pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
62+
pub fn new(
63+
catalog_list: Arc<dyn CatalogProviderList>,
64+
metadata_provider: Arc<dyn FileMetadataProvider>,
65+
) -> Self {
6266
Self {
6367
table_schema: Arc::new(Schema::new(vec![
6468
Field::new("table_catalog", DataType::Utf8, false),
@@ -73,6 +77,7 @@ impl FileMetadata {
7377
Field::new("size", DataType::UInt64, false),
7478
])),
7579
catalog_list,
80+
metadata_provider,
7681
}
7782
}
7883
}
@@ -114,6 +119,7 @@ impl TableProvider for FileMetadata {
114119
filters,
115120
limit,
116121
self.catalog_list.clone(),
122+
self.metadata_provider.clone(),
117123
)?;
118124

119125
Ok(Arc::new(exec))
@@ -136,6 +142,7 @@ pub struct FileMetadataExec {
136142
limit: Option<usize>,
137143
metrics: ExecutionPlanMetricsSet,
138144
catalog_list: Arc<dyn CatalogProviderList>,
145+
metadata_provider: Arc<dyn FileMetadataProvider>,
139146
}
140147

141148
impl FileMetadataExec {
@@ -145,6 +152,7 @@ impl FileMetadataExec {
145152
filters: Vec<Arc<dyn PhysicalExpr>>,
146153
limit: Option<usize>,
147154
catalog_list: Arc<dyn CatalogProviderList>,
155+
metadata_provider: Arc<dyn FileMetadataProvider>,
148156
) -> Result<Self> {
149157
let projected_schema = match projection.as_ref() {
150158
Some(projection) => Arc::new(table_schema.project(projection)?),
@@ -167,6 +175,7 @@ impl FileMetadataExec {
167175
limit,
168176
metrics: ExecutionPlanMetricsSet::new(),
169177
catalog_list,
178+
metadata_provider,
170179
};
171180

172181
Ok(exec)
@@ -319,6 +328,7 @@ impl FileMetadataExec {
319328

320329
let table_schema = self.table_schema.clone();
321330
let catalog_list = self.catalog_list.clone();
331+
let metadata_provider = self.metadata_provider.clone();
322332

323333
let record_batch = async move {
324334
// If we cannot determine the catalog, build from the entire catalog list.
@@ -328,6 +338,7 @@ impl FileMetadataExec {
328338
debug!("No catalog filter exists, returning entire catalog list.");
329339
return FileMetadataBuilder::build_from_catalog_list(
330340
catalog_list,
341+
metadata_provider,
331342
table_schema,
332343
context,
333344
)
@@ -352,6 +363,7 @@ impl FileMetadataExec {
352363
return FileMetadataBuilder::build_from_catalog(
353364
&catalog_name,
354365
catalog_provider,
366+
metadata_provider,
355367
table_schema,
356368
context,
357369
)
@@ -379,6 +391,7 @@ impl FileMetadataExec {
379391
&catalog_name,
380392
&schema_name,
381393
schema_provider,
394+
metadata_provider,
382395
table_schema,
383396
context,
384397
)
@@ -402,6 +415,7 @@ impl FileMetadataExec {
402415
&schema_name,
403416
&table_name,
404417
table_provider,
418+
metadata_provider,
405419
table_schema,
406420
context,
407421
)
@@ -448,6 +462,7 @@ impl DisplayAs for FileMetadataExec {
448462

449463
struct FileMetadataBuilder {
450464
schema: SchemaRef,
465+
metadata_provider: Arc<dyn FileMetadataProvider>,
451466
catalog_names: StringBuilder,
452467
schema_names: StringBuilder,
453468
table_names: StringBuilder,
@@ -457,9 +472,10 @@ struct FileMetadataBuilder {
457472
}
458473

459474
impl FileMetadataBuilder {
460-
fn new(schema: SchemaRef) -> Self {
475+
fn new(schema: SchemaRef, metadata_provider: Arc<dyn FileMetadataProvider>) -> Self {
461476
Self {
462477
schema,
478+
metadata_provider,
463479
catalog_names: StringBuilder::new(),
464480
schema_names: StringBuilder::new(),
465481
table_names: StringBuilder::new(),
@@ -471,6 +487,7 @@ impl FileMetadataBuilder {
471487

472488
async fn build_from_catalog_list(
473489
catalog_list: Arc<dyn CatalogProviderList>,
490+
metadata_provider: Arc<dyn FileMetadataProvider>,
474491
schema: SchemaRef,
475492
context: Arc<TaskContext>,
476493
) -> Result<Vec<RecordBatch>> {
@@ -481,11 +498,19 @@ impl FileMetadataBuilder {
481498
Some(catalog_provider) => catalog_provider,
482499
None => continue,
483500
};
501+
let metadata_provider = metadata_provider.clone();
484502
let schema = schema.clone();
485503
let context = context.clone();
486504

487505
tasks.push(async move {
488-
Self::build_from_catalog(&catalog_name, catalog_provider, schema, context).await
506+
Self::build_from_catalog(
507+
&catalog_name,
508+
catalog_provider,
509+
metadata_provider,
510+
schema,
511+
context,
512+
)
513+
.await
489514
});
490515
}
491516

@@ -504,6 +529,7 @@ impl FileMetadataBuilder {
504529
async fn build_from_catalog(
505530
catalog_name: &str,
506531
catalog_provider: Arc<dyn CatalogProvider>,
532+
metadata_provider: Arc<dyn FileMetadataProvider>,
507533
schema: SchemaRef,
508534
context: Arc<TaskContext>,
509535
) -> Result<Vec<RecordBatch>> {
@@ -514,6 +540,7 @@ impl FileMetadataBuilder {
514540
Some(schema_provider) => schema_provider,
515541
None => continue,
516542
};
543+
let metadata_provider = metadata_provider.clone();
517544
let schema = schema.clone();
518545
let context = context.clone();
519546

@@ -522,6 +549,7 @@ impl FileMetadataBuilder {
522549
catalog_name,
523550
&schema_name,
524551
schema_provider,
552+
metadata_provider,
525553
schema,
526554
context,
527555
)
@@ -545,6 +573,7 @@ impl FileMetadataBuilder {
545573
catalog_name: &str,
546574
schema_name: &str,
547575
schema_provider: Arc<dyn SchemaProvider>,
576+
metadata_provider: Arc<dyn FileMetadataProvider>,
548577
schema: SchemaRef,
549578
context: Arc<TaskContext>,
550579
) -> Result<Vec<RecordBatch>> {
@@ -555,6 +584,7 @@ impl FileMetadataBuilder {
555584
Some(table_provider) => table_provider,
556585
None => continue,
557586
};
587+
let metadata_provider = metadata_provider.clone();
558588
let schema = schema.clone();
559589
let context = context.clone();
560590

@@ -564,6 +594,7 @@ impl FileMetadataBuilder {
564594
schema_name,
565595
&table_name,
566596
table_provider,
597+
metadata_provider,
567598
schema,
568599
context,
569600
)
@@ -587,10 +618,11 @@ impl FileMetadataBuilder {
587618
schema_name: &str,
588619
table_name: &str,
589620
table_provider: Arc<dyn TableProvider>,
621+
metadata_provider: Arc<dyn FileMetadataProvider>,
590622
schema: SchemaRef,
591623
context: Arc<TaskContext>,
592624
) -> Result<Option<RecordBatch>> {
593-
let mut builder = Self::new(schema.clone());
625+
let mut builder = Self::new(schema.clone(), metadata_provider.clone());
594626

595627
let listing_table_like = match cast_to_listing_table(table_provider.as_ref()) {
596628
None => return Ok(None),
@@ -628,17 +660,19 @@ impl FileMetadataBuilder {
628660
let store_url = table_path.object_store();
629661
let store = context.runtime_env().object_store(table_path)?;
630662

631-
let mut file_stream = list_all_files(
632-
store.as_ref(),
633-
table_path,
634-
file_ext,
635-
context
636-
.session_config()
637-
.options()
638-
.execution
639-
.listing_table_ignore_subdirectory,
640-
)
641-
.await;
663+
let mut file_stream = self
664+
.metadata_provider
665+
.list_all_files(
666+
store.clone(),
667+
table_path.clone(),
668+
file_ext.to_string(),
669+
context
670+
.session_config()
671+
.options()
672+
.execution
673+
.listing_table_ignore_subdirectory,
674+
)
675+
.await;
642676

643677
while let Some(file_meta) = file_stream.try_next().await? {
644678
self.append(
@@ -687,38 +721,61 @@ impl FileMetadataBuilder {
687721
}
688722
}
689723

690-
// Mostly copied from ListingTableUrl::list_all_files, which is private to that crate
691-
// Modified to handle empty tables
692-
async fn list_all_files<'a>(
693-
store: &'a dyn ObjectStore,
694-
url: &'a ListingTableUrl,
695-
file_extension: &'a str,
696-
ignore_subdirectory: bool,
697-
) -> BoxStream<'a, Result<ObjectMeta>> {
698-
// Check if the directory exists yet
699-
if let Err(object_store::Error::NotFound { path, .. }) =
700-
store.list_with_delimiter(Some(url.prefix())).await
701-
{
702-
debug!(
724+
/// Provides [`ObjectMetadata`] data to the [`FileMetadata`] table provider.
725+
#[async_trait]
726+
pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync {
727+
/// List all files in the store for the given `url` prefix.
728+
async fn list_all_files(
729+
&self,
730+
store: Arc<dyn ObjectStore>,
731+
url: ListingTableUrl,
732+
file_extension: String,
733+
ignore_subdirectory: bool,
734+
) -> BoxStream<'static, Result<ObjectMeta>>;
735+
}
736+
737+
/// Default implementation of the [`FileMetadataProvider`].
738+
#[derive(Debug)]
739+
pub struct DefaultFileMetadataProvider;
740+
741+
#[async_trait]
742+
impl FileMetadataProvider for DefaultFileMetadataProvider {
743+
// Mostly copied from ListingTableUrl::list_all_files, which is private to that crate
744+
// Modified to handle empty tables
745+
async fn list_all_files(
746+
&self,
747+
store: Arc<dyn ObjectStore>,
748+
url: ListingTableUrl,
749+
file_extension: String,
750+
ignore_subdirectory: bool,
751+
) -> BoxStream<'static, Result<ObjectMeta>> {
752+
// Check if the directory exists yet
753+
if let Err(object_store::Error::NotFound { path, .. }) =
754+
store.list_with_delimiter(Some(url.prefix())).await
755+
{
756+
debug!(
703757
"attempted to list empty table at {path} during file_metadata listing, returning empty list"
704758
);
705-
return Box::pin(stream::empty());
706-
}
759+
return Box::pin(stream::empty());
760+
}
707761

708-
let is_dir = url.as_str().ends_with('/');
709-
let list = match is_dir {
710-
true => store.list(Some(url.prefix())),
711-
false => futures::stream::once(store.head(url.prefix())).boxed(),
712-
};
762+
let is_dir = url.as_str().ends_with('/');
763+
let prefix = url.prefix().clone();
764+
765+
let list = match is_dir {
766+
true => store.list(Some(&prefix)),
767+
false => futures::stream::once(async move { store.head(&prefix).await }).boxed(),
768+
};
713769

714-
list.map_err(Into::into)
715-
.try_filter(move |meta| {
716-
let path = &meta.location;
717-
let extension_match = path.as_ref().ends_with(file_extension);
718-
let glob_match = url.contains(path, ignore_subdirectory);
719-
futures::future::ready(extension_match && glob_match)
720-
})
721-
.boxed()
770+
list.map_err(Into::into)
771+
.try_filter(move |meta| {
772+
let path = &meta.location;
773+
let extension_match = path.as_ref().ends_with(&file_extension);
774+
let glob_match = url.contains(path, ignore_subdirectory);
775+
futures::future::ready(extension_match && glob_match)
776+
})
777+
.boxed()
778+
}
722779
}
723780

724781
#[cfg(test)]
@@ -739,7 +796,7 @@ mod test {
739796
use tempfile::TempDir;
740797
use url::Url;
741798

742-
use super::FileMetadata;
799+
use super::{DefaultFileMetadataProvider, FileMetadata};
743800

744801
struct TestContext {
745802
_dir: TempDir,
@@ -862,7 +919,10 @@ mod test {
862919

863920
ctx.register_table(
864921
"file_metadata",
865-
Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list()))),
922+
Arc::new(FileMetadata::new(
923+
Arc::clone(ctx.state().catalog_list()),
924+
Arc::new(DefaultFileMetadataProvider),
925+
)),
866926
)
867927
.context("register file metadata table")?;
868928

tests/materialized_listing_table.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use datafusion_expr::{
3939
};
4040
use datafusion_materialized_views::materialized::{
4141
dependencies::{mv_dependencies, stale_files},
42-
file_metadata::FileMetadata,
42+
file_metadata::{DefaultFileMetadataProvider, FileMetadata},
4343
register_materialized,
4444
row_metadata::RowMetadataRegistry,
4545
ListingTableLike, Materialized,
@@ -146,7 +146,10 @@ async fn setup() -> Result<TestContext> {
146146
// Now register the `mv_dependencies` and `stale_files` UDTFs
147147
// They have `FileMetadata` and `RowMetadataRegistry` as dependencies.
148148

149-
let file_metadata = Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list())));
149+
let file_metadata = Arc::new(FileMetadata::new(
150+
Arc::clone(ctx.state().catalog_list()),
151+
Arc::new(DefaultFileMetadataProvider),
152+
));
150153
let row_metadata_registry = Arc::new(RowMetadataRegistry::new(Arc::clone(&file_metadata)));
151154

152155
ctx.register_udtf(

0 commit comments

Comments
 (0)