|
7 | 7 |
|
8 | 8 | package org.elasticsearch.xpack.esql.optimizer.rules.logical; |
9 | 9 |
|
| 10 | +import org.elasticsearch.core.Tuple; |
10 | 11 | import org.elasticsearch.xpack.esql.core.expression.Alias; |
11 | 12 | import org.elasticsearch.xpack.esql.core.expression.Attribute; |
12 | 13 | import org.elasticsearch.xpack.esql.core.expression.AttributeMap; |
|
15 | 16 | import org.elasticsearch.xpack.esql.core.expression.Expressions; |
16 | 17 | import org.elasticsearch.xpack.esql.core.expression.FoldContext; |
17 | 18 | import org.elasticsearch.xpack.esql.core.expression.Literal; |
| 19 | +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; |
18 | 20 | import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; |
19 | 21 | import org.elasticsearch.xpack.esql.core.util.CollectionUtils; |
20 | 22 | import org.elasticsearch.xpack.esql.expression.predicate.Predicates; |
21 | 23 | import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; |
22 | 24 | import org.elasticsearch.xpack.esql.plan.logical.Enrich; |
23 | 25 | import org.elasticsearch.xpack.esql.plan.logical.Eval; |
24 | 26 | import org.elasticsearch.xpack.esql.plan.logical.Filter; |
| 27 | +import org.elasticsearch.xpack.esql.plan.logical.Limit; |
25 | 28 | import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; |
26 | 29 | import org.elasticsearch.xpack.esql.plan.logical.OrderBy; |
27 | 30 | import org.elasticsearch.xpack.esql.plan.logical.Project; |
28 | 31 | import org.elasticsearch.xpack.esql.plan.logical.RegexExtract; |
| 32 | +import org.elasticsearch.xpack.esql.plan.logical.Subquery; |
29 | 33 | import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; |
| 34 | +import org.elasticsearch.xpack.esql.plan.logical.UnionAll; |
30 | 35 | import org.elasticsearch.xpack.esql.plan.logical.inference.InferencePlan; |
31 | 36 | import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; |
32 | 37 | import org.elasticsearch.xpack.esql.plan.logical.join.Join; |
@@ -109,6 +114,12 @@ protected LogicalPlan rule(Filter filter, LogicalOptimizerContext ctx) { |
109 | 114 | // See also https://github.com/elastic/elasticsearch/issues/127497 |
110 | 115 | // Push down past INLINE STATS if the condition is on the groupings |
111 | 116 | return pushDownPastJoin(filter, join, ctx.foldCtx()); |
| 117 | + } else if (child instanceof UnionAll unionAll) { |
| 118 | + // Push down filters that can be evaluated using only the output of the UnionAll |
| 119 | + plan = maybePushDownPastUnionAll(filter, unionAll); |
| 120 | + } else if (child instanceof Subquery subquery) { |
| 121 | + // subquery is a placeholder, push down the filter to the child of the subquery |
| 122 | + plan = subquery.replaceChild(new Filter(filter.source(), subquery.child(), filter.condition())); |
112 | 123 | } |
113 | 124 | // cannot push past a Limit, this could change the tailing result set returned |
114 | 125 | return plan; |
@@ -283,4 +294,160 @@ private static LogicalPlan maybePushDownPastUnary( |
283 | 294 | } |
284 | 295 | return plan; |
285 | 296 | } |
| 297 | + |
| 298 | + /* Push down filters that can be evaluated by the UnionAll child/leg to each child/leg, |
| 299 | + * so that the filters can be pushed down further to the data source if possible. |
| 300 | + * Filters that cannot be pushed down remain above the UnionAll. |
| 301 | + * |
| 302 | + * The children of a UnionAll/Fork plan has a similar pattern, as Fork adds EsqlProject, |
| 303 | + * an optional Eval and Limit on top of its actual children. |
| 304 | + * UnionAll |
| 305 | + * EsqlProject |
| 306 | + * Eval (optional) |
| 307 | + * Limit |
| 308 | + * EsRelation |
| 309 | + * EsqlProject |
| 310 | + * Eval (optional) |
| 311 | + * Limit |
| 312 | + * Subquery |
| 313 | + * |
| 314 | + * Push down the filter below limit when possible |
| 315 | + */ |
| 316 | + private static LogicalPlan maybePushDownPastUnionAll(Filter filter, UnionAll unionAll) { |
| 317 | + List<Expression> pushable = new ArrayList<>(); |
| 318 | + List<Expression> nonPushable = new ArrayList<>(); |
| 319 | + for (Expression exp : Predicates.splitAnd(filter.condition())) { |
| 320 | + if (exp.references().subsetOf(unionAll.outputSet())) { |
| 321 | + pushable.add(exp); |
| 322 | + } else { |
| 323 | + nonPushable.add(exp); |
| 324 | + } |
| 325 | + } |
| 326 | + if (pushable.isEmpty()) { |
| 327 | + return filter; // nothing to push down |
| 328 | + } |
| 329 | + |
| 330 | + // Push the filter down to each child of the UnionAll, the child of a UnionAll is always a project |
| 331 | + // followed by an optional eval and then limit added by fork and then the real child |
| 332 | + List<LogicalPlan> newChildren = new ArrayList<>(); |
| 333 | + boolean changed = false; |
| 334 | + for (LogicalPlan child : unionAll.children()) { |
| 335 | + if (child instanceof Project project) { |
| 336 | + LogicalPlan newChild = maybePushDownFilterPastEvalAndLimitForUnionAllChild(pushable, project); |
| 337 | + if (newChild != child) { |
| 338 | + changed = true; |
| 339 | + } |
| 340 | + newChildren.add(newChild); |
| 341 | + } else { // unexpected pattern, just add the child as is |
| 342 | + newChildren.add(child); |
| 343 | + } |
| 344 | + } |
| 345 | + |
| 346 | + if (changed == false) { // nothing changed, return the original plan |
| 347 | + return filter; |
| 348 | + } |
| 349 | + |
| 350 | + LogicalPlan newUnionAll = unionAll.replaceChildren(newChildren); |
| 351 | + if (nonPushable.isEmpty()) { |
| 352 | + return newUnionAll; |
| 353 | + } else { |
| 354 | + return filter.with(newUnionAll, Predicates.combineAnd(nonPushable)); |
| 355 | + } |
| 356 | + } |
| 357 | + |
| 358 | + private static LogicalPlan maybePushDownFilterPastEvalAndLimitForUnionAllChild(List<Expression> pushable, Project project) { |
| 359 | + LogicalPlan child = project.child(); |
| 360 | + if (child instanceof Eval eval) { |
| 361 | + return pushDownFilterPastEvalForUnionAllChild(pushable, project, eval); |
| 362 | + } else if (child instanceof Limit limit) { |
| 363 | + LogicalPlan newLimit = pushDownFilterPastLimitForUnionAllChild(pushable, limit); |
| 364 | + return project.replaceChild(newLimit); |
| 365 | + } |
| 366 | + return project; |
| 367 | + } |
| 368 | + |
| 369 | + private static LogicalPlan pushDownFilterPastEvalForUnionAllChild(List<Expression> pushable, Project project, Eval eval) { |
| 370 | + AttributeMap<Expression> evalAliases = buildEvaAliases(eval); |
| 371 | + |
| 372 | + Tuple<List<Expression>, List<Expression>> pushablesAndNonPushables = splitPushableAndNonPushablePredicates( |
| 373 | + pushable, |
| 374 | + project.projections(), |
| 375 | + exp -> exp.references().stream().anyMatch(evalAliases::containsKey) |
| 376 | + ); |
| 377 | + |
| 378 | + LogicalPlan evalChild = eval.child(); |
| 379 | + |
| 380 | + if (evalChild instanceof Limit limit) { |
| 381 | + LogicalPlan newLimit = pushDownFilterPastLimitForUnionAllChild(pushablesAndNonPushables.v1(), limit); |
| 382 | + LogicalPlan newEval = eval.replaceChild(newLimit); |
| 383 | + if (pushablesAndNonPushables.v2().isEmpty()) { |
| 384 | + return project.replaceChild(newEval); |
| 385 | + } else { |
| 386 | + Filter newFilter = new Filter(project.source(), newEval, Predicates.combineAnd(pushablesAndNonPushables.v2())); |
| 387 | + return project.replaceChild(newFilter); |
| 388 | + } |
| 389 | + } |
| 390 | + return project; |
| 391 | + } |
| 392 | + |
| 393 | + private static LogicalPlan pushDownFilterPastLimitForUnionAllChild(List<Expression> pushable, Limit limit) { |
| 394 | + // check whether the pushable expression's attribute needs to be replaced |
| 395 | + Tuple<List<Expression>, List<Expression>> pushablesAndNonPushables = splitPushableAndNonPushablePredicates( |
| 396 | + pushable, |
| 397 | + limit.output(), |
| 398 | + exp -> exp.references().subsetOf(limit.outputSet()) == false |
| 399 | + ); |
| 400 | + |
| 401 | + if (pushablesAndNonPushables.v1().isEmpty() || pushablesAndNonPushables.v2().isEmpty() == false) { |
| 402 | + return limit; |
| 403 | + } |
| 404 | + Expression combined = Predicates.combineAnd(pushablesAndNonPushables.v1()); |
| 405 | + Filter pushed = new Filter(limit.source(), limit.child(), combined); |
| 406 | + return limit.replaceChild(pushed); |
| 407 | + } |
| 408 | + |
| 409 | + private static AttributeMap<Expression> buildEvaAliases(Eval eval) { |
| 410 | + AttributeMap.Builder<Expression> builder = AttributeMap.builder(); |
| 411 | + for (Alias alias : eval.fields()) { |
| 412 | + builder.put(alias.toAttribute(), alias.child()); |
| 413 | + } |
| 414 | + return builder.build(); |
| 415 | + } |
| 416 | + |
| 417 | + private static Tuple<List<Expression>, List<Expression>> splitPushableAndNonPushablePredicates( |
| 418 | + List<Expression> predicates, |
| 419 | + List<? extends NamedExpression> attributes, |
| 420 | + Predicate<Expression> nonPushableCheck |
| 421 | + ) { |
| 422 | + List<Expression> pushable = new ArrayList<>(); |
| 423 | + List<Expression> nonPushable = new ArrayList<>(); |
| 424 | + for (Expression exp : predicates) { |
| 425 | + Expression replaced = replaceAttributesByName(exp, attributes); |
| 426 | + if (replaced == null || nonPushableCheck.test(replaced)) { |
| 427 | + nonPushable.add(replaced); |
| 428 | + } else { |
| 429 | + pushable.add(replaced); |
| 430 | + } |
| 431 | + } |
| 432 | + return Tuple.tuple(pushable, nonPushable); |
| 433 | + } |
| 434 | + |
| 435 | + private static Expression replaceAttributesByName(Expression expr, List<? extends NamedExpression> namedExpressions) { |
| 436 | + // Collect all referenced attributes |
| 437 | + for (Attribute attr : expr.references()) { |
| 438 | + boolean found = namedExpressions.stream().anyMatch(ne -> ne.name().equals(attr.name())); |
| 439 | + if (found == false) { |
| 440 | + return null; |
| 441 | + } |
| 442 | + } |
| 443 | + // Replace attributes by name |
| 444 | + return expr.transformUp(Attribute.class, attr -> { |
| 445 | + for (NamedExpression ne : namedExpressions) { |
| 446 | + if (ne.name().equals(attr.name())) { |
| 447 | + return ne.toAttribute(); |
| 448 | + } |
| 449 | + } |
| 450 | + return attr; |
| 451 | + }); |
| 452 | + } |
286 | 453 | } |
0 commit comments