From 52893e608434f3c3803fdd7c74f0a790167cff83 Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Wed, 26 Nov 2025 11:22:24 +0200 Subject: [PATCH 1/4] Adding support for pruning columns in fork branches --- docs/changelog/137907.yaml | 6 + .../esql/core/expression/AttributeSet.java | 29 +- .../esql/optimizer/LogicalPlanOptimizer.java | 2 + .../optimizer/rules/logical/PruneColumns.java | 62 ++- .../logical/PruneColumnsInForkBranches.java | 127 ++++++ .../xpack/esql/plan/logical/Fork.java | 4 +- .../optimizer/LogicalPlanOptimizerTests.java | 430 ++++++++++++++++++ 7 files changed, 641 insertions(+), 19 deletions(-) create mode 100644 docs/changelog/137907.yaml create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumnsInForkBranches.java diff --git a/docs/changelog/137907.yaml b/docs/changelog/137907.yaml new file mode 100644 index 0000000000000..c002a2b7d13a5 --- /dev/null +++ b/docs/changelog/137907.yaml @@ -0,0 +1,6 @@ +pr: 137907 +summary: Prune columns when using fork +area: ES|QL +type: bug +issues: + - 136365 diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeSet.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeSet.java index fefaf3098319e..df13696b8fcd8 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeSet.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeSet.java @@ -212,11 +212,15 @@ public static Builder builder(int expectedSize) { return new Builder(AttributeMap.builder(expectedSize)); } + public static Builder forkBuilder() { + return new ForkBuilder(AttributeMap.builder()); + } + public static class Builder { - private final AttributeMap.Builder mapBuilder; + protected final AttributeMap.Builder mapBuilder; - private Builder(AttributeMap.Builder mapBuilder) { + protected Builder(AttributeMap.Builder mapBuilder) { this.mapBuilder = mapBuilder; } @@ -266,4 +270,25 @@ public void clear() { mapBuilder.keySet().clear(); } } + + /** + * This class extends {@code Builder}, but its {@code contains} method also matches {@code NamedExpression} instances by their name. + * This is useful for Fork plans, where branches may have different Attribute IDs but share a common output schema, + * allowing equality checks of used attributes based on their names. + */ + public static class ForkBuilder extends Builder { + + private ForkBuilder(AttributeMap.Builder mapBuilder) { + super(mapBuilder); + } + + @Override + public boolean contains(Object o) { + if (super.contains(o)) { + return true; + } + return o instanceof NamedExpression + && mapBuilder.keySet().stream().anyMatch(x -> x.name().equals(((NamedExpression) o).name())); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index c02f58b234dd8..3599061aa5e9b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateNullable; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropgateUnmappedFields; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumnsInForkBranches; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyAggregates; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneFilters; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneLiteralsInOrderBy; @@ -215,6 +216,7 @@ protected static Batch operators() { new PruneFilters(), new PruneColumns(), new PruneLiteralsInOrderBy(), + new PruneColumnsInForkBranches(), new PushDownAndCombineLimits(), new PushLimitToKnn(), new PushDownAndCombineFilters(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java index e3ed7d0faf295..9b72114a560e9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.Sample; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; +import org.elasticsearch.xpack.esql.plan.logical.UnionAll; import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; @@ -32,7 +33,9 @@ import org.elasticsearch.xpack.esql.rule.Rule; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyPlans.skipPlan; @@ -46,9 +49,8 @@ public LogicalPlan apply(LogicalPlan plan) { return pruneColumns(plan, plan.outputSet().asBuilder(), false); } - private static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder used, boolean inlineJoin) { + static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder used, boolean inlineJoin) { Holder forkPresent = new Holder<>(false); - // while going top-to-bottom (upstream) return plan.transformDown(p -> { // Note: It is NOT required to do anything special for binary plans like JOINs, except INLINE STATS. It is perfectly fine that @@ -58,17 +60,13 @@ private static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder u // same index fields will have different name ids in the left and right hand sides - as in the extreme example // `FROM lookup_idx | LOOKUP JOIN lookup_idx ON key_field`. - // TODO: revisit with every new command - // skip nodes that simply pass the input through and use no references - if (p instanceof Limit || p instanceof Sample) { + if (forkPresent.get()) { return p; } - if (p instanceof Fork) { - forkPresent.set(true); - } - // pruning columns for Fork branches can have the side effect of having misaligned outputs - if (forkPresent.get()) { + // TODO: revisit with every new command + // skip nodes that simply pass the input through and use no references + if (p instanceof Limit || p instanceof Sample) { return p; } @@ -83,6 +81,10 @@ private static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder u case Eval eval -> pruneColumnsInEval(eval, used, recheck); case Project project -> inlineJoin ? pruneColumnsInProject(project, used) : p; case EsRelation esr -> pruneColumnsInEsRelation(esr, used); + case Fork fork -> { + forkPresent.set(true); + yield pruneColumnsInFork(fork, used); + } default -> p; }; } while (recheck.get()); @@ -94,7 +96,7 @@ private static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder u }); } - private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, AttributeSet.Builder used, boolean inlineJoin) { + static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, AttributeSet.Builder used, boolean inlineJoin) { LogicalPlan p = aggregate; var remaining = pruneUnusedAndAddReferences(aggregate.aggregates(), used); @@ -134,7 +136,7 @@ private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, Attribut return p; } - private static LogicalPlan pruneColumnsInInlineJoinRight(InlineJoin ij, AttributeSet.Builder used, Holder recheck) { + static LogicalPlan pruneColumnsInInlineJoinRight(InlineJoin ij, AttributeSet.Builder used, Holder recheck) { LogicalPlan p = ij; used.addAll(ij.references()); @@ -155,7 +157,7 @@ private static LogicalPlan pruneColumnsInInlineJoinRight(InlineJoin ij, Attribut return p; } - private static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder used, Holder recheck) { + static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder used, Holder recheck) { LogicalPlan p = eval; var remaining = pruneUnusedAndAddReferences(eval.fields(), used); @@ -173,7 +175,7 @@ private static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder us } // Note: only run when the Project is a descendent of an InlineJoin. - private static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.Builder used) { + static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.Builder used) { LogicalPlan p = project; var remaining = pruneUnusedAndAddReferences(project.projections(), used); @@ -184,7 +186,7 @@ private static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.B return p; } - private static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet.Builder used) { + static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet.Builder used) { LogicalPlan p = esr; if (esr.indexMode() == IndexMode.LOOKUP) { @@ -200,6 +202,36 @@ private static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet return p; } + private static LogicalPlan pruneColumnsInFork(Fork fork, AttributeSet.Builder used) { + // prune the output attributes of fork based on usage from the rest of the plan + // this does not consider the inner usage within each branch of the fork + // as those will be handled when traversing down each branch in PruneColumnsInForkBranches + LogicalPlan p = fork; + + // should exit early for UnionAll + if (fork instanceof UnionAll) { + return p; + } + boolean changed = false; + AttributeSet.Builder builder = AttributeSet.builder(); + // if any of the fork outputs are used, keep them + // otherwise, prune them based on the rest of the plan's usage + Set names = new HashSet<>(used.build().names()); + for (var attr : fork.output()) { + // we should also ensure to keep any synthetic attributes around as those could still be used for internal processing + if (attr.synthetic() || names.contains(attr.name())) { + builder.add(attr); + } else { + changed = true; + } + } + if (changed) { + List attrs = builder.build().stream().toList(); + p = new Fork(fork.source(), fork.children(), attrs); + } + return p; + } + private static LogicalPlan emptyLocalRelation(UnaryPlan plan) { // create an empty local relation with no attributes return skipPlan(plan); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumnsInForkBranches.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumnsInForkBranches.java new file mode 100644 index 0000000000000..b7d2df362ddc8 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumnsInForkBranches.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.logical; + +import org.elasticsearch.xpack.esql.core.expression.AttributeSet; +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.core.util.Holder; +import org.elasticsearch.xpack.esql.plan.logical.Aggregate; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.Fork; +import org.elasticsearch.xpack.esql.plan.logical.Limit; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.Project; +import org.elasticsearch.xpack.esql.plan.logical.Sample; +import org.elasticsearch.xpack.esql.plan.logical.UnionAll; +import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; +import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; +import org.elasticsearch.xpack.esql.rule.Rule; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns.pruneColumnsInAggregate; +import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns.pruneColumnsInEsRelation; +import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns.pruneColumnsInEval; +import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns.pruneColumnsInInlineJoinRight; + +/** + * This is used to prune unused columns and expressions in each branch of a Fork. + * The output for each fork branch has already been pruned in {@code PruneColumns#pruneColumnsInFork}, so here we only need to + * remove unused columns and expressions in the sub-plans of each branch, similarly to independently running {@code PruneColumns}. + */ +public final class PruneColumnsInForkBranches extends Rule { + + @Override + public LogicalPlan apply(LogicalPlan plan) { + + // collect used attributes from the plan above fork + var used = AttributeSet.forkBuilder(); + var forkFound = new Holder<>(false); + + // traverse down to the fork node + return plan.transformDown(p -> { + // if fork is not found yet, keep collecting used attributes from everything above. + // Once fork is found, just return the rest of the plan as is, as any pruning/transformation will have + // taken place in pruneSubPlan for each of the fork branches. + if (false == (p instanceof Fork) || forkFound.get()) { + if (false == forkFound.get()) { + used.addAll(p.references()); + } + return p; + } + + // only do this for fork + if (p instanceof UnionAll) { + return p; + } + + used.addAll(p.output()); + forkFound.set(true); + var forkOutputNames = p.output().stream().map(NamedExpression::name).collect(Collectors.toSet()); + boolean changed = false; + List newChildren = new ArrayList<>(); + for (var child : p.children()) { + var clonedUsed = AttributeSet.forkBuilder().addAll(used); + var newChild = pruneSubPlan(child, clonedUsed, forkOutputNames); + newChildren.add(newChild); + if (false == newChild.equals(child)) { + changed = true; + } + } + if (changed) { + return new Fork(p.source(), newChildren, p.output()); + } else { + return p; + } + }); + } + + private static LogicalPlan pruneSubPlan(LogicalPlan plan, AttributeSet.Builder usedAttrs, Set forkOutput) { + if (plan instanceof LocalRelation localRelation) { + var outputAttrs = localRelation.output().stream().filter(usedAttrs::contains).collect(Collectors.toList()); + return new LocalRelation(localRelation.source(), outputAttrs, localRelation.supplier()); + } + + var projectHolder = new Holder<>(false); + return plan.transformDown(p -> { + if (p instanceof Limit || p instanceof Sample) { + return p; + } + + var recheck = new Holder(); + do { + // we operate using the names of the fields, rather than comparing the attributes directly, + // as attributes may have been recreated during the transformations of fork branches. + recheck.set(false); + p = switch (p) { + case Aggregate agg -> pruneColumnsInAggregate(agg, usedAttrs, false); + case InlineJoin inj -> pruneColumnsInInlineJoinRight(inj, usedAttrs, recheck); + case Eval eval -> pruneColumnsInEval(eval, usedAttrs, recheck); + case Project project -> { + // process only the direct Project after Fork, but skip any subsequent instances + if (projectHolder.get()) { + yield p; + } else { + projectHolder.set(true); + var prunedAttrs = project.projections().stream().filter(x -> forkOutput.contains(x.name())).toList(); + yield new Project(project.source(), project.child(), prunedAttrs); + } + } + case EsRelation esr -> pruneColumnsInEsRelation(esr, usedAttrs); + default -> p; + }; + } while (recheck.get()); + usedAttrs.addAll(p.references()); + return p; + }); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java index 09298c918fd22..3f0634771c489 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java @@ -162,7 +162,7 @@ public static Set outputUnsupportedAttributeNames(List subp @Override public int hashCode() { - return Objects.hash(Fork.class, children()); + return Objects.hash(Fork.class, output, children()); } @Override @@ -175,7 +175,7 @@ public boolean equals(Object o) { } Fork other = (Fork) o; - return Objects.equals(children(), other.children()); + return Objects.equals(output, other.output) && Objects.equals(children(), other.children()); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index ab71a0cc36027..b57d3caecdac2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -121,6 +121,7 @@ import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; +import org.elasticsearch.xpack.esql.plan.logical.Fork; import org.elasticsearch.xpack.esql.plan.logical.Grok; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; @@ -154,6 +155,7 @@ import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; @@ -210,6 +212,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -9526,4 +9529,431 @@ STATS std_dev(network.eth0.currently_connected_clients) to be present when run with the TS command, but it was not present.""") ); } + + /** + * EsqlProject[[first_name{r}#32]] + * \_Limit[1000[INTEGER],false,false] + * \_Filter[_fork{r}#33 == fork1[KEYWORD]] + * \_Fork[[first_name{r}#32, _fork{r}#33]] + * |_Project[[first_name{f}#11, _fork{r}#7]] + * | \_Eval[[fork1[KEYWORD] AS _fork#7]] + * | \_Limit[1000[INTEGER],false,false] + * | \_EsRelation[employees][_meta_field{f}#16, emp_no{f}#10, first_name{f}#11, ..] + * \_Project[[first_name{f}#22, _fork{r}#7]] + * \_Eval[[fork2[KEYWORD] AS _fork#7]] + * \_Limit[1000[INTEGER],false,false] + * \_EsRelation[employees][_meta_field{f}#27, emp_no{f}#21, first_name{f}#22, ..] + */ + public void testPruneColumnsInForkBranchesSimpleEvalOutsideBranches() { + var query = """ + FROM employees + | KEEP first_name + | EVAL x = 1.0 + | DROP x + | FORK + (WHERE true) + (WHERE true) + | WHERE _fork == "fork1" + | DROP _fork + """; + var plan = optimizedPlan(query); + var project = as(plan, EsqlProject.class); + assertThat(project.projections().size(), equalTo(1)); + assertThat(Expressions.names(project.projections()), contains("first_name")); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var fork = as(filter.child(), Fork.class); + assertThat(fork.output(), hasSize(2)); + assertThat(fork.output().stream().map(Attribute::name).collect(Collectors.toSet()), hasItems("first_name", "_fork")); + for (LogicalPlan branch : fork.children()) { + var branchProject = as(branch, Project.class); + assertThat(branchProject.projections().size(), equalTo(2)); + assertThat(Expressions.names(branchProject.projections()), containsInAnyOrder("first_name", "_fork")); + var branchEval = as(branchProject.child(), Eval.class); + var alias = as(branchEval.fields().getFirst(), Alias.class); + assertThat(alias.name(), equalTo("_fork")); + var limitInBranch = as(branchEval.child(), Limit.class); + var relation = as(limitInBranch.child(), EsRelation.class); + assertThat(relation.output().stream().map(Attribute::name).collect(Collectors.toSet()), hasItems("emp_no", "first_name")); + } + } + + /** + * EsqlProject[[_fork{r}#76, emp_no{r}#62, x{r}#73, y{r}#74, z{r}#75]] + * \_TopN[[Order[_fork{r}#76,ASC,LAST], Order[emp_no{r}#62,ASC,LAST]],1000[INTEGER],false] + * \_Fork[[emp_no{r}#62, x{r}#73, y{r}#74, z{r}#75, _fork{r}#76]] + * |_Project[[emp_no{f}#37, x{r}#15, y{r}#16, z{r}#17, _fork{r}#5]] + * | \_Eval[[fork1[KEYWORD] AS _fork#5]] + * | \_Grok[a{r}#10,Parser[pattern=%{WORD:x} %{WORD:y} %{WORD:z}, grok=org.elasticsearch.grok.Grok@1702ca8e],[x{r}#15, y{r}# + * 16, z{r}#17]] + * | \_Eval[[CONCAT(first_name{f}#38, [KEYWORD],TOSTRING(emp_no{f}#37), [KEYWORD],last_name{f}#41) AS a#10]] + * | \_Limit[1000[INTEGER],false,false] + * | \_Filter[IN(10048[INTEGER],10081[INTEGER],emp_no{f}#37)] + * | \_EsRelation[employees][_meta_field{f}#43, emp_no{f}#37, first_name{f}#38, ..] + * \_Project[[emp_no{f}#48, x{r}#27, y{r}#28, z{r}#29, _fork{r}#5]] + * \_Eval[[fork2[KEYWORD] AS _fork#5]] + * \_Grok[b{r}#22,Parser[pattern=%{WORD:x} %{WORD:y} %{WORD:z}, grok=org.elasticsearch.grok.Grok@37d58d35],[x{r}#27, y{r}# + * 28, z{r}#29]] + * \_Eval[[CONCAT(last_name{f}#52, [KEYWORD],TOSTRING(emp_no{f}#48), [KEYWORD],first_name{f}#49) AS b#22]] + * \_Limit[1000[INTEGER],false,false] + * \_Filter[IN(10048[INTEGER],10081[INTEGER],emp_no{f}#48)] + * \_EsRelation[employees][_meta_field{f}#54, emp_no{f}#48, first_name{f}#49, ..] + */ + public void testPruneColumnsInForkBranchesDropNestedEvalsFromOutputIfNotNeeded() { + // In this test, the EVALs that create 'a' and 'b' inside each branch should be dropped from the output + // since they are not needed after the GROK + var query = """ + FROM employees + | WHERE emp_no == 10048 OR emp_no == 10081 + | FORK (EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name) + | GROK a "%{WORD:x} %{WORD:y} %{WORD:z}" ) + (EVAL b = CONCAT(last_name, " ", emp_no::keyword, " ", first_name) + | GROK b "%{WORD:x} %{WORD:y} %{WORD:z}" ) + | KEEP _fork, emp_no, x, y, z + | SORT _fork, emp_no + """; + var plan = optimizedPlan(query); + var project = as(plan, EsqlProject.class); + assertThat(project.projections().size(), equalTo(5)); + assertThat(Expressions.names(project.projections()), containsInAnyOrder("_fork", "emp_no", "x", "y", "z")); + var topN = as(project.child(), TopN.class); + var fork = as(topN.child(), Fork.class); + assertThat(fork.output(), hasSize(5)); + assertThat(fork.output().stream().map(Attribute::name).collect(Collectors.toSet()), hasItems("emp_no", "x", "y", "z", "_fork")); + for (LogicalPlan branch : fork.children()) { + var branchProject = as(branch, Project.class); + assertThat(branchProject.projections().size(), equalTo(5)); + assertThat(Expressions.names(branchProject.projections()), containsInAnyOrder("emp_no", "x", "y", "z", "_fork")); + var branchEval = as(branchProject.child(), Eval.class); + var forkAlias = as(branchEval.fields().getFirst(), Alias.class); + assertThat(forkAlias.name(), equalTo("_fork")); + var grok = as(branchEval.child(), Grok.class); + assertThat(grok.extractedFields().size(), equalTo(3)); + assertThat(Expressions.names(grok.extractedFields()), containsInAnyOrder("x", "y", "z")); + var evalInBranch = as(grok.child(), Eval.class); + var aliasInBranch = as(evalInBranch.fields().getFirst(), Alias.class); + // The EVAL that created 'a' or 'b' should still be present in the branch even if not part of the output + assertThat(aliasInBranch.name(), anyOf(equalTo("a"), equalTo("b"))); + var limitInBranch = as(evalInBranch.child(), Limit.class); + var filter = as(limitInBranch.child(), Filter.class); + assertThat(filter.toString(), containsString("IN(10048[INTEGER],10081[INTEGER],emp_no")); + var relation = as(filter.child(), EsRelation.class); + assertThat( + relation.output().stream().map(Attribute::name).collect(Collectors.toSet()), + hasItems("emp_no", "first_name", "last_name") + ); + } + } + + /** + * Limit[10000[INTEGER],false,false] + * \_Aggregate[[],[COUNT(*[KEYWORD],true[BOOLEAN],PT0S[TIME_DURATION]) AS count(*)#36, COUNT(emp_no{r}#109,true[BOOLEAN],PT0S[ + * TIME_DURATION]) AS d#39, MAX(_fork{r}#124,true[BOOLEAN],PT0S[TIME_DURATION]) AS m#42, COUNT(s{r}#119,true[BOOLEAN],PT0S[TIME_DURATION]) AS ls#45]] + * \_Fork[[emp_no{r}#109, s{r}#119, _fork{r}#124]] + * |_Project[[emp_no{f}#46, s{r}#6, _fork{r}#7]] + * | \_Eval[[fork1[KEYWORD] AS _fork#7]] + * | \_Dissect[a{r}#21,Parser[pattern=%{x} %{y} %{z}, appendSeparator=, parser=org.elasticsearch.dissect.DissectParser@53c057b + * 1],[x{r}#22, y{r}#23, z{r}#24]] + * | \_Eval[[10[INTEGER] AS s#6, CONCAT(first_name{f}#47, [KEYWORD],TOSTRING(emp_no{f}#46), [KEYWORD],last_name{f}#50) AS + * a#21]] + * | \_Limit[1000[INTEGER],false,false] + * | \_Filter[IN(10048[INTEGER],10081[INTEGER],emp_no{f}#46)] + * | \_EsRelation[employees][_meta_field{f}#52, emp_no{f}#46, first_name{f}#47, ..] + * |_Project[[emp_no{r}#91, s{r}#101, _fork{r}#7]] + * | \_Eval[[fork2[KEYWORD] AS _fork#7, null[INTEGER] AS emp_no#91, null[INTEGER] AS s#101]] + * | \_Limit[1000[INTEGER],false,false] + * | \_LocalRelation[[$$COUNT$COUNT(*)::keywo>$0{r$}#125],Page{blocks=[ConstantNullBlock[positions=1]]}] + * |_Project[[emp_no{f}#68, s{r}#6, _fork{r}#7]] + * | \_Eval[[fork3[KEYWORD] AS _fork#7]] + * | \_TopN[[Order[emp_no{f}#68,ASC,LAST]],2[INTEGER],false] + * | \_Eval[[10[INTEGER] AS s#6]] + * | \_Filter[IN(10048[INTEGER],10081[INTEGER],emp_no{f}#68)] + * | \_EsRelation[employees][_meta_field{f}#74, emp_no{f}#68, first_name{f}#69, ..] + * \_Project[[emp_no{f}#79, s{r}#6, _fork{r}#7]] + * \_Eval[[10[INTEGER] AS s#6, fork4[KEYWORD] AS _fork#7]] + * \_Limit[1000[INTEGER],false,false] + * \_Filter[IN(10048[INTEGER],10081[INTEGER],emp_no{f}#79)] + * \_EsRelation[employees][_meta_field{f}#85, emp_no{f}#79, first_name{f}#80, ..] + */ + public void testPruneColumnsInForkBranchesPruneIfAggregation() { + var query = """ + FROM employees + | WHERE emp_no == 10048 OR emp_no == 10081 + | EVAL s = 10 + | FORK ( EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name) + | DISSECT a "%{x} %{y} %{z}" + | EVAL y = y::keyword ) + ( STATS x = COUNT(*)::keyword, y = MAX(emp_no)::keyword, z = MIN(emp_no)::keyword ) + ( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name ) + ( EVAL x = "abc" | EVAL y = "aaa" ) + | STATS count(*), d = count(emp_no), m = max(_fork), ls = count(s) + """; + var plan = optimizedPlan(query); + var limit = as(plan, Limit.class); + var aggregate = as(limit.child(), Aggregate.class); + assertThat(aggregate.aggregates().size(), equalTo(4)); + assertThat(Expressions.names(aggregate.aggregates()), containsInAnyOrder("count(*)", "d", "m", "ls")); + var fork = as(aggregate.child(), Fork.class); + assertThat(fork.output().size(), equalTo(3)); + + var firstBranch = fork.children().getFirst(); + var firstBranchProject = as(firstBranch, Project.class); + assertThat(firstBranchProject.projections().size(), equalTo(3)); + var evalInFirstBranch = as(firstBranchProject.child(), Eval.class); + var dissect = as(evalInFirstBranch.child(), Dissect.class); + assertThat(Expressions.names(dissect.extractedFields()), containsInAnyOrder("x", "y", "z")); + var evalAfterDissect = as(dissect.child(), Eval.class); + assertThat(Expressions.names(evalAfterDissect.fields()), containsInAnyOrder("a", "s")); + var limitInFirstBranch = as(evalAfterDissect.child(), Limit.class); + var filterInFirstBranch = as(limitInFirstBranch.child(), Filter.class); + var firstBranchRelation = as(filterInFirstBranch.child(), EsRelation.class); + assertThat( + firstBranchRelation.output().stream().map(Attribute::name).collect(Collectors.toSet()), + hasItems("emp_no", "first_name", "last_name") + ); + + var secondBranch = fork.children().get(1); + var secondBranchProject = as(secondBranch, Project.class); + assertThat(secondBranchProject.projections().size(), equalTo(3)); + var evalInSecondBranch = as(secondBranchProject.child(), Eval.class); + var limitInSecondBranch = as(evalInSecondBranch.child(), Limit.class); + var localRelation = as(limitInSecondBranch.child(), LocalRelation.class); + assertThat( + localRelation.output().stream().map(Attribute::name).collect(Collectors.toSet()), + hasItems("$$COUNT$COUNT(*)::keywo>$0") + ); + + var thirdBranch = fork.children().get(2); + var thirdBranchProject = as(thirdBranch, Project.class); + assertThat(thirdBranchProject.projections().size(), equalTo(3)); + assertThat(Expressions.names(thirdBranchProject.projections()), containsInAnyOrder("emp_no", "s", "_fork")); + var evalInThirdBranch = as(thirdBranchProject.child(), Eval.class); + var topNInThirdBranch = as(evalInThirdBranch.child(), TopN.class); + var evalBeforeTopN = as(topNInThirdBranch.child(), Eval.class); + var filterInThirdBranch = as(evalBeforeTopN.child(), Filter.class); + var thirdBranchRelation = as(filterInThirdBranch.child(), EsRelation.class); + assertThat( + thirdBranchRelation.output().stream().map(Attribute::name).collect(Collectors.toSet()), + hasItems("emp_no", "first_name", "last_name") + ); + + var fourthBranch = fork.children().get(3); + var fourthBranchProject = as(fourthBranch, Project.class); + assertThat(fourthBranchProject.projections().size(), equalTo(3)); + assertThat(Expressions.names(fourthBranchProject.projections()), containsInAnyOrder("emp_no", "s", "_fork")); + var evalInFourthBranch = as(fourthBranchProject.child(), Eval.class); + var limitInFourthBranch = as(evalInFourthBranch.child(), Limit.class); + var filterInFourthBranch = as(limitInFourthBranch.child(), Filter.class); + var fourthBranchRelation = as(filterInFourthBranch.child(), EsRelation.class); + assertThat( + fourthBranchRelation.output().stream().map(Attribute::name).collect(Collectors.toSet()), + hasItems("emp_no", "first_name", "last_name") + ); + } + + /** + * EsqlProject[[languages{r}#33]] + * \_Limit[10000[INTEGER],false,false] + * \_Filter[_fork{r}#34 == fork1[KEYWORD]] + * \_Fork[[languages{r}#33, _fork{r}#34]] + * |_Project[[languages{r}#6, _fork{r}#8]] + * | \_Eval[[123[INTEGER] AS languages#6, fork1[KEYWORD] AS _fork#8]] + * | \_Limit[4[INTEGER],false,false] + * | \_EsRelation[employees][_meta_field{f}#17, emp_no{f}#11, first_name{f}#12, ..] + * \_Project[[languages{r}#6, _fork{r}#8]] + * \_Eval[[123[INTEGER] AS languages#6, fork2[KEYWORD] AS _fork#8]] + * \_Limit[4[INTEGER],false,false] + * \_EsRelation[employees][_meta_field{f}#28, emp_no{f}#22, first_name{f}#23, ..] + */ + public void testPruneColumnsInForkBranchesShouldRespectOuterAlias() { + var query = """ + from employees metadata _index + | drop languages + | eval languages = 123 + | keep languages + | limit 4 + | fork + (where true) + (where true) + | where _fork == "fork1" + | drop _fork + """; + var plan = optimizedPlan(query); + var project = as(plan, EsqlProject.class); + assertThat(project.projections().size(), equalTo(1)); + assertThat(Expressions.names(project.projections()), contains("languages")); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var fork = as(filter.child(), Fork.class); + assertThat(fork.output(), hasSize(2)); + assertThat(fork.output().stream().map(Attribute::name).collect(Collectors.toSet()), hasItems("languages", "_fork")); + for (LogicalPlan branch : fork.children()) { + var branchProject = as(branch, Project.class); + assertThat(branchProject.projections().size(), equalTo(2)); + assertThat(Expressions.names(branchProject.projections()), containsInAnyOrder("languages", "_fork")); + var branchEval = as(branchProject.child(), Eval.class); + var fields = branchEval.fields(); + assertThat(fields.size(), equalTo(2)); + var languagesAlias = as(fields.get(0), Alias.class); + assertThat(languagesAlias.name(), equalTo("languages")); + var forkAlias = as(fields.get(1), Alias.class); + assertThat(forkAlias.name(), equalTo("_fork")); + var limitInBranch = as(branchEval.child(), Limit.class); + var relation = as(limitInBranch.child(), EsRelation.class); + assertThat(relation.output().stream().map(Attribute::name).collect(Collectors.toSet()), hasItems("emp_no", "first_name")); + } + } + + /** + * EsqlProject[[languages{r}#55]] + * \_Limit[1000[INTEGER],false,false] + * \_Filter[_fork{r}#57 == fork1[KEYWORD]] + * \_Fork[[languages{r}#55, _fork{r}#57]] + * |_Project[[x{r}#15 AS languages#9, _fork{r}#13]] + * | \_Eval[[1[INTEGER] AS x#15, fork1[KEYWORD] AS _fork#13]] + * | \_Limit[1000[INTEGER],false,false] + * | \_EsRelation[employees][_meta_field{f}#27, emp_no{f}#21, first_name{f}#22, ..] + * \_Project[[languages{r}#43, _fork{r}#13]] + * \_Eval[[fork2[KEYWORD] AS _fork#13, null[INTEGER] AS languages#43]] + * \_Limit[1000[INTEGER],false,false] + * \_EsRelation[employees][_meta_field{f}#38, emp_no{f}#32, first_name{f}#33, ..] + */ + public void testPruneColumnsInForkBranchesShouldKeepAliasWithSameNameAsColumn() { + var query = """ + from employees + | drop languages + | fork + (eval x = 1 | rename x as lang | rename lang as languages | eval a = "aardvark" | rename a as foo) + (where true) + | where _fork == "fork1" + | drop _fork + | keep languages + """; + var plan = optimizedPlan(query); + var project = as(plan, EsqlProject.class); + assertThat(project.projections().size(), equalTo(1)); + assertThat(Expressions.names(project.projections()), contains("languages")); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var fork = as(filter.child(), Fork.class); + assertThat(fork.output(), hasSize(2)); + assertThat(fork.children(), hasSize(2)); + assertThat(fork.output().stream().map(Attribute::name).collect(Collectors.toSet()), hasItems("languages", "_fork")); + + var firstBranch = fork.children().getFirst(); + var firstBranchProject = as(firstBranch, Project.class); + assertThat(firstBranchProject.projections().size(), equalTo(2)); + assertThat(Expressions.names(firstBranchProject.projections()), containsInAnyOrder("languages", "_fork")); + var firstBranchEval = as(firstBranchProject.child(), Eval.class); + var firstBranchFields = firstBranchEval.fields(); + assertThat(firstBranchFields.size(), equalTo(2)); + assertThat(Expressions.names(firstBranchFields), containsInAnyOrder("x", "_fork")); + var limitInFirstBranch = as(firstBranchEval.child(), Limit.class); + var firstBranchRelation = as(limitInFirstBranch.child(), EsRelation.class); + assertThat( + firstBranchRelation.output().stream().map(Attribute::name).collect(Collectors.toSet()), + hasItems("emp_no", "first_name") + ); + + var secondBranch = fork.children().get(1); + var secondBranchProject = as(secondBranch, Project.class); + assertThat(secondBranchProject.projections().size(), equalTo(2)); + assertThat(Expressions.names(secondBranchProject.projections()), containsInAnyOrder("languages", "_fork")); + var secondBranchEval = as(secondBranchProject.child(), Eval.class); + var secondBranchFields = secondBranchEval.fields(); + assertThat(secondBranchFields.size(), equalTo(2)); + assertThat(Expressions.names(secondBranchFields), containsInAnyOrder("languages", "_fork")); + var limitInSecondBranch = as(secondBranchEval.child(), Limit.class); + var secondBranchRelation = as(limitInSecondBranch.child(), EsRelation.class); + assertThat( + secondBranchRelation.output().stream().map(Attribute::name).collect(Collectors.toSet()), + hasItems("emp_no", "first_name") + ); + } + + /** + * EsqlProject[[_meta_field{r}#48, emp_no{r}#49, first_name{r}#50, gender{r}#51, hire_date{r}#52, job{r}#53, job.raw{r}#54, l + * ast_name{r}#55, long_noidx{r}#56, salary{r}#57]] + * \_Limit[1000[INTEGER],false,false] + * \_Filter[_fork{r}#59 == fork1[KEYWORD]] + * \_Fork[[_meta_field{r}#48, emp_no{r}#49, first_name{r}#50, gender{r}#51, hire_date{r}#52, job{r}#53, job.raw{r}#54, l + * ast_name{r}#55, long_noidx{r}#56, salary{r}#57, _fork{r}#59]] + * |_Project[[_meta_field{f}#21, emp_no{f}#15, first_name{f}#16, gender{f}#17, hire_date{f}#22, job{f}#23, job.raw{f}#24, l + * ast_name{f}#19, long_noidx{f}#25, salary{f}#20, _fork{r}#9]] + * | \_Eval[[fork1[KEYWORD] AS _fork#9]] + * | \_Limit[1000[INTEGER],false,false] + * | \_EsRelation[employees][_meta_field{f}#21, emp_no{f}#15, first_name{f}#16, ..] + * |_LocalRelation[[_meta_field{f}#32, emp_no{f}#26, first_name{f}#27, gender{f}#28, hire_date{f}#33, job{f}#34, job.raw{f}#35, l + * ast_name{f}#30, long_noidx{f}#36, salary{f}#31, _fork{r}#9],EMPTY] + * \_Project[[_meta_field{f}#43, emp_no{f}#37, first_name{f}#38, gender{f}#39, hire_date{f}#44, job{f}#45, job.raw{f}#46, l + * ast_name{f}#41, long_noidx{f}#47, salary{f}#42, _fork{r}#9]] + * \_Eval[[fork3[KEYWORD] AS _fork#9]] + * \_Limit[1000[INTEGER],false,false] + * \_EsRelation[employees][_meta_field{f}#43, emp_no{f}#37, first_name{f}#38, ..] + */ + public void testPruneColumnsInForkBranchesShouldPruneNestedEvalsIfColumnIsDropped() { + var query = """ + from employees + | fork + (eval x = 1 | rename x as lang | rename lang as languages) + (where false) + (where true) + | where _fork == "fork1" + | drop _fork + | drop languages + """; + var plan = plan(query); + var project = as(plan, EsqlProject.class); + assertThat(project.projections().size(), equalTo(10)); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var fork = as(filter.child(), Fork.class); + assertThat(fork.output().size(), equalTo(11)); + + var firstBranch = fork.children().getFirst(); + var firstBranchProject = as(firstBranch, Project.class); + assertThat(firstBranchProject.projections().size(), equalTo(11)); + var firstBranchEval = as(firstBranchProject.child(), Eval.class); + var forkAliasInFirstBranch = as(firstBranchEval.fields().getFirst(), Alias.class); + assertThat(forkAliasInFirstBranch.name(), equalTo("_fork")); + var limitInFirstBranch = as(firstBranchEval.child(), Limit.class); + var firstBranchRelation = as(limitInFirstBranch.child(), EsRelation.class); + assertThat( + firstBranchRelation.output().stream().map(Attribute::name).collect(Collectors.toSet()), + hasItems("emp_no", "first_name", "gender", "hire_date", "job", "job.raw", "last_name", "long_noidx", "salary") + ); + + var secondBranch = fork.children().get(1); + var secondBranchLocalRelation = as(secondBranch, LocalRelation.class); + assertThat( + secondBranchLocalRelation.output().stream().map(Attribute::name).collect(Collectors.toSet()), + hasItems( + "_meta_field", + "emp_no", + "first_name", + "gender", + "hire_date", + "job", + "job.raw", + "last_name", + "long_noidx", + "salary", + "_fork" + ) + ); + assertThat(secondBranchLocalRelation.supplier(), instanceOf(EmptyLocalSupplier.class)); + + var thirdBranch = fork.children().get(2); + var thirdBranchProject = as(thirdBranch, Project.class); + assertThat(thirdBranchProject.projections().size(), equalTo(11)); + var thirdBranchEval = as(thirdBranchProject.child(), Eval.class); + var forkAliasInThirdBranch = as(thirdBranchEval.fields().getFirst(), Alias.class); + assertThat(forkAliasInThirdBranch.name(), equalTo("_fork")); + var limitInThirdBranch = as(thirdBranchEval.child(), Limit.class); + var thirdBranchRelation = as(limitInThirdBranch.child(), EsRelation.class); + assertThat( + thirdBranchRelation.output().stream().map(Attribute::name).collect(Collectors.toSet()), + hasItems("emp_no", "first_name", "gender", "hire_date", "job", "job.raw", "last_name", "long_noidx", "salary") + ); + } } From 9823defa434baf9d1b168461b719f1da45cadf11 Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Wed, 26 Nov 2025 11:38:44 +0200 Subject: [PATCH 2/4] checkstyle --- .../xpack/esql/optimizer/LogicalPlanOptimizerTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index b57d3caecdac2..0c3204f9c1e26 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -9648,7 +9648,8 @@ public void testPruneColumnsInForkBranchesDropNestedEvalsFromOutputIfNotNeeded() /** * Limit[10000[INTEGER],false,false] * \_Aggregate[[],[COUNT(*[KEYWORD],true[BOOLEAN],PT0S[TIME_DURATION]) AS count(*)#36, COUNT(emp_no{r}#109,true[BOOLEAN],PT0S[ - * TIME_DURATION]) AS d#39, MAX(_fork{r}#124,true[BOOLEAN],PT0S[TIME_DURATION]) AS m#42, COUNT(s{r}#119,true[BOOLEAN],PT0S[TIME_DURATION]) AS ls#45]] + * TIME_DURATION]) AS d#39,MAX(_fork{r}#124,true[BOOLEAN],PT0S[TIME_DURATION]) AS m#42, + * COUNT(s{r}#119,true[BOOLEAN],PT0S[TIME_DURATION]) AS ls#45]] * \_Fork[[emp_no{r}#109, s{r}#119, _fork{r}#124]] * |_Project[[emp_no{f}#46, s{r}#6, _fork{r}#7]] * | \_Eval[[fork1[KEYWORD] AS _fork#7]] From fcb52f28a19c50303d9da70e998d028bf196385b Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Thu, 27 Nov 2025 13:30:58 +0200 Subject: [PATCH 3/4] addressing PR comments - removing PruneColumnsInForkBranches and adding functionality in PruneColumns --- .../esql/optimizer/LogicalPlanOptimizer.java | 2 - .../optimizer/rules/logical/PruneColumns.java | 88 +++++++++--- .../logical/PruneColumnsInForkBranches.java | 127 ------------------ .../optimizer/LogicalPlanOptimizerTests.java | 1 + 4 files changed, 72 insertions(+), 146 deletions(-) delete mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumnsInForkBranches.java diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index d57546d055ff8..a532cebe455e6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -33,7 +33,6 @@ import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateNullable; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropgateUnmappedFields; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns; -import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumnsInForkBranches; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyAggregates; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneFilters; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneLiteralsInOrderBy; @@ -210,7 +209,6 @@ protected static Batch operators() { new PruneFilters(), new PruneColumns(), new PruneLiteralsInOrderBy(), - new PruneColumnsInForkBranches(), new PushDownAndCombineLimits(), new PushLimitToKnn(), new PushDownAndCombineFilters(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java index 9b72114a560e9..2411370655038 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java @@ -36,6 +36,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyPlans.skipPlan; @@ -49,7 +50,7 @@ public LogicalPlan apply(LogicalPlan plan) { return pruneColumns(plan, plan.outputSet().asBuilder(), false); } - static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder used, boolean inlineJoin) { + private static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder used, boolean inlineJoin) { Holder forkPresent = new Holder<>(false); // while going top-to-bottom (upstream) return plan.transformDown(p -> { @@ -96,7 +97,7 @@ static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder used, boo }); } - static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, AttributeSet.Builder used, boolean inlineJoin) { + private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, AttributeSet.Builder used, boolean inlineJoin) { LogicalPlan p = aggregate; var remaining = pruneUnusedAndAddReferences(aggregate.aggregates(), used); @@ -136,7 +137,7 @@ static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, AttributeSet.Bui return p; } - static LogicalPlan pruneColumnsInInlineJoinRight(InlineJoin ij, AttributeSet.Builder used, Holder recheck) { + private static LogicalPlan pruneColumnsInInlineJoinRight(InlineJoin ij, AttributeSet.Builder used, Holder recheck) { LogicalPlan p = ij; used.addAll(ij.references()); @@ -157,7 +158,7 @@ static LogicalPlan pruneColumnsInInlineJoinRight(InlineJoin ij, AttributeSet.Bui return p; } - static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder used, Holder recheck) { + private static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder used, Holder recheck) { LogicalPlan p = eval; var remaining = pruneUnusedAndAddReferences(eval.fields(), used); @@ -175,7 +176,7 @@ static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder used, Hold } // Note: only run when the Project is a descendent of an InlineJoin. - static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.Builder used) { + private static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.Builder used) { LogicalPlan p = project; var remaining = pruneUnusedAndAddReferences(project.projections(), used); @@ -186,7 +187,7 @@ static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.Builder u return p; } - static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet.Builder used) { + private static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet.Builder used) { LogicalPlan p = esr; if (esr.indexMode() == IndexMode.LOOKUP) { @@ -204,15 +205,11 @@ static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet.Builder private static LogicalPlan pruneColumnsInFork(Fork fork, AttributeSet.Builder used) { // prune the output attributes of fork based on usage from the rest of the plan - // this does not consider the inner usage within each branch of the fork - // as those will be handled when traversing down each branch in PruneColumnsInForkBranches - LogicalPlan p = fork; - // should exit early for UnionAll if (fork instanceof UnionAll) { - return p; + return fork; } - boolean changed = false; + boolean forkOutputChanged = false; AttributeSet.Builder builder = AttributeSet.builder(); // if any of the fork outputs are used, keep them // otherwise, prune them based on the rest of the plan's usage @@ -222,14 +219,71 @@ private static LogicalPlan pruneColumnsInFork(Fork fork, AttributeSet.Builder us if (attr.synthetic() || names.contains(attr.name())) { builder.add(attr); } else { - changed = true; + forkOutputChanged = true; } } - if (changed) { - List attrs = builder.build().stream().toList(); - p = new Fork(fork.source(), fork.children(), attrs); + + var pruneForkAttrs = forkOutputChanged ? builder.build().stream().toList() : fork.output(); + // now that we have the pruned fork output attributes, we can proceed to apply pruning all children plan + var usedFork = AttributeSet.forkBuilder(); + usedFork.addAll(pruneForkAttrs); + var forkOutputNames = pruneForkAttrs.stream().map(NamedExpression::name).collect(Collectors.toSet()); + boolean childrenChanged = false; + List newChildren = new ArrayList<>(); + for (var child : fork.children()) { + var clonedUsed = AttributeSet.forkBuilder().addAll(usedFork); + var newChild = pruneSubPlan(child, clonedUsed, forkOutputNames); + newChildren.add(newChild); + if (false == newChild.equals(child)) { + childrenChanged = true; + } } - return p; + if (childrenChanged) { + return new Fork(fork.source(), newChildren, pruneForkAttrs); + } else if (forkOutputChanged) { + return new Fork(fork.source(), fork.children(), pruneForkAttrs); + } + return fork; + } + + private static LogicalPlan pruneSubPlan(LogicalPlan plan, AttributeSet.Builder usedAttrs, Set forkOutput) { + if (plan instanceof LocalRelation localRelation) { + var outputAttrs = localRelation.output().stream().filter(usedAttrs::contains).collect(Collectors.toList()); + return new LocalRelation(localRelation.source(), outputAttrs, localRelation.supplier()); + } + + var projectHolder = new Holder<>(false); + return plan.transformDown(p -> { + if (p instanceof Limit || p instanceof Sample) { + return p; + } + + var recheck = new Holder(); + do { + // we operate using the names of the fields, rather than comparing the attributes directly, + // as attributes may have been recreated during the transformations of fork branches. + recheck.set(false); + p = switch (p) { + case Aggregate agg -> pruneColumnsInAggregate(agg, usedAttrs, false); + case InlineJoin inj -> pruneColumnsInInlineJoinRight(inj, usedAttrs, recheck); + case Eval eval -> pruneColumnsInEval(eval, usedAttrs, recheck); + case Project project -> { + // process only the direct Project after Fork, but skip any subsequent instances + if (projectHolder.get()) { + yield p; + } else { + projectHolder.set(true); + var prunedAttrs = project.projections().stream().filter(x -> forkOutput.contains(x.name())).toList(); + yield new Project(project.source(), project.child(), prunedAttrs); + } + } + case EsRelation esr -> pruneColumnsInEsRelation(esr, usedAttrs); + default -> p; + }; + } while (recheck.get()); + usedAttrs.addAll(p.references()); + return p; + }); } private static LogicalPlan emptyLocalRelation(UnaryPlan plan) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumnsInForkBranches.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumnsInForkBranches.java deleted file mode 100644 index b7d2df362ddc8..0000000000000 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumnsInForkBranches.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.optimizer.rules.logical; - -import org.elasticsearch.xpack.esql.core.expression.AttributeSet; -import org.elasticsearch.xpack.esql.core.expression.NamedExpression; -import org.elasticsearch.xpack.esql.core.util.Holder; -import org.elasticsearch.xpack.esql.plan.logical.Aggregate; -import org.elasticsearch.xpack.esql.plan.logical.EsRelation; -import org.elasticsearch.xpack.esql.plan.logical.Eval; -import org.elasticsearch.xpack.esql.plan.logical.Fork; -import org.elasticsearch.xpack.esql.plan.logical.Limit; -import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; -import org.elasticsearch.xpack.esql.plan.logical.Project; -import org.elasticsearch.xpack.esql.plan.logical.Sample; -import org.elasticsearch.xpack.esql.plan.logical.UnionAll; -import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; -import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; -import org.elasticsearch.xpack.esql.rule.Rule; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns.pruneColumnsInAggregate; -import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns.pruneColumnsInEsRelation; -import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns.pruneColumnsInEval; -import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns.pruneColumnsInInlineJoinRight; - -/** - * This is used to prune unused columns and expressions in each branch of a Fork. - * The output for each fork branch has already been pruned in {@code PruneColumns#pruneColumnsInFork}, so here we only need to - * remove unused columns and expressions in the sub-plans of each branch, similarly to independently running {@code PruneColumns}. - */ -public final class PruneColumnsInForkBranches extends Rule { - - @Override - public LogicalPlan apply(LogicalPlan plan) { - - // collect used attributes from the plan above fork - var used = AttributeSet.forkBuilder(); - var forkFound = new Holder<>(false); - - // traverse down to the fork node - return plan.transformDown(p -> { - // if fork is not found yet, keep collecting used attributes from everything above. - // Once fork is found, just return the rest of the plan as is, as any pruning/transformation will have - // taken place in pruneSubPlan for each of the fork branches. - if (false == (p instanceof Fork) || forkFound.get()) { - if (false == forkFound.get()) { - used.addAll(p.references()); - } - return p; - } - - // only do this for fork - if (p instanceof UnionAll) { - return p; - } - - used.addAll(p.output()); - forkFound.set(true); - var forkOutputNames = p.output().stream().map(NamedExpression::name).collect(Collectors.toSet()); - boolean changed = false; - List newChildren = new ArrayList<>(); - for (var child : p.children()) { - var clonedUsed = AttributeSet.forkBuilder().addAll(used); - var newChild = pruneSubPlan(child, clonedUsed, forkOutputNames); - newChildren.add(newChild); - if (false == newChild.equals(child)) { - changed = true; - } - } - if (changed) { - return new Fork(p.source(), newChildren, p.output()); - } else { - return p; - } - }); - } - - private static LogicalPlan pruneSubPlan(LogicalPlan plan, AttributeSet.Builder usedAttrs, Set forkOutput) { - if (plan instanceof LocalRelation localRelation) { - var outputAttrs = localRelation.output().stream().filter(usedAttrs::contains).collect(Collectors.toList()); - return new LocalRelation(localRelation.source(), outputAttrs, localRelation.supplier()); - } - - var projectHolder = new Holder<>(false); - return plan.transformDown(p -> { - if (p instanceof Limit || p instanceof Sample) { - return p; - } - - var recheck = new Holder(); - do { - // we operate using the names of the fields, rather than comparing the attributes directly, - // as attributes may have been recreated during the transformations of fork branches. - recheck.set(false); - p = switch (p) { - case Aggregate agg -> pruneColumnsInAggregate(agg, usedAttrs, false); - case InlineJoin inj -> pruneColumnsInInlineJoinRight(inj, usedAttrs, recheck); - case Eval eval -> pruneColumnsInEval(eval, usedAttrs, recheck); - case Project project -> { - // process only the direct Project after Fork, but skip any subsequent instances - if (projectHolder.get()) { - yield p; - } else { - projectHolder.set(true); - var prunedAttrs = project.projections().stream().filter(x -> forkOutput.contains(x.name())).toList(); - yield new Project(project.source(), project.child(), prunedAttrs); - } - } - case EsRelation esr -> pruneColumnsInEsRelation(esr, usedAttrs); - default -> p; - }; - } while (recheck.get()); - usedAttrs.addAll(p.references()); - return p; - }); - } -} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index ac440d1b88016..787f051a843de 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -9831,6 +9831,7 @@ public void testPruneColumnsInForkBranchesPruneIfAggregation() { assertThat(Expressions.names(aggregate.aggregates()), containsInAnyOrder("count(*)", "d", "m", "ls")); var fork = as(aggregate.child(), Fork.class); assertThat(fork.output().size(), equalTo(3)); + assertThat(fork.output().stream().map(Attribute::name).collect(Collectors.toSet()), containsInAnyOrder("emp_no", "s", "_fork")); var firstBranch = fork.children().getFirst(); var firstBranchProject = as(firstBranch, Project.class); From 01b69b1348e0a24cbfb3d5ee4afc99fff40561a2 Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Thu, 27 Nov 2025 13:45:18 +0200 Subject: [PATCH 4/4] iter --- .../esql/optimizer/rules/logical/PruneColumns.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java index 2411370655038..195db9aad8484 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java @@ -223,11 +223,11 @@ private static LogicalPlan pruneColumnsInFork(Fork fork, AttributeSet.Builder us } } - var pruneForkAttrs = forkOutputChanged ? builder.build().stream().toList() : fork.output(); + var prunedForkAttrs = forkOutputChanged ? builder.build().stream().toList() : fork.output(); // now that we have the pruned fork output attributes, we can proceed to apply pruning all children plan var usedFork = AttributeSet.forkBuilder(); - usedFork.addAll(pruneForkAttrs); - var forkOutputNames = pruneForkAttrs.stream().map(NamedExpression::name).collect(Collectors.toSet()); + usedFork.addAll(prunedForkAttrs); + var forkOutputNames = prunedForkAttrs.stream().map(NamedExpression::name).collect(Collectors.toSet()); boolean childrenChanged = false; List newChildren = new ArrayList<>(); for (var child : fork.children()) { @@ -239,9 +239,9 @@ private static LogicalPlan pruneColumnsInFork(Fork fork, AttributeSet.Builder us } } if (childrenChanged) { - return new Fork(fork.source(), newChildren, pruneForkAttrs); + return new Fork(fork.source(), newChildren, prunedForkAttrs); } else if (forkOutputChanged) { - return new Fork(fork.source(), fork.children(), pruneForkAttrs); + return new Fork(fork.source(), fork.children(), prunedForkAttrs); } return fork; }