@@ -35,7 +35,7 @@ use datafusion_expr::logical_plan::{
3535use datafusion_expr:: utils:: { conjunction, split_conjunction, split_conjunction_owned} ;
3636use datafusion_expr:: {
3737 and, build_join_schema, or, BinaryExpr , Expr , Filter , LogicalPlanBuilder , Operator ,
38- TableProviderFilterPushDown ,
38+ Projection , TableProviderFilterPushDown ,
3939} ;
4040
4141use crate :: optimizer:: ApplyOrder ;
@@ -691,58 +691,46 @@ impl OptimizerRule for PushDownFilter {
691691 insert_below ( LogicalPlan :: SubqueryAlias ( subquery_alias) , new_filter)
692692 }
693693 LogicalPlan :: Projection ( projection) => {
694- // A projection is filter-commutable if it do not contain volatile predicates or contain volatile
695- // predicates that are not used in the filter. However, we should re-writes all predicate expressions.
696- // collect projection.
697- let ( volatile_map, non_volatile_map) : ( HashMap < _ , _ > , HashMap < _ , _ > ) =
698- projection
699- . schema
700- . iter ( )
701- . zip ( projection. expr . iter ( ) )
702- . map ( |( ( qualifier, field) , expr) | {
703- // strip alias, as they should not be part of filters
704- let expr = expr. clone ( ) . unalias ( ) ;
705-
706- ( qualified_name ( qualifier, field. name ( ) ) , expr)
707- } )
708- . partition ( |( _, value) | value. is_volatile ( ) . unwrap_or ( true ) ) ;
709-
710- let mut push_predicates = vec ! [ ] ;
711- let mut keep_predicates = vec ! [ ] ;
712- for expr in split_conjunction_owned ( filter. predicate . clone ( ) ) {
713- if contain ( & expr, & volatile_map) {
714- keep_predicates. push ( expr) ;
715- } else {
716- push_predicates. push ( expr) ;
694+ let ( new_projection, keep_predicate) =
695+ rewrite_projection ( filter. predicate . clone ( ) , projection) ?;
696+ if new_projection. transformed {
697+ match keep_predicate {
698+ None => Ok ( new_projection) ,
699+ Some ( keep_predicate) => new_projection. map_data ( |child_plan| {
700+ Filter :: try_new ( keep_predicate, Arc :: new ( child_plan) )
701+ . map ( LogicalPlan :: Filter )
702+ } ) ,
717703 }
704+ } else {
705+ filter. input = Arc :: new ( new_projection. data ) ;
706+ Ok ( Transformed :: no ( LogicalPlan :: Filter ( filter) ) )
718707 }
719-
720- match conjunction ( push_predicates) {
721- Some ( expr) => {
722- // re-write all filters based on this projection
723- // E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1"
724- let new_filter = LogicalPlan :: Filter ( Filter :: try_new (
725- replace_cols_by_name ( expr, & non_volatile_map) ?,
726- Arc :: clone ( & projection. input ) ,
727- ) ?) ;
728-
729- match conjunction ( keep_predicates) {
730- None => insert_below (
731- LogicalPlan :: Projection ( projection) ,
732- new_filter,
733- ) ,
734- Some ( keep_predicate) => insert_below (
735- LogicalPlan :: Projection ( projection) ,
736- new_filter,
737- ) ?
738- . map_data ( |child_plan| {
739- Filter :: try_new ( keep_predicate, Arc :: new ( child_plan) )
740- . map ( LogicalPlan :: Filter )
741- } ) ,
708+ }
709+ LogicalPlan :: Unnest ( mut unnest) => {
710+ // Unnest is built above Projection, so we only take Projection into consideration
711+ match unwrap_arc ( unnest. input ) {
712+ LogicalPlan :: Projection ( projection) => {
713+ let ( new_projection, keep_predicate) =
714+ rewrite_projection ( filter. predicate . clone ( ) , projection) ?;
715+ unnest. input = Arc :: new ( new_projection. data ) ;
716+
717+ if new_projection. transformed {
718+ match keep_predicate {
719+ None => Ok ( Transformed :: yes ( LogicalPlan :: Unnest ( unnest) ) ) ,
720+ Some ( keep_predicate) => Ok ( Transformed :: yes (
721+ LogicalPlan :: Filter ( Filter :: try_new (
722+ keep_predicate,
723+ Arc :: new ( LogicalPlan :: Unnest ( unnest) ) ,
724+ ) ?) ,
725+ ) ) ,
726+ }
727+ } else {
728+ filter. input = Arc :: new ( LogicalPlan :: Unnest ( unnest) ) ;
729+ Ok ( Transformed :: no ( LogicalPlan :: Filter ( filter) ) )
742730 }
743731 }
744- None => {
745- filter. input = Arc :: new ( LogicalPlan :: Projection ( projection ) ) ;
732+ child => {
733+ filter. input = Arc :: new ( child ) ;
746734 Ok ( Transformed :: no ( LogicalPlan :: Filter ( filter) ) )
747735 }
748736 }
@@ -951,6 +939,76 @@ impl OptimizerRule for PushDownFilter {
951939 }
952940}
953941
942+ /// Attempts to push `predicate` into a `FilterExec` below `projection
943+ ///
944+ /// # Returns
945+ /// (plan, remaining_predicate)
946+ ///
947+ /// `plan` is a LogicalPlan for `projection` with possibly a new FilterExec below it.
948+ /// `remaining_predicate` is any part of the predicate that could not be pushed down
949+ ///
950+ /// # Example
951+ ///
952+ /// Pushing a predicate like `foo=5 AND bar=6` with an input plan like this:
953+ ///
954+ /// ```text
955+ /// Projection(foo, c+d as bar)
956+ /// ```
957+ ///
958+ /// Might result in returning `remaining_predicate` of `bar=6` and a plan like
959+ ///
960+ /// ```text
961+ /// Projection(foo, c+d as bar)
962+ /// Filter(foo=5)
963+ /// ...
964+ /// ```
965+ fn rewrite_projection (
966+ predicate : Expr ,
967+ projection : Projection ,
968+ ) -> Result < ( Transformed < LogicalPlan > , Option < Expr > ) > {
969+ // A projection is filter-commutable if it do not contain volatile predicates or contain volatile
970+ // predicates that are not used in the filter. However, we should re-writes all predicate expressions.
971+ // collect projection.
972+ let ( volatile_map, non_volatile_map) : ( HashMap < _ , _ > , HashMap < _ , _ > ) = projection
973+ . schema
974+ . iter ( )
975+ . zip ( projection. expr . iter ( ) )
976+ . map ( |( ( qualifier, field) , expr) | {
977+ // strip alias, as they should not be part of filters
978+ let expr = expr. clone ( ) . unalias ( ) ;
979+
980+ ( qualified_name ( qualifier, field. name ( ) ) , expr)
981+ } )
982+ . partition ( |( _, value) | value. is_volatile ( ) . unwrap_or ( true ) ) ;
983+
984+ let mut push_predicates = vec ! [ ] ;
985+ let mut keep_predicates = vec ! [ ] ;
986+ for expr in split_conjunction_owned ( predicate) {
987+ if contain ( & expr, & volatile_map) {
988+ keep_predicates. push ( expr) ;
989+ } else {
990+ push_predicates. push ( expr) ;
991+ }
992+ }
993+
994+ match conjunction ( push_predicates) {
995+ Some ( expr) => {
996+ // re-write all filters based on this projection
997+ // E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1"
998+ let new_filter = LogicalPlan :: Filter ( Filter :: try_new (
999+ replace_cols_by_name ( expr, & non_volatile_map) ?,
1000+ Arc :: clone ( & projection. input ) ,
1001+ ) ?) ;
1002+
1003+ Ok ( (
1004+ insert_below ( LogicalPlan :: Projection ( projection) , new_filter) ?,
1005+ conjunction ( keep_predicates) ,
1006+ ) )
1007+ }
1008+ None => Ok ( ( Transformed :: no ( LogicalPlan :: Projection ( projection) ) , None ) ) ,
1009+ }
1010+ }
1011+
9541012/// Creates a new LogicalPlan::Filter node.
9551013pub fn make_filter ( predicate : Expr , input : Arc < LogicalPlan > ) -> Result < LogicalPlan > {
9561014 Filter :: try_new ( predicate, input) . map ( LogicalPlan :: Filter )
0 commit comments