From 6ea61d66068517d0c24c3d19435506eb95cce661 Mon Sep 17 00:00:00 2001 From: Ioana Tagirta Date: Wed, 21 May 2025 17:09:27 +0200 Subject: [PATCH 1/3] Fix constant folding when using FORK --- .../src/main/resources/fork.csv-spec | 19 ++++++++ .../xpack/esql/action/ForkIT.java | 45 +++++++++++++++++++ .../xpack/esql/analysis/Analyzer.java | 19 ++++++-- .../rules/PlanConsistencyChecker.java | 45 +++++++++++++++++++ .../xpack/esql/parser/LogicalPlanBuilder.java | 2 +- .../xpack/esql/plan/logical/Fork.java | 23 +++++----- .../plan/logical/CommandLicenseTests.java | 2 +- 7 files changed, 138 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec index 5a4865517064a..5480dda8aecbe 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec @@ -195,3 +195,22 @@ fork3 | 10081 | Rosen | null | null | null fork4 | 10048 | abc | aaa | null | null fork4 | 10081 | abc | aaa | null | null ; + +forkWithFiltersOnConstantValues +FROM employees +| EVAL z = 1 +| WHERE z == 1 +| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | WHERE z - 1 == 0) + (WHERE emp_no == 10081 OR emp_no == 10087 | EVAL a = "x" ) + (STATS x = COUNT(*), y = MAX(emp_no), z = MIN(emp_no) | EVAL a = "y" ) + (STATS x = COUNT(*), y = MIN(emp_no)) +| WHERE _fork == "fork2" OR a == "y" +| KEEP _fork, emp_no, x, y, z +| SORT _fork, emp_no +; + +_fork:keyword | emp_no:integer | x:long | y:integer | z:integer +fork2 | 10081 | null | null | 1 +fork2 | 10087 | null | null | 1 +fork3 | null | 100 | 10100 | 10001 +; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java index 152119ea5a7ce..199cb091f3475 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java @@ -585,6 +585,51 @@ public void testWithStatsSimple() { } } + public void testWithConditionOnForkField() { + var query = """ + FROM test + | FORK ( WHERE content:"fox" | EVAL a = 1) + ( WHERE content:"cat" | EVAL b = 2 ) + ( WHERE content:"dog" | EVAL c = 3 ) + | WHERE _fork == "fork2" + | KEEP _fork, id, content, a, b, c + | SORT _fork + """; + + try (var resp = run(query)) { + assertColumnNames(resp.columns(), List.of("_fork", "id", "content", "a", "b", "c")); + + Iterable> expectedValues = List.of( + Arrays.stream(new Object[] { "fork2", 5, "There is also a white cat", null, 2, null }).toList() + ); + assertValues(resp.values(), expectedValues); + } + } + + public void testWithFilteringOnConstantColumn() { + var query = """ + FROM test + | FORK ( WHERE content:"fox" | EVAL a = 1) + ( WHERE content:"cat" | EVAL a = 2 ) + ( WHERE content:"dog" | EVAL a = 3 ) + | WHERE a == 3 + | KEEP _fork, id, content, a + | SORT id + | LIMIT 3 + """; + + try (var resp = run(query)) { + assertColumnNames(resp.columns(), List.of("_fork", "id", "content", "a")); + + Iterable> expectedValues = List.of( + List.of("fork3", 2, "This is a brown dog", 3), + List.of("fork3", 3, "This dog is really brown", 3), + List.of("fork3", 4, "The dog is brown but this document is very very long", 3) + ); + assertValues(resp.values(), expectedValues); + } + } + public void testWithEvalWithConflictingTypes() { var query = """ FROM test diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 3e2ffa706b441..775b441c4f000 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -752,7 +752,8 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) { // we align the outputs of the sub plans such that they have the same columns boolean changed = false; List newSubPlans = new ArrayList<>(); - Set forkColumns = fork.outputSet().names(); + List outputUnion = Fork.outputUnion(fork.children()); + Set forkColumns = outputUnion.stream().map(Attribute::name).collect(Collectors.toSet()); for (LogicalPlan logicalPlan : fork.children()) { Source source = logicalPlan.source(); @@ -760,7 +761,7 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) { // find the missing columns List missing = new ArrayList<>(); Set currentNames = logicalPlan.outputSet().names(); - for (Attribute attr : fork.outputSet()) { + for (Attribute attr : outputUnion) { if (currentNames.contains(attr.name()) == false) { missing.add(attr); } @@ -795,7 +796,19 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) { newSubPlans.add(logicalPlan); } - return changed ? new Fork(fork.source(), newSubPlans) : fork; + if (!changed) { + return fork; + } + + List newOutput = new ArrayList<>(); + + // We don't want to keep the same attributes that are outputted by the FORK branches. + // Keeping the same attributes can have unintended side effects when applying optimizations like constant folding. + for (Attribute attr : Fork.outputUnion(newSubPlans)) { + newOutput.add(new ReferenceAttribute(attr.source(), attr.name(), attr.dataType())); + } + + return changed ? new Fork(fork.source(), newSubPlans, newOutput) : fork; } private LogicalPlan resolveRerank(Rerank rerank, List childrenOutput) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/PlanConsistencyChecker.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/PlanConsistencyChecker.java index c97679e14f42d..07809543ca3b1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/PlanConsistencyChecker.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/PlanConsistencyChecker.java @@ -12,9 +12,17 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.NameId; +import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.plan.QueryPlan; import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; +import org.elasticsearch.xpack.esql.plan.logical.Fork; import org.elasticsearch.xpack.esql.plan.physical.BinaryExec; +import org.elasticsearch.xpack.esql.plan.physical.MergeExec; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import static org.elasticsearch.xpack.esql.common.Failure.fail; @@ -44,6 +52,8 @@ public static void checkPlan(QueryPlan p, Failures failures) { binaryExec.right().outputSet(), failures ); + } else if (p instanceof Fork || p instanceof MergeExec) { + checkMissingFork(p, failures); } else { checkMissing(p, p.references(), p.inputSet(), "missing references", failures); } @@ -59,6 +69,41 @@ public static void checkPlan(QueryPlan p, Failures failures) { } } + private static void checkMissingFork(QueryPlan plan, Failures failures) { + for (QueryPlan child : plan.children()) { + checkMissingForkBranch(child, plan.outputSet(), failures); + } + } + + private static void checkMissingForkBranch(QueryPlan plan, AttributeSet forkOutputSet, Failures failures) { + Map attributeTypes = forkOutputSet.stream().collect(Collectors.toMap(Attribute::name, Attribute::dataType)); + AttributeSet missing = AttributeSet.of(); + + Set commonAttrs = new HashSet<>(); + + // get the missing attributes from the sub plan + plan.output().forEach(attribute -> { + var attrType = attributeTypes.get(attribute.name()); + if (attrType == null || attrType != attribute.dataType()) { + missing.add(attribute); + } + commonAttrs.add(attribute.name()); + }); + + // get the missing attributes from the fork output + forkOutputSet.forEach(attribute -> { + if (commonAttrs.contains(attribute.name()) == false) { + missing.add(attribute); + } + }); + + if (missing.isEmpty() == false) { + failures.add( + fail(plan, "Plan [{}] optimized incorrectly due to missing attributes in subplans", plan.nodeString(), missing.toString()) + ); + } + } + private static void checkMissingBinary( QueryPlan plan, AttributeSet leftReferences, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index 1a2125a00f8d5..944befd856771 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -637,7 +637,7 @@ public PlanFactory visitForkCommand(EsqlBaseParser.ForkCommandContext ctx) { } return input -> { List subPlans = subQueries.stream().map(planFactory -> planFactory.apply(input)).toList(); - return new Fork(source(ctx), subPlans); + return new Fork(source(ctx), subPlans, List.of()); }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java index 8dec92d762b4b..f4126a1ded2ea 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java @@ -35,18 +35,19 @@ public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware { public static final String FORK_FIELD = "_fork"; - List lazyOutput; + private final List output; - public Fork(Source source, List children) { + public Fork(Source source, List children, List output) { super(source, children); if (children.size() < 2) { throw new IllegalArgumentException("requires more than two subqueries, got:" + children.size()); } + this.output = output; } @Override public LogicalPlan replaceChildren(List newChildren) { - return new Fork(source(), newChildren); + return new Fork(source(), newChildren, output); } @Override @@ -65,7 +66,8 @@ public boolean expressionsResolved() { return false; } - if (children().stream().anyMatch(p -> p.outputSet().names().contains(Analyzer.NO_FIELDS_NAME))) { + if (children().stream() + .anyMatch(p -> p.outputSet().names().contains(Analyzer.NO_FIELDS_NAME) || output.size() != p.output().size())) { return false; } @@ -85,26 +87,23 @@ public boolean expressionsResolved() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Fork::new, children()); + return NodeInfo.create(this, Fork::new, children(), output); } public Fork replaceSubPlans(List subPlans) { - return new Fork(source(), subPlans); + return new Fork(source(), subPlans, output); } @Override public List output() { - if (lazyOutput == null) { - lazyOutput = lazyOutput(); - } - return lazyOutput; + return output; } - private List lazyOutput() { + public static List outputUnion(List subplans) { List output = new ArrayList<>(); Set names = new HashSet<>(); - for (var subPlan : children()) { + for (var subPlan : subplans) { for (var attr : subPlan.output()) { if (names.contains(attr.name()) == false && attr.name().equals(Analyzer.NO_FIELDS_NAME) == false) { names.add(attr.name()); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/CommandLicenseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/CommandLicenseTests.java index 80bfbf5b6436f..ab4e2f0b3e0d2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/CommandLicenseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/CommandLicenseTests.java @@ -156,7 +156,7 @@ private static LogicalPlan createInstance(Class clazz, Lo return new Grok(source, child, null, null, List.of()); } case "Fork" -> { - return new Fork(source, List.of(child, child)); + return new Fork(source, List.of(child, child), List.of()); } case "Sample" -> { return new Sample(source, null, null, child); From f12dad4b6bbe7def2e6df529fc7251ff48c2e4d8 Mon Sep 17 00:00:00 2001 From: Ioana Tagirta Date: Thu, 22 May 2025 10:51:50 +0200 Subject: [PATCH 2/3] Bump esql capability for FORK --- .../xpack/esql/ccq/MultiClusterSpecIT.java | 4 ++-- .../src/main/resources/fork.csv-spec | 22 ++++++++++--------- .../xpack/esql/action/EsqlCapabilities.java | 2 +- .../xpack/esql/analysis/Analyzer.java | 2 +- .../elasticsearch/xpack/esql/CsvTests.java | 2 +- 5 files changed, 17 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 5b46efe424575..441a6fde1fb7a 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -46,7 +46,7 @@ import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled; import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES; import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V3; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V4; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V6; @@ -132,7 +132,7 @@ protected void shouldSkipTest(String testName) throws IOException { assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())); // Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented. assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName())); - assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V3.capabilityName())); + assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V4.capabilityName())); } @Override diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec index 5480dda8aecbe..951e2fd99ec1b 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec @@ -3,7 +3,7 @@ // simpleFork -required_capability: fork_v3 +required_capability: fork_v4 FROM employees | FORK ( WHERE emp_no == 10001 ) @@ -18,7 +18,7 @@ emp_no:integer | _fork:keyword ; forkWithWhereSortAndLimit -required_capability: fork_v3 +required_capability: fork_v4 FROM employees | FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 ) @@ -38,7 +38,7 @@ emp_no:integer | first_name:keyword | _fork:keyword ; fiveFork -required_capability: fork_v3 +required_capability: fork_v4 FROM employees | FORK ( WHERE emp_no == 10005 ) @@ -59,7 +59,7 @@ fork5 | 10001 ; forkWithWhereSortDescAndLimit -required_capability: fork_v3 +required_capability: fork_v4 FROM employees | FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 ) @@ -76,7 +76,7 @@ fork2 | 10087 | Xinglin ; forkWithCommonPrefilter -required_capability: fork_v3 +required_capability: fork_v4 FROM employees | WHERE emp_no > 10050 @@ -94,7 +94,7 @@ fork2 | 10100 ; forkWithSemanticSearchAndScore -required_capability: fork_v3 +required_capability: fork_v4 required_capability: semantic_text_field_caps required_capability: metadata_score @@ -114,7 +114,7 @@ fork2 | 6.093784261960139E18 | 2 | all we have to decide is w ; forkWithEvals -required_capability: fork_v3 +required_capability: fork_v4 FROM employees | FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1) @@ -131,7 +131,7 @@ fork2 | 10087 | def | null | 2 ; forkWithStats -required_capability: fork_v3 +required_capability: fork_v4 FROM employees | FORK (WHERE emp_no == 10048 OR emp_no == 10081) @@ -152,7 +152,7 @@ fork4 | null | 100 | 10001 | null ; forkWithDissect -required_capability: fork_v3 +required_capability: fork_v4 FROM employees | WHERE emp_no == 10048 OR emp_no == 10081 @@ -172,7 +172,7 @@ fork2 | 10081 | Rosen | 10081 | null | Zhongwei ; forkWithMixOfCommands -required_capability: fork_v3 +required_capability: fork_v4 FROM employees | WHERE emp_no == 10048 OR emp_no == 10081 @@ -197,6 +197,8 @@ fork4 | 10081 | abc | aaa | null | null ; forkWithFiltersOnConstantValues +required_capability: fork_v4 + FROM employees | EVAL z = 1 | WHERE z == 1 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 07f910c849c98..fe8a53fd3c571 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1008,7 +1008,7 @@ public enum Cap { /** * Support streaming of sub plan results */ - FORK_V3(Build.current().isSnapshot()), + FORK_V4(Build.current().isSnapshot()), /** * Support for the {@code leading_zeros} named parameter. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 775b441c4f000..99c432d6cf9a6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -796,7 +796,7 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) { newSubPlans.add(logicalPlan); } - if (!changed) { + if (changed == false) { return fork; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 7ab45f805c754..eab14ad56fbfa 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -310,7 +310,7 @@ public final void test() throws Throwable { ); assumeFalse( "CSV tests cannot currently handle FORK", - testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V3.capabilityName()) + testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V4.capabilityName()) ); assumeFalse( "CSV tests cannot currently handle multi_match function that depends on Lucene", From 29bdaf0083914e9796790353f24f43844952f6e2 Mon Sep 17 00:00:00 2001 From: Ioana Tagirta Date: Thu, 22 May 2025 13:53:03 +0200 Subject: [PATCH 3/3] fix tests --- .../xpack/esql/analysis/Analyzer.java | 4 +- .../xpack/esql/analysis/AnalyzerTests.java | 44 ++++++++++++++----- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 99c432d6cf9a6..ac9108db5d2f2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -753,7 +753,7 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) { boolean changed = false; List newSubPlans = new ArrayList<>(); List outputUnion = Fork.outputUnion(fork.children()); - Set forkColumns = outputUnion.stream().map(Attribute::name).collect(Collectors.toSet()); + List forkColumns = outputUnion.stream().map(Attribute::name).toList(); for (LogicalPlan logicalPlan : fork.children()) { Source source = logicalPlan.source(); @@ -804,7 +804,7 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) { // We don't want to keep the same attributes that are outputted by the FORK branches. // Keeping the same attributes can have unintended side effects when applying optimizations like constant folding. - for (Attribute attr : Fork.outputUnion(newSubPlans)) { + for (Attribute attr : newSubPlans.getFirst().output()) { newOutput.add(new ReferenceAttribute(attr.source(), attr.name(), attr.dataType())); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index f3afa24969f33..ab5406f4560d9 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -3039,6 +3039,7 @@ public void testBasicFork() { LogicalPlan plan = analyze(""" from test + | KEEP emp_no, first_name, last_name | WHERE first_name == "Chris" | FORK ( WHERE emp_no > 1 ) ( WHERE emp_no > 2 ) @@ -3047,6 +3048,7 @@ public void testBasicFork() { ( LIMIT 9 ) """); + var expectedOutput = List.of("emp_no", "first_name", "last_name", "_fork"); Limit limit = as(plan, Limit.class); Fork fork = as(limit.child(), Fork.class); @@ -3056,33 +3058,44 @@ public void testBasicFork() { // fork branch 1 limit = as(subPlans.get(0), Limit.class); assertThat(as(limit.limit(), Literal.class).value(), equalTo(DEFAULT_LIMIT)); - Eval eval = as(limit.child(), Eval.class); + Keep keep = as(limit.child(), Keep.class); + List keptColumns = keep.expressions().stream().map(exp -> as(exp, Attribute.class).name()).toList(); + assertThat(keptColumns, equalTo(expectedOutput)); + Eval eval = as(keep.child(), Eval.class); assertThat(as(eval.fields().get(0), Alias.class), equalTo(alias("_fork", string("fork1")))); Filter filter = as(eval.child(), Filter.class); assertThat(as(filter.condition(), GreaterThan.class).right(), equalTo(literal(1))); filter = as(filter.child(), Filter.class); assertThat(as(filter.condition(), Equals.class).right(), equalTo(string("Chris"))); - var esRelation = as(filter.child(), EsRelation.class); + EsqlProject project = as(filter.child(), EsqlProject.class); + var esRelation = as(project.child(), EsRelation.class); assertThat(esRelation.indexPattern(), equalTo("test")); // fork branch 2 limit = as(subPlans.get(1), Limit.class); assertThat(as(limit.limit(), Literal.class).value(), equalTo(DEFAULT_LIMIT)); - eval = as(limit.child(), Eval.class); + keep = as(limit.child(), Keep.class); + keptColumns = keep.expressions().stream().map(exp -> as(exp, Attribute.class).name()).toList(); + assertThat(keptColumns, equalTo(expectedOutput)); + eval = as(keep.child(), Eval.class); assertThat(as(eval.fields().get(0), Alias.class), equalTo(alias("_fork", string("fork2")))); filter = as(eval.child(), Filter.class); assertThat(as(filter.condition(), GreaterThan.class).right(), equalTo(literal(2))); filter = as(filter.child(), Filter.class); assertThat(as(filter.condition(), Equals.class).right(), equalTo(string("Chris"))); - esRelation = as(filter.child(), EsRelation.class); + project = as(filter.child(), EsqlProject.class); + esRelation = as(project.child(), EsRelation.class); assertThat(esRelation.indexPattern(), equalTo("test")); // fork branch 3 limit = as(subPlans.get(2), Limit.class); assertThat(as(limit.limit(), Literal.class).value(), equalTo(MAX_LIMIT)); - eval = as(limit.child(), Eval.class); + keep = as(limit.child(), Keep.class); + keptColumns = keep.expressions().stream().map(exp -> as(exp, Attribute.class).name()).toList(); + assertThat(keptColumns, equalTo(expectedOutput)); + eval = as(keep.child(), Eval.class); assertThat(as(eval.fields().get(0), Alias.class), equalTo(alias("_fork", string("fork3")))); limit = as(eval.child(), Limit.class); assertThat(as(limit.limit(), Literal.class).value(), equalTo(7)); @@ -3091,30 +3104,39 @@ public void testBasicFork() { assertThat(as(filter.condition(), GreaterThan.class).right(), equalTo(literal(3))); filter = as(filter.child(), Filter.class); assertThat(as(filter.condition(), Equals.class).right(), equalTo(string("Chris"))); - esRelation = as(filter.child(), EsRelation.class); + project = as(filter.child(), EsqlProject.class); + esRelation = as(project.child(), EsRelation.class); assertThat(esRelation.indexPattern(), equalTo("test")); // fork branch 4 limit = as(subPlans.get(3), Limit.class); assertThat(as(limit.limit(), Literal.class).value(), equalTo(DEFAULT_LIMIT)); - eval = as(limit.child(), Eval.class); + keep = as(limit.child(), Keep.class); + keptColumns = keep.expressions().stream().map(exp -> as(exp, Attribute.class).name()).toList(); + assertThat(keptColumns, equalTo(expectedOutput)); + eval = as(keep.child(), Eval.class); assertThat(as(eval.fields().get(0), Alias.class), equalTo(alias("_fork", string("fork4")))); orderBy = as(eval.child(), OrderBy.class); filter = as(orderBy.child(), Filter.class); assertThat(as(filter.condition(), Equals.class).right(), equalTo(string("Chris"))); - esRelation = as(filter.child(), EsRelation.class); + project = as(filter.child(), EsqlProject.class); + esRelation = as(project.child(), EsRelation.class); assertThat(esRelation.indexPattern(), equalTo("test")); // fork branch 5 limit = as(subPlans.get(4), Limit.class); assertThat(as(limit.limit(), Literal.class).value(), equalTo(MAX_LIMIT)); - eval = as(limit.child(), Eval.class); + keep = as(limit.child(), Keep.class); + keptColumns = keep.expressions().stream().map(exp -> as(exp, Attribute.class).name()).toList(); + assertThat(keptColumns, equalTo(expectedOutput)); + eval = as(keep.child(), Eval.class); assertThat(as(eval.fields().get(0), Alias.class), equalTo(alias("_fork", string("fork5")))); limit = as(eval.child(), Limit.class); assertThat(as(limit.limit(), Literal.class).value(), equalTo(9)); filter = as(limit.child(), Filter.class); assertThat(as(filter.condition(), Equals.class).right(), equalTo(string("Chris"))); - esRelation = as(filter.child(), EsRelation.class); + project = as(filter.child(), EsqlProject.class); + esRelation = as(project.child(), EsRelation.class); assertThat(esRelation.indexPattern(), equalTo("test")); } @@ -3352,7 +3374,7 @@ public void testValidRrf() { assertThat(dedup.aggregates().size(), equalTo(15)); RrfScoreEval rrf = as(dedup.child(), RrfScoreEval.class); - assertThat(rrf.scoreAttribute(), instanceOf(MetadataAttribute.class)); + assertThat(rrf.scoreAttribute(), instanceOf(ReferenceAttribute.class)); assertThat(rrf.scoreAttribute().name(), equalTo("_score")); assertThat(rrf.forkAttribute(), instanceOf(ReferenceAttribute.class)); assertThat(rrf.forkAttribute().name(), equalTo("_fork"));