@@ -144,6 +144,7 @@ func (o *Optimizer) Optimize(main *dag.Main) error {
144144 seq = mergeFilters (seq )
145145 seq = mergeValuesOps (seq )
146146 inlineRecordExprSpreads (seq )
147+ seq = joinFilterPullup (seq )
147148 seq = removePassOps (seq )
148149 seq = replaceSortAndHeadOrTailWithTop (seq )
149150 o .optimizeParallels (seq )
@@ -506,6 +507,103 @@ func inlineRecordExprSpreads(v any) {
506507 })
507508}
508509
510+ func joinFilterPullup (seq dag.Seq ) dag.Seq {
511+ seq = mergeFilters (seq )
512+ for i := 0 ; i <= len (seq )- 3 ; i ++ {
513+ fork , isfork := seq [i ].(* dag.Fork )
514+ leftAlias , rightAlias , isjoin := isJoin (seq [i + 1 ])
515+ filter , isfilter := seq [i + 2 ].(* dag.Filter )
516+ if ! isfork || ! isjoin || ! isfilter {
517+ continue
518+ }
519+ if len (fork .Paths ) != 2 {
520+ panic (seq [i ])
521+ }
522+ var remaining []dag.Expr
523+ for _ , e := range breakupFilter (filter .Expr ) {
524+ if pullup , ok := pullupExpr (leftAlias , e ); ok {
525+ fork .Paths [0 ] = append (fork .Paths [0 ], dag .NewFilter (pullup ))
526+ continue
527+ }
528+ if pullup , ok := pullupExpr (rightAlias , e ); ok {
529+ fork .Paths [1 ] = append (fork .Paths [1 ], dag .NewFilter (pullup ))
530+ continue
531+ }
532+ remaining = append (remaining , e )
533+ }
534+ if len (remaining ) == 0 {
535+ // Filter has been fully pulled up and can be removed.
536+ seq .Delete (i + 2 , i + 3 )
537+ } else {
538+ out := remaining [0 ]
539+ for _ , e := range remaining [1 :] {
540+ out = dag .NewBinaryExpr ("and" , e , out )
541+ }
542+ seq [i + 2 ] = dag .NewFilter (out )
543+ }
544+ fork .Paths [0 ] = joinFilterPullup (fork .Paths [0 ])
545+ fork .Paths [1 ] = joinFilterPullup (fork .Paths [1 ])
546+ }
547+ return seq
548+ }
549+
550+ func isJoin (op dag.Op ) (string , string , bool ) {
551+ switch op := op .(type ) {
552+ case * dag.HashJoin :
553+ return op .LeftAlias , op .RightAlias , true
554+ case * dag.Join :
555+ return op .LeftAlias , op .RightAlias , true
556+ default :
557+ return "" , "" , false
558+ }
559+ }
560+
561+ func breakupFilter (e dag.Expr ) []dag.Expr {
562+ if b , ok := e .(* dag.BinaryExpr ); ok && b .Op == "and" {
563+ return append (breakupFilter (b .LHS ), breakupFilter (b .RHS )... )
564+ }
565+ return []dag.Expr {e }
566+ }
567+
568+ func pullupExpr (alias string , expr dag.Expr ) (dag.Expr , bool ) {
569+ e , ok := expr .(* dag.BinaryExpr )
570+ if ! ok {
571+ return nil , false
572+ }
573+ if e .Op == "and" {
574+ lhs , lok := pullupExpr (alias , e .LHS )
575+ rhs , rok := pullupExpr (alias , e .RHS )
576+ if ! lok || ! rok {
577+ return nil , false
578+ }
579+ return dag .NewBinaryExpr ("and" , lhs , rhs ), true
580+ }
581+ if e .Op == "or" {
582+ lhs , lok := pullupExpr (alias , e .LHS )
583+ rhs , rok := pullupExpr (alias , e .RHS )
584+ if ! lok || ! rok {
585+ return nil , false
586+ }
587+ return dag .NewBinaryExpr ("or" , lhs , rhs ), true
588+
589+ }
590+ var literal * dag.Literal
591+ var this * dag.This
592+ for _ , e := range []dag.Expr {e .RHS , e .LHS } {
593+ if l , ok := e .(* dag.Literal ); ok && literal == nil {
594+ literal = l
595+ continue
596+ }
597+ if t , ok := e .(* dag.This ); ok && this == nil && len (t .Path ) > 1 && t .Path [0 ] == alias {
598+ this = t
599+ continue
600+ }
601+ return nil , false
602+ }
603+ path := slices .Clone (this .Path [1 :])
604+ return dag .NewBinaryExpr (e .Op , dag .NewThis (path ), literal ), true
605+ }
606+
509607func liftFilterOps (seq dag.Seq ) dag.Seq {
510608 walkT (reflect .ValueOf (& seq ), func (seq dag.Seq ) dag.Seq {
511609 for i := len (seq ) - 2 ; i >= 0 ; i -- {
0 commit comments