| 
7 | 7 | 
 
  | 
8 | 8 | package org.elasticsearch.xpack.esql.optimizer.rules.logical;  | 
9 | 9 | 
 
  | 
10 |  | -import org.elasticsearch.core.Tuple;  | 
11 | 10 | import org.elasticsearch.xpack.esql.core.expression.Alias;  | 
12 | 11 | import org.elasticsearch.xpack.esql.core.expression.Attribute;  | 
13 | 12 | import org.elasticsearch.xpack.esql.core.expression.AttributeMap;  | 
 | 
16 | 15 | import org.elasticsearch.xpack.esql.core.expression.Expressions;  | 
17 | 16 | import org.elasticsearch.xpack.esql.core.expression.FoldContext;  | 
18 | 17 | import org.elasticsearch.xpack.esql.core.expression.Literal;  | 
19 |  | -import org.elasticsearch.xpack.esql.core.expression.NamedExpression;  | 
20 | 18 | import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;  | 
21 | 19 | import org.elasticsearch.xpack.esql.core.util.CollectionUtils;  | 
22 | 20 | import org.elasticsearch.xpack.esql.expression.predicate.Predicates;  | 
23 | 21 | import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;  | 
24 | 22 | import org.elasticsearch.xpack.esql.plan.logical.Enrich;  | 
25 | 23 | import org.elasticsearch.xpack.esql.plan.logical.Eval;  | 
26 | 24 | import org.elasticsearch.xpack.esql.plan.logical.Filter;  | 
27 |  | -import org.elasticsearch.xpack.esql.plan.logical.Limit;  | 
28 | 25 | import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;  | 
29 | 26 | import org.elasticsearch.xpack.esql.plan.logical.OrderBy;  | 
30 | 27 | import org.elasticsearch.xpack.esql.plan.logical.Project;  | 
31 | 28 | import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;  | 
32 |  | -import org.elasticsearch.xpack.esql.plan.logical.Subquery;  | 
33 | 29 | import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;  | 
34 |  | -import org.elasticsearch.xpack.esql.plan.logical.UnionAll;  | 
35 | 30 | import org.elasticsearch.xpack.esql.plan.logical.inference.InferencePlan;  | 
36 | 31 | import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;  | 
37 | 32 | import org.elasticsearch.xpack.esql.plan.logical.join.Join;  | 
 | 
42 | 37 | import java.util.function.Function;  | 
43 | 38 | import java.util.function.Predicate;  | 
44 | 39 | 
 
  | 
