@@ -112,7 +112,11 @@ impl TableFunctionImpl for FileDependenciesUdtf {
112112 ) ) ?;
113113
114114 Ok ( Arc :: new ( ViewTable :: try_new (
115- mv_dependencies_plan ( mv, self . row_metadata_registry . clone ( ) , & self . config_options ) ?,
115+ mv_dependencies_plan (
116+ mv,
117+ self . row_metadata_registry . as_ref ( ) ,
118+ & self . config_options ,
119+ ) ?,
116120 None ,
117121 ) ?) )
118122 }
@@ -230,7 +234,7 @@ fn get_table_name(args: &[Expr]) -> Result<&String> {
230234/// for this materialized view, together with the dependencies for each target.
231235pub fn mv_dependencies_plan (
232236 materialized_view : & dyn Materialized ,
233- row_metadata_registry : Arc < RowMetadataRegistry > ,
237+ row_metadata_registry : & RowMetadataRegistry ,
234238 config_options : & ConfigOptions ,
235239) -> Result < LogicalPlan > {
236240 use datafusion_expr:: logical_plan:: * ;
@@ -249,12 +253,19 @@ pub fn mv_dependencies_plan(
249253 // First expand all wildcards
250254 let plan = ExpandWildcardRule { } . analyze ( plan, config_options) ?;
251255
252- // Prune non-partition columns from all table scans
253- let pruned_plan = pushdown_projection_inexact ( plan, & partition_col_indices) ?;
256+ let pruned_plan_with_source_files = if partition_cols. is_empty ( ) {
257+ get_source_files_all_partitions (
258+ materialized_view,
259+ & config_options. catalog ,
260+ row_metadata_registry,
261+ )
262+ } else {
263+ // Prune non-partition columns from all table scans
264+ let pruned_plan = pushdown_projection_inexact ( plan, & partition_col_indices) ?;
254265
255- // Now bubble up file metadata to the top of the plan
256- let pruned_plan_with_source_files =
257- push_up_file_metadata ( pruned_plan , & config_options . catalog , row_metadata_registry ) ?;
266+ // Now bubble up file metadata to the top of the plan
267+ push_up_file_metadata ( pruned_plan , & config_options . catalog , row_metadata_registry )
268+ } ?;
258269
259270 // We now have data in the following form:
260271 // (partition_col0, partition_col1, ..., __meta)
@@ -722,13 +733,13 @@ fn project_dfschema(schema: &DFSchema, indices: &HashSet<usize>) -> Result<DFSch
722733fn push_up_file_metadata (
723734 plan : LogicalPlan ,
724735 catalog_options : & CatalogOptions ,
725- row_metadata_registry : Arc < RowMetadataRegistry > ,
736+ row_metadata_registry : & RowMetadataRegistry ,
726737) -> Result < LogicalPlan > {
727738 let alias_generator = AliasGenerator :: new ( ) ;
728739 plan. transform_up ( |plan| {
729740 match plan {
730741 LogicalPlan :: TableScan ( scan) => {
731- scan_columns_from_row_metadata ( scan, catalog_options, row_metadata_registry. clone ( ) )
742+ scan_columns_from_row_metadata ( scan, catalog_options, row_metadata_registry)
732743 }
733744 plan => project_row_metadata_from_input ( plan, & alias_generator) ,
734745 }
@@ -800,7 +811,7 @@ fn project_row_metadata_from_input(
800811fn scan_columns_from_row_metadata (
801812 scan : TableScan ,
802813 catalog_options : & CatalogOptions ,
803- row_metadata_registry : Arc < RowMetadataRegistry > ,
814+ row_metadata_registry : & RowMetadataRegistry ,
804815) -> Result < LogicalPlan > {
805816 let table_ref = scan. table_name . clone ( ) . resolve (
806817 & catalog_options. default_catalog ,
@@ -832,6 +843,75 @@ fn scan_columns_from_row_metadata(
832843 . build ( )
833844}
834845
846+ /// Assemble sources irrespective of partitions
847+ /// This is more efficient when the materialized view has no partitions,
848+ /// but less intelligent -- it may return additional dependencies not present in the
849+ /// usual algorithm.
850+ //
851+ // TODO: see if we can optimize the normal logic for no partitions.
852+ // It seems that joins get transformed into cross joins, which can become extremely inefficient.
853+ // Hence we had to implement this alternate, simpler but less precise algorithm.
854+ // Notably, it may include more false positives.
855+ fn get_source_files_all_partitions (
856+ materialized_view : & dyn Materialized ,
857+ catalog_options : & CatalogOptions ,
858+ row_metadata_registry : & RowMetadataRegistry ,
859+ ) -> Result < LogicalPlan > {
860+ use datafusion_common:: tree_node:: TreeNodeRecursion ;
861+
862+ let mut tables = std:: collections:: HashMap :: < TableReference , _ > :: new ( ) ;
863+
864+ materialized_view
865+ . query ( )
866+ . apply ( |plan| {
867+ if let LogicalPlan :: TableScan ( scan) = plan {
868+ tables. insert ( scan. table_name . clone ( ) , Arc :: clone ( & scan. source ) ) ;
869+ }
870+
871+ Ok ( TreeNodeRecursion :: Continue )
872+ } )
873+ . unwrap ( ) ;
874+
875+ tables
876+ . into_iter ( )
877+ . try_fold (
878+ None :: < LogicalPlanBuilder > ,
879+ |maybe_plan, ( table_ref, source) | {
880+ let resolved_ref = table_ref. clone ( ) . resolve (
881+ & catalog_options. default_catalog ,
882+ & catalog_options. default_schema ,
883+ ) ;
884+
885+ let row_metadata = row_metadata_registry. get_source ( & resolved_ref) ?;
886+ let row_metadata_scan = row_metadata
887+ . row_metadata (
888+ resolved_ref,
889+ & TableScan {
890+ table_name : table_ref. clone ( ) ,
891+ source,
892+ projection : Some ( vec ! [ ] ) , // no columns relevant
893+ projected_schema : Arc :: new ( DFSchema :: empty ( ) ) ,
894+ filters : vec ! [ ] ,
895+ fetch : None ,
896+ } ,
897+ ) ?
898+ . build ( ) ?;
899+
900+ if let Some ( previous) = maybe_plan {
901+ previous. union ( row_metadata_scan)
902+ } else {
903+ Ok ( LogicalPlanBuilder :: from ( row_metadata_scan) )
904+ }
905+ . map ( Some )
906+ } ,
907+ ) ?
908+ . ok_or_else ( || DataFusionError :: Plan ( "materialized view has no source tables" . into ( ) ) ) ?
909+ // [`RowMetadataSource`] returns a Struct,
910+ // but the MV algorithm expects a list of structs at each node in the plan.
911+ . project ( vec ! [ make_array( vec![ col( META_COLUMN ) ] ) . alias( META_COLUMN ) ] ) ?
912+ . build ( )
913+ }
914+
835915#[ cfg( test) ]
836916mod test {
837917 use std:: { any:: Any , collections:: HashSet , sync:: Arc } ;
0 commit comments