45 |  | -import static org.elasticsearch.xpack.esql.core.expression.Attribute.SYNTHETIC_ATTRIBUTE_NAME_SEPARATOR;  | 
46 |  | -import static org.elasticsearch.xpack.esql.core.expression.Attribute.rawTemporaryName;  | 
47 |  | - | 
48 | 40 | /**  | 
49 | 41 |  * Perform filters as early as possible in the logical plan by pushing them past certain plan nodes (like {@link Eval},  | 
50 | 42 |  * {@link RegexExtract}, {@link Enrich}, {@link Project}, {@link OrderBy} and left {@link Join}s) where possible.  | 
@@ -117,12 +109,6 @@ protected LogicalPlan rule(Filter filter, LogicalOptimizerContext ctx) {  | 
117 | 109 |             // See also https://github.com/elastic/elasticsearch/issues/127497  | 
118 | 110 |             // Push down past INLINE STATS if the condition is on the groupings  | 
119 | 111 |             return pushDownPastJoin(filter, join, ctx.foldCtx());  | 
120 |  | -        } else if (child instanceof UnionAll unionAll) {  | 
121 |  | -            // Push down filters that can be evaluated using only the output of the UnionAll  | 
122 |  | -            plan = maybePushDownPastUnionAll(filter, unionAll);  | 
123 |  | -        } else if (child instanceof Subquery subquery) {  | 
124 |  | -            // subquery is a placeholder, push down the filter to the child of the subquery  | 
125 |  | -            plan = subquery.replaceChild(new Filter(filter.source(), subquery.child(), filter.condition()));  | 
126 | 112 |         }  | 
127 | 113 |         // cannot push past a Limit, this could change the tailing result set returned  | 
128 | 114 |         return plan;  | 
@@ -297,222 +283,4 @@ private static LogicalPlan maybePushDownPastUnary(  | 
297 | 283 |         }  | 
298 | 284 |         return plan;  | 
299 | 285 |     }  | 
300 |  | - | 
301 |  | -    /* Push down filters that can be evaluated by the UnionAll child/leg to each child/leg,  | 
302 |  | -     * so that the filters can be pushed down further to the data source when possible.  | 
303 |  | -     * Filters that cannot be pushed down remain above the UnionAll.  | 
304 |  | -     *  | 
305 |  | -     * The children of a UnionAll/Fork plan has a similar pattern, as Fork adds EsqlProject,  | 
306 |  | -     * an optional Eval and Limit on top of its actual children.  | 
307 |  | -     * UnionAll  | 
308 |  | -     *   EsqlProject  | 
309 |  | -     *     Eval (optional)  | 
310 |  | -     *       Limit  | 
311 |  | -     *         EsRelation  | 
312 |  | -     *   EsqlProject  | 
313 |  | -     *     Eval (optional)  | 
314 |  | -     *       Limit  | 
315 |  | -     *         Subquery  | 
316 |  | -     *  | 
317 |  | -     * Push down the filter below limit when possible  | 
318 |  | -     */  | 
319 |  | -    private static LogicalPlan maybePushDownPastUnionAll(Filter filter, UnionAll unionAll) {  | 
320 |  | -        List<Expression> pushable = new ArrayList<>();  | 
321 |  | -        List<Expression> nonPushable = new ArrayList<>();  | 
322 |  | -        for (Expression exp : Predicates.splitAnd(filter.condition())) {  | 
323 |  | -            if (exp.references().subsetOf(unionAll.outputSet())) {  | 
324 |  | -                pushable.add(exp);  | 
325 |  | -            } else {  | 
326 |  | -                nonPushable.add(exp);  | 
327 |  | -            }  | 
328 |  | -        }  | 
329 |  | -        if (pushable.isEmpty()) {  | 
330 |  | -            return filter; // nothing to push down  | 
331 |  | -        }  | 
332 |  | -        // Push the filter down to each child of the UnionAll, the child of a UnionAll is always  | 
333 |  | -        // a project followed by an optional eval and then limit or a limit added by fork and  | 
334 |  | -        // then the real child, if there is unknown pattern, keep the filter and UnionAll plan unchanged  | 
335 |  | -        List<LogicalPlan> newChildren = new ArrayList<>();  | 
336 |  | -        boolean changed = false;  | 
337 |  | -        for (LogicalPlan child : unionAll.children()) {  | 
338 |  | -            LogicalPlan newChild = switch (child) {  | 
339 |  | -                case Project project -> maybePushDownFilterPastProjectForUnionAllChild(pushable, project);  | 
340 |  | -                case Limit limit -> maybePushDownFilterPastLimitForUnionAllChild(pushable, limit);  | 
341 |  | -                default -> null; // TODO add a general push down for unexpected pattern  | 
342 |  | -            };  | 
343 |  | - | 
344 |  | -            if (newChild == null) {  | 
345 |  | -                // Unexpected pattern, keep plan unchanged without pushing down filters  | 
346 |  | -                return filter;  | 
347 |  | -            }  | 
348 |  | - | 
349 |  | -            if (newChild != child) {  | 
350 |  | -                changed = true;  | 
351 |  | -                newChildren.add(newChild);  | 
352 |  | -            } else {  | 
353 |  | -                // Theoretically, all the pushable predicates should be pushed down into each child,  | 
354 |  | -                // in case one child is not changed, preserve the filter on top of UnionAll to make sure  | 
355 |  | -                // correct results are returned and avoid infinite loop of the rule.  | 
356 |  | -                return filter;  | 
357 |  | -            }  | 
358 |  | -        }  | 
359 |  | - | 
360 |  | -        if (changed == false) { // nothing changed, return the original plan  | 
361 |  | -            return filter;  | 
362 |  | -        }  | 
363 |  | - | 
364 |  | -        LogicalPlan newUnionAll = unionAll.replaceChildren(newChildren);  | 
365 |  | -        if (nonPushable.isEmpty()) {  | 
366 |  | -            return newUnionAll;  | 
367 |  | -        } else {  | 
368 |  | -            return filter.with(newUnionAll, Predicates.combineAnd(nonPushable));  | 
369 |  | -        }  | 
370 |  | -    }  | 
371 |  | - | 
372 |  | -    private static LogicalPlan maybePushDownFilterPastProjectForUnionAllChild(List<Expression> pushable, Project project) {  | 
373 |  | -        List<Expression> resolvedPushable = resolvePushableAgainstOutput(pushable, project.projections());  | 
374 |  | -        if (resolvedPushable == null) {  | 
375 |  | -            return project;  | 
376 |  | -        }  | 
377 |  | -        LogicalPlan child = project.child();  | 
378 |  | -        if (child instanceof Eval eval) {  | 
379 |  | -            return pushDownFilterPastEvalForUnionAllChild(resolvedPushable, project, eval);  | 
380 |  | -        } else if (child instanceof Limit limit) {  | 
381 |  | -            LogicalPlan newLimit = pushDownFilterPastLimitForUnionAllChild(resolvedPushable, limit);  | 
382 |  | -            return project.replaceChild(newLimit);  | 
383 |  | -        }  | 
384 |  | -        return project;  | 
385 |  | -    }  | 
386 |  | - | 
387 |  | -    private static LogicalPlan maybePushDownFilterPastLimitForUnionAllChild(List<Expression> pushable, Limit limit) {  | 
388 |  | -        List<Expression> resolvedPushable = resolvePushableAgainstOutput(pushable, limit.output());  | 
389 |  | -        if (resolvedPushable == null) {  | 
390 |  | -            return limit;  | 
391 |  | -        }  | 
392 |  | -        return pushDownFilterPastLimitForUnionAllChild(resolvedPushable, limit);  | 
393 |  | -    }  | 
394 |  | - | 
395 |  | -    /**  | 
396 |  | -     * Attempts to resolve all pushable expressions against the given output attributes.  | 
397 |  | -     * Returns a fully resolved list if successful, or null if any expression cannot be resolved.  | 
398 |  | -     */  | 
399 |  | -    private static List<Expression> resolvePushableAgainstOutput(List<Expression> pushable, List<? extends NamedExpression> output) {  | 
400 |  | -        List<Expression> resolved = new ArrayList<>();  | 
401 |  | -        for (Expression exp : pushable) {  | 
402 |  | -            Expression replaced = resolveUnionAllOutputByName(exp, output);  | 
403 |  | -            // Make sure the pushable predicates can find their corresponding attributes in the output  | 
404 |  | -            if (replaced == null || replaced == exp) {  | 
405 |  | -                // cannot find the attribute in the child project, cannot push down this filter  | 
406 |  | -                return null;  | 
407 |  | -            }  | 
408 |  | -            resolved.add(replaced);  | 
409 |  | -        }  | 
410 |  | -        // If some pushable predicates cannot be resolved against the output, cannot push filter down.  | 
411 |  | -        // This should not happen, however we need to be cautious here, if the predicate is removed from  | 
412 |  | -        // the main query, and it is not pushed down into the UnionAll child, the result will be incorrect.  | 
413 |  | -        return resolved.size() == pushable.size() ? resolved : null;  | 
414 |  | -    }  | 
415 |  | - | 
416 |  | -    private static LogicalPlan pushDownFilterPastEvalForUnionAllChild(List<Expression> pushable, Project project, Eval eval) {  | 
417 |  | -        // if the pushable references any attribute created by the eval, we cannot push down  | 
418 |  | -        AttributeMap<Expression> evalAliases = buildEvaAliases(eval);  | 
419 |  | -        Tuple<List<Expression>, List<Expression>> pushablesAndNonPushables = splitPushableAndNonPushablePredicates(  | 
420 |  | -            pushable,  | 
421 |  | -            exp -> exp.references().stream().anyMatch(evalAliases::containsKey)  | 
422 |  | -        );  | 
423 |  | -        List<Expression> pushables = pushablesAndNonPushables.v1();  | 
424 |  | -        List<Expression> nonPushables = pushablesAndNonPushables.v2();  | 
425 |  | - | 
426 |  | -        LogicalPlan evalChild = eval.child();  | 
427 |  | - | 
428 |  | -        // Nothing to push down under eval and limit  | 
429 |  | -        if (pushables.isEmpty()) {  | 
430 |  | -            return nonPushables.isEmpty()  | 
431 |  | -                ? project // nothing at all  | 
432 |  | -                : withFilter(project, eval, nonPushables); // Push down filter references eval created attributes below project, above eval  | 
433 |  | -        }  | 
434 |  | - | 
435 |  | -        // Push down all pushable predicates below eval and limit  | 
436 |  | -        if (evalChild instanceof Limit limit) {  | 
437 |  | -            LogicalPlan newLimit = pushDownFilterPastLimitForUnionAllChild(pushables, limit);  | 
438 |  | -            LogicalPlan newEval = eval.replaceChild(newLimit);  | 
439 |  | - | 
440 |  | -            return nonPushables.isEmpty() ? project.replaceChild(newEval) : withFilter(project, newEval, nonPushables);  | 
441 |  | -        }  | 
442 |  | - | 
443 |  | -        return project;  | 
444 |  | -    }  | 
445 |  | - | 
446 |  | -    private static LogicalPlan withFilter(Project project, LogicalPlan child, List<Expression> predicates) {  | 
447 |  | -        Expression combined = Predicates.combineAnd(predicates);  | 
448 |  | -        return project.replaceChild(new Filter(project.source(), child, combined));  | 
449 |  | -    }  | 
450 |  | - | 
451 |  | -    /**  | 
452 |  | -     * limit does not create any new attributes, so we should push down all pushable predicates,  | 
453 |  | -     * the caller should make sure the pushable is really pushable.  | 
454 |  | -     */  | 
455 |  | -    private static LogicalPlan pushDownFilterPastLimitForUnionAllChild(List<Expression> pushable, Limit limit) {  | 
456 |  | -        if (pushable.isEmpty()) {  | 
457 |  | -            return limit;  | 
458 |  | -        }  | 
459 |  | -        Expression combined = Predicates.combineAnd(pushable);  | 
460 |  | -        Filter pushed = new Filter(limit.source(), limit.child(), combined);  | 
461 |  | -        return limit.replaceChild(pushed);  | 
462 |  | -    }  | 
463 |  | - | 
464 |  | -    private static AttributeMap<Expression> buildEvaAliases(Eval eval) {  | 
465 |  | -        AttributeMap.Builder<Expression> builder = AttributeMap.builder();  | 
466 |  | -        for (Alias alias : eval.fields()) {  | 
467 |  | -            builder.put(alias.toAttribute(), alias.child());  | 
468 |  | -        }  | 
469 |  | -        return builder.build();  | 
470 |  | -    }  | 
471 |  | - | 
472 |  | -    private static Tuple<List<Expression>, List<Expression>> splitPushableAndNonPushablePredicates(  | 
473 |  | -        List<Expression> predicates,  | 
474 |  | -        Predicate<Expression> nonPushableCheck  | 
475 |  | -    ) {  | 
476 |  | -        List<Expression> pushable = new ArrayList<>();  | 
477 |  | -        List<Expression> nonPushable = new ArrayList<>();  | 
478 |  | -        for (Expression exp : predicates) {  | 
479 |  | -            if (nonPushableCheck.test(exp)) {  | 
480 |  | -                nonPushable.add(exp);  | 
481 |  | -            } else {  | 
482 |  | -                pushable.add(exp);  | 
483 |  | -            }  | 
484 |  | -        }  | 
485 |  | -        return Tuple.tuple(pushable, nonPushable);  | 
486 |  | -    }  | 
487 |  | - | 
488 |  | -    /**  | 
489 |  | -     * The UnionAll/Fork outputs have the same names as it children's outputs, however they have different ids.  | 
490 |  | -     * Convert the pushable predicates to use the child's attributes, so that they can be pushed down further.  | 
491 |  | -     */  | 
492 |  | -    private static Expression resolveUnionAllOutputByName(Expression expr, List<? extends NamedExpression> namedExpressions) {  | 
493 |  | -        // A temporary expression is created with temporary attributes names, as sometimes transform expression does not transform  | 
494 |  | -        // one ReferenceAttribute to another ReferenceAttribute with the same name, different id successfully.  | 
495 |  | -        String UNIONALL = "unionall";  | 
496 |  | -        // rename the output of the UnionAll to a temporary name with a prefix  | 
497 |  | -        Expression renamed = expr.transformUp(Attribute.class, attr -> {  | 
498 |  | -            for (NamedExpression ne : namedExpressions) {  | 
499 |  | -                if (ne.name().equals(attr.name())) {  | 
500 |  | -                    // $$subquery$attr.name()  | 
501 |  | -                    return attr.withName(rawTemporaryName(UNIONALL, ne.name()));  | 
502 |  | -                }  | 
503 |  | -            }  | 
504 |  | -            return attr;  | 
505 |  | -        });  | 
506 |  | - | 
507 |  | -        String prefix = Attribute.SYNTHETIC_ATTRIBUTE_NAME_PREFIX + UNIONALL + SYNTHETIC_ATTRIBUTE_NAME_SEPARATOR;  | 
508 |  | -        return renamed.transformUp(Attribute.class, attr -> {  | 
509 |  | -            String originalName = attr.name().startsWith(prefix) ? attr.name().substring(prefix.length()) : attr.name();  | 
510 |  | -            for (NamedExpression ne : namedExpressions) {  | 
511 |  | -                if (ne.name().equals(originalName)) {  | 
512 |  | -                    return ne.toAttribute();  | 
513 |  | -                }  | 
514 |  | -            }  | 
515 |  | -            return attr;  | 
516 |  | -        });  | 
517 |  | -    }  | 
518 | 286 | }  | 
0 commit comments