diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java index d48f8af9c97e3..631e3575d2f60 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java @@ -405,7 +405,7 @@ public void testAggThenEnrichRemote() { | sort vendor """, enrichHosts(Enrich.Mode.ANY), enrichVendors(Enrich.Mode.REMOTE)); var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close()); - assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after STATS")); + assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after [stats c = COUNT(*) by os]@4:3")); } public void testEnrichCoordinatorThenEnrichRemote() { @@ -417,10 +417,7 @@ public void testEnrichCoordinatorThenEnrichRemote() { | sort vendor """, enrichHosts(Enrich.Mode.COORDINATOR), enrichVendors(Enrich.Mode.REMOTE)); var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close()); - assertThat( - error.getMessage(), - containsString("ENRICH with remote policy can't be executed after another ENRICH with coordinator policy") - ); + assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after [ENRICH _COORDINATOR")); } private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLimitAndSortAsTopN.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLimitAndSortAsTopN.java index 7d44fa1fda5a2..dfc1a26ae980c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLimitAndSortAsTopN.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLimitAndSortAsTopN.java @@ -18,7 +18,7 @@ public final class ReplaceLimitAndSortAsTopN extends OptimizerRules.OptimizerRul protected LogicalPlan rule(Limit plan) { LogicalPlan p = plan; if (plan.child() instanceof OrderBy o) { - p = new TopN(plan.source(), o.child(), o.order(), plan.limit()); + p = new TopN(o.source(), o.child(), o.order(), plan.limit()); } return p; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java index 794957dc473eb..5e265c30c57ba 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java @@ -45,7 +45,13 @@ import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; import static org.elasticsearch.xpack.esql.plan.logical.Filter.checkFilterConditionDataType; -public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic, PipelineBreaker { +public class Aggregate extends UnaryPlan + implements + PostAnalysisVerificationAware, + TelemetryAware, + SortAgnostic, + PipelineBreaker, + ExecutesOn.Coordinator { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( LogicalPlan.class, "Aggregate", diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ChangePoint.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ChangePoint.java index 82bd5b1f69bfe..6338e32c6f5a8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ChangePoint.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ChangePoint.java @@ -44,7 +44,12 @@ * enforced by the Limit in the surrogate plan. */ @SupportsObservabilityTier(tier = COMPLETE) -public class ChangePoint extends UnaryPlan implements SurrogateLogicalPlan, PostAnalysisVerificationAware, LicenseAware { +public class ChangePoint extends UnaryPlan + implements + SurrogateLogicalPlan, + PostAnalysisVerificationAware, + LicenseAware, + ExecutesOn.Coordinator { private final Attribute value; private final Attribute key; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java index af56345438c21..ef268ff6b7964 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java @@ -17,7 +17,7 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware; +import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware; import org.elasticsearch.xpack.esql.capabilities.TelemetryAware; import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.capabilities.Resolvables; @@ -34,7 +34,6 @@ import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.GeneratingPlan; -import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin; import java.io.IOException; import java.util.ArrayList; @@ -44,14 +43,19 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.function.BiConsumer; import static org.elasticsearch.xpack.esql.common.Failure.fail; import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes; import static org.elasticsearch.xpack.esql.expression.Foldables.literalValueOf; import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public class Enrich extends UnaryPlan implements GeneratingPlan, PostAnalysisPlanVerificationAware, TelemetryAware, SortAgnostic { +public class Enrich extends UnaryPlan + implements + GeneratingPlan, + PostOptimizationVerificationAware, + TelemetryAware, + SortAgnostic, + ExecutesOn { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( LogicalPlan.class, "Enrich", @@ -68,6 +72,16 @@ public class Enrich extends UnaryPlan implements GeneratingPlan, PostAna private final Mode mode; + @Override + public ExecuteLocation executesOn() { + if (mode == Mode.REMOTE) { + return ExecuteLocation.REMOTE; + } else if (mode == Mode.COORDINATOR) { + return ExecuteLocation.COORDINATOR; + } + return ExecuteLocation.ANY; + } + public enum Mode { ANY, COORDINATOR, @@ -284,11 +298,6 @@ public int hashCode() { return Objects.hash(super.hashCode(), mode, policyName, matchField, policy, concreteIndices, enrichFields); } - @Override - public BiConsumer postAnalysisPlanVerification() { - return Enrich::checkRemoteEnrich; - } - /** * Ensure that no remote enrich is allowed after a reduction or an enrich with coordinator mode. *

@@ -299,36 +308,24 @@ public BiConsumer postAnalysisPlanVerification() { * In that case, users have to write it as `FROM test | ENRICH _remote: | ORDER @timestamp | LIMIT 10`, * which is equivalent to bringing all data to the coordinating cluster. * We might consider implementing the actual remote enrich on the coordinating cluster, however, this requires - * retaining the originating cluster and restructing pages for routing, which might be complicated. - */ - private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) { - // First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be - // in separate FORK branches which are valid by themselves. - plan.forEachUp(Enrich.class, enrich -> checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures)); - } - - /** - * For a given remote {@link Enrich}, check if there are any forbidden plans upstream. + * retaining the originating cluster and restructuring pages for routing, which might be complicated. */ - private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) { - if (enrich.mode != Mode.REMOTE) { - return; - } + private void checkForPlansForbiddenBeforeRemoteEnrich(Failures failures) { + Set fails = new HashSet<>(); - Set badCommands = new HashSet<>(); - - enrich.forEachUp(LogicalPlan.class, u -> { - if (u instanceof Aggregate) { - badCommands.add("STATS"); - } else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) { - badCommands.add("another ENRICH with coordinator policy"); - } else if (u instanceof LookupJoin) { - badCommands.add("LOOKUP JOIN"); - } else if (u instanceof Fork) { - badCommands.add("FORK"); + this.forEachUp(LogicalPlan.class, u -> { + if (u instanceof ExecutesOn ex && ex.executesOn() == ExecuteLocation.COORDINATOR) { + fails.add(u.source()); } }); - badCommands.forEach(c -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after " + c))); + fails.forEach(f -> failures.add(fail(this, "ENRICH with remote policy can't be executed after [" + f.text() + "]" + f.source()))); + } + + @Override + public void postOptimizationVerification(Failures failures) { + if (this.mode == Mode.REMOTE) { + checkForPlansForbiddenBeforeRemoteEnrich(failures); + } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ExecutesOn.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ExecutesOn.java new file mode 100644 index 0000000000000..899107f3b1bb4 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ExecutesOn.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical; + +/** + * Mark nodes that execute only in a specific way, either on the coordinator or on a remote node. + */ +public interface ExecutesOn { + enum ExecuteLocation { + COORDINATOR, + REMOTE, + ANY; // Can be executed on either coordinator or remote nodes + } + + ExecuteLocation executesOn(); + + /** + * Executes on the remote nodes only (note that may include coordinator, but not on the aggregation stage). + */ + interface Remote extends ExecutesOn { + @Override + default ExecuteLocation executesOn() { + return ExecuteLocation.REMOTE; + } + } + + /** + * Executes on the coordinator only. Can not be run on remote nodes. + */ + interface Coordinator extends ExecutesOn { + @Override + default ExecuteLocation executesOn() { + return ExecuteLocation.COORDINATOR; + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java index 1266ccbb1e962..09719774f8d78 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java @@ -33,7 +33,7 @@ * A Fork is a n-ary {@code Plan} where each child is a sub plan, e.g. * {@code FORK [WHERE content:"fox" ] [WHERE content:"dog"] } */ -public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware, PipelineBreaker { +public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware, ExecutesOn.Coordinator { public static final String FORK_FIELD = "_fork"; public static final int MAX_BRANCHES = 8; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java index 191664bea9a81..ffb7ccfbe4798 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java @@ -23,6 +23,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import java.io.IOException; @@ -33,7 +34,7 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT; import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public class Completion extends InferencePlan implements TelemetryAware, PostAnalysisVerificationAware { +public class Completion extends InferencePlan implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator { public static final String DEFAULT_OUTPUT_FIELD_NAME = "completion"; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java index 6f86138397fa6..a63b70cd6cc57 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; @@ -37,7 +38,7 @@ import static org.elasticsearch.xpack.esql.common.Failure.fail; import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public class Rerank extends InferencePlan implements PostAnalysisVerificationAware, TelemetryAware { +public class Rerank extends InferencePlan implements PostAnalysisVerificationAware, TelemetryAware, ExecutesOn.Coordinator { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Rerank", Rerank::new); public static final String DEFAULT_INFERENCE_ID = ".rerank-v1-elasticsearch"; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java index 2c7b1a399b3f2..2f217df1468a6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware; +import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware; import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.AttributeSet; @@ -21,14 +22,19 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; +import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import static org.elasticsearch.xpack.esql.common.Failure.fail; import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE; @@ -56,7 +62,7 @@ import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.commonType; -public class Join extends BinaryPlan implements PostAnalysisVerificationAware, SortAgnostic { +public class Join extends BinaryPlan implements PostAnalysisVerificationAware, SortAgnostic, ExecutesOn, PostOptimizationVerificationAware { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Join", Join::new); public static final DataType[] UNSUPPORTED_TYPES = { TEXT, @@ -309,4 +315,40 @@ private static boolean comparableTypes(Attribute left, Attribute right) { public boolean isRemote() { return isRemote; } + + @Override + public ExecuteLocation executesOn() { + return isRemote ? ExecuteLocation.REMOTE : ExecuteLocation.COORDINATOR; + } + + private void checkRemoteJoin(Failures failures) { + Set fails = new HashSet<>(); + + var myself = this; + this.forEachUp(LogicalPlan.class, u -> { + if (u == myself) { + return; // skip myself + } + if (u instanceof Limit) { + // Limit is ok because it can be moved in by the optimizer + // We check LIMITs in LookupJoin pre-optimization so they are still not allowed there + return; + } + if (u instanceof PipelineBreaker || (u instanceof ExecutesOn ex && ex.executesOn() == ExecuteLocation.COORDINATOR)) { + fails.add(u.source()); + } + }); + + fails.forEach( + f -> failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after [" + f.text() + "]" + f.source())) + ); + + } + + @Override + public void postOptimizationVerification(Failures failures) { + if (isRemote()) { + checkRemoteJoin(failures); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java index 16ff5fba7bbd8..20913e0e27ce7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java @@ -13,14 +13,11 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; -import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; import org.elasticsearch.xpack.esql.plan.logical.SurrogateLogicalPlan; -import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.UsingJoinType; -import java.util.LinkedList; import java.util.List; import static java.util.Collections.emptyList; @@ -30,7 +27,7 @@ /** * Lookup join - specialized LEFT (OUTER) JOIN between the main left side and a lookup index (index_mode = lookup) on the right. */ -public class LookupJoin extends Join implements SurrogateLogicalPlan, PostAnalysisVerificationAware, TelemetryAware { +public class LookupJoin extends Join implements SurrogateLogicalPlan, TelemetryAware, PostAnalysisVerificationAware { public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, List joinFields, boolean isRemote) { this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList(), isRemote); @@ -104,21 +101,11 @@ public void postAnalysisVerification(Failures failures) { } private void checkRemoteJoin(Failures failures) { - List fails = new LinkedList<>(); - - this.forEachUp(UnaryPlan.class, u -> { - if (u instanceof PipelineBreaker) { - fails.add(u.source()); - } - if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) { - fails.add(u.source()); - } + // Check only for LIMITs, Join will check the rest post-optimization + this.forEachUp(Limit.class, f -> { + failures.add( + fail(this, "LOOKUP JOIN with remote indices can't be executed after [" + f.source().text() + "]" + f.source().source()) + ); }); - - fails.forEach( - f -> failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after [" + f.text() + "]" + f.source())) - ); - } - } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java index 37d6719ddccfc..ab2f77a3c443e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.xpack.esql.parser.ParsingException; import org.elasticsearch.xpack.esql.parser.QueryParam; import org.elasticsearch.xpack.esql.parser.QueryParams; -import org.elasticsearch.xpack.esql.plan.logical.Enrich; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -39,13 +38,9 @@ import java.util.Map; import java.util.Set; -import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE; import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG; -import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; import static org.elasticsearch.xpack.esql.EsqlTestUtils.paramAsConstant; import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning; -import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution; -import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadEnrichPolicyResolution; import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping; import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN; import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_POINT; @@ -2272,43 +2267,6 @@ public void testFullTextFunctionsInStats() { } } - public void testRemoteLookupJoinWithPipelineBreaker() { - assumeTrue("Remote LOOKUP JOIN not enabled", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()); - var analyzer = AnalyzerTestUtils.analyzer(loadMapping("mapping-default.json", "test,remote:test")); - assertEquals( - "1:92: LOOKUP JOIN with remote indices can't be executed after [STATS c = COUNT(*) by languages]@1:25", - error( - "FROM test,remote:test | STATS c = COUNT(*) by languages " - + "| EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code", - analyzer - ) - ); - - assertEquals( - "1:72: LOOKUP JOIN with remote indices can't be executed after [SORT emp_no]@1:25", - error( - "FROM test,remote:test | SORT emp_no | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code", - analyzer - ) - ); - - assertEquals( - "1:68: LOOKUP JOIN with remote indices can't be executed after [LIMIT 2]@1:25", - error( - "FROM test,remote:test | LIMIT 2 | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code", - analyzer - ) - ); - assertEquals( - "1:96: LOOKUP JOIN with remote indices can't be executed after [ENRICH _coordinator:languages_coord]@1:58", - error( - "FROM test,remote:test | EVAL language_code = languages | ENRICH _coordinator:languages_coord " - + "| LOOKUP JOIN languages_lookup ON language_code", - analyzer - ) - ); - } - public void testRemoteLookupJoinIsSnapshot() { // TODO: remove when we allow remote joins in release builds assumeTrue("Remote LOOKUP JOIN not enabled", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()); @@ -2325,148 +2283,6 @@ public void testRemoteLookupJoinIsDisabled() { assertThat(e.getMessage(), containsString("remote clusters are not supported with LOOKUP JOIN")); } - public void testRemoteEnrichAfterLookupJoin() { - EnrichResolution enrichResolution = new EnrichResolution(); - loadEnrichPolicyResolution( - enrichResolution, - Enrich.Mode.REMOTE, - MATCH_TYPE, - "languages", - "language_code", - "languages_idx", - "mapping-languages.json" - ); - var analyzer = AnalyzerTestUtils.analyzer( - loadMapping("mapping-default.json", "test"), - defaultLookupResolution(), - enrichResolution, - TEST_VERIFIER - ); - - String lookupCommand = randomBoolean() ? "LOOKUP JOIN test_lookup ON languages" : "LOOKUP JOIN languages_lookup ON language_code"; - - query(Strings.format(""" - FROM test - | EVAL language_code = languages - | ENRICH _remote:languages ON language_code - | %s - """, lookupCommand), analyzer); - - String err = error(Strings.format(""" - FROM test - | EVAL language_code = languages - | %s - | ENRICH _remote:languages ON language_code - """, lookupCommand), analyzer); - assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after LOOKUP JOIN")); - - err = error(Strings.format(""" - FROM test - | EVAL language_code = languages - | %s - | ENRICH _remote:languages ON language_code - | %s - """, lookupCommand, lookupCommand), analyzer); - assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after LOOKUP JOIN")); - - err = error(Strings.format(""" - FROM test - | EVAL language_code = languages - | %s - | EVAL x = 1 - | MV_EXPAND language_code - | ENRICH _remote:languages ON language_code - """, lookupCommand), analyzer); - assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after LOOKUP JOIN")); - } - - public void testRemoteEnrichAfterCoordinatorOnlyPlans() { - EnrichResolution enrichResolution = new EnrichResolution(); - loadEnrichPolicyResolution( - enrichResolution, - Enrich.Mode.REMOTE, - MATCH_TYPE, - "languages", - "language_code", - "languages_idx", - "mapping-languages.json" - ); - loadEnrichPolicyResolution( - enrichResolution, - Enrich.Mode.COORDINATOR, - MATCH_TYPE, - "languages", - "language_code", - "languages_idx", - "mapping-languages.json" - ); - var analyzer = AnalyzerTestUtils.analyzer( - loadMapping("mapping-default.json", "test"), - defaultLookupResolution(), - enrichResolution, - TEST_VERIFIER - ); - - query(""" - FROM test - | EVAL language_code = languages - | ENRICH _remote:languages ON language_code - | STATS count(*) BY language_name - """, analyzer); - - String err = error(""" - FROM test - | EVAL language_code = languages - | STATS count(*) BY language_code - | ENRICH _remote:languages ON language_code - """, analyzer); - assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after STATS")); - - err = error(""" - FROM test - | EVAL language_code = languages - | STATS count(*) BY language_code - | EVAL x = 1 - | MV_EXPAND language_code - | ENRICH _remote:languages ON language_code - """, analyzer); - assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after STATS")); - - query(""" - FROM test - | EVAL language_code = languages - | ENRICH _remote:languages ON language_code - | ENRICH _coordinator:languages ON language_code - """, analyzer); - - err = error(""" - FROM test - | EVAL language_code = languages - | ENRICH _coordinator:languages ON language_code - | ENRICH _remote:languages ON language_code - """, analyzer); - assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")); - - err = error(""" - FROM test - | EVAL language_code = languages - | ENRICH _coordinator:languages ON language_code - | EVAL x = 1 - | MV_EXPAND language_name - | DISSECT language_name "%{foo}" - | ENRICH _remote:languages ON language_code - """, analyzer); - assertThat(err, containsString("7:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")); - - err = error(""" - FROM test - | FORK (WHERE languages == 1) (WHERE languages == 2) - | EVAL language_code = languages - | ENRICH _remote:languages ON language_code - """, analyzer); - assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after FORK")); - } - private void checkFullTextFunctionsInStats(String functionInvocation) { query("from test | stats c = max(id) where " + functionInvocation, fullTextAnalyzer); query("from test | stats c = max(id) where " + functionInvocation + " or length(title) > 10", fullTextAnalyzer); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/OptimizerVerificationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/OptimizerVerificationTests.java new file mode 100644 index 0000000000000..fcfe4a0a2a455 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/OptimizerVerificationTests.java @@ -0,0 +1,327 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.VerificationException; +import org.elasticsearch.xpack.esql.action.EsqlCapabilities; +import org.elasticsearch.xpack.esql.analysis.Analyzer; +import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils; +import org.elasticsearch.xpack.esql.analysis.EnrichResolution; +import org.elasticsearch.xpack.esql.parser.QueryParam; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.paramAsConstant; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadEnrichPolicyResolution; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; + +public class OptimizerVerificationTests extends AbstractLogicalPlanOptimizerTests { + + private LogicalPlan plan(String query, Analyzer analyzer) { + var analyzed = analyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)); + return logicalOptimizer.optimize(analyzed); + } + + private String error(String query, Analyzer analyzer, Object... params) { + List parameters = new ArrayList<>(); + for (Object param : params) { + if (param == null) { + parameters.add(paramAsConstant(null, null)); + } else if (param instanceof String) { + parameters.add(paramAsConstant(null, param)); + } else if (param instanceof Number) { + parameters.add(paramAsConstant(null, param)); + } else { + throw new IllegalArgumentException("VerifierTests don't support params of type " + param.getClass()); + } + } + Throwable e = expectThrows( + VerificationException.class, + "Expected error for query [" + query + "] but no error was raised", + () -> plan(query, analyzer) + ); + assertThat(e, instanceOf(VerificationException.class)); + + String message = e.getMessage(); + assertTrue(message.startsWith("Found ")); + + String pattern = "\nline "; + int index = message.indexOf(pattern); + return message.substring(index + pattern.length()); + } + + public void testRemoteEnrichAfterCoordinatorOnlyPlans() { + EnrichResolution enrichResolution = new EnrichResolution(); + loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.REMOTE, + MATCH_TYPE, + "languages", + "language_code", + "languages_idx", + "mapping-languages.json" + ); + loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.COORDINATOR, + MATCH_TYPE, + "languages", + "language_code", + "languages_idx", + "mapping-languages.json" + ); + var analyzer = AnalyzerTestUtils.analyzer( + loadMapping("mapping-default.json", "test"), + defaultLookupResolution(), + enrichResolution, + TEST_VERIFIER + ); + + String err; + + plan(""" + FROM test + | EVAL language_code = languages + | ENRICH _remote:languages ON language_code + | STATS count(*) BY language_name + """, analyzer); + + plan(""" + FROM test + | LIMIT 10 + | EVAL language_code = languages + | ENRICH _remote:languages ON language_code + | STATS count(*) BY language_name + """, analyzer); + + plan(""" + FROM test + | EVAL language_code = languages + | ENRICH _remote:languages ON language_code + | STATS count(*) BY language_name + | LIMIT 10 + """, analyzer); + + err = error(""" + FROM test + | EVAL language_code = languages + | STATS count(*) BY language_code + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after [STATS count(*) BY language_code]@3:3")); + + err = error(""" + FROM test + | EVAL language_code = languages + | INLINESTATS count(*) BY language_code + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat( + err, + containsString("4:3: ENRICH with remote policy can't be executed after [INLINESTATS count(*) BY language_code]@3:3") + ); + + err = error(""" + FROM test + | EVAL language_code = languages + | STATS count(*) BY language_code + | EVAL x = 1 + | MV_EXPAND language_code + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after [STATS count(*) BY language_code]@3:3")); + + // Coordinator after remote is OK + plan(""" + FROM test + | EVAL language_code = languages + | ENRICH _remote:languages ON language_code + | ENRICH _coordinator:languages ON language_code + """, analyzer); + + err = error(""" + FROM test + | EVAL language_code = languages + | ENRICH _coordinator:languages ON language_code + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat( + err, + containsString("4:3: ENRICH with remote policy can't be executed after [ENRICH _coordinator:languages ON language_code]@3:3") + ); + + err = error(""" + FROM test + | EVAL language_code = languages + | ENRICH _coordinator:languages ON language_code + | EVAL x = 1 + | MV_EXPAND language_name + | DISSECT language_name "%{foo}" + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat( + err, + containsString("7:3: ENRICH with remote policy can't be executed after [ENRICH _coordinator:languages ON language_code]@3:3") + ); + + err = error(""" + FROM test + | FORK (WHERE languages == 1) (WHERE languages == 2) + | EVAL language_code = languages + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat( + err, + containsString( + "4:3: ENRICH with remote policy can't be executed after [FORK (WHERE languages == 1) (WHERE languages == 2)]@2:3" + ) + ); + + err = error(""" + FROM test + | COMPLETION language_code = "some prompt" WITH { "inference_id" : "completion-inference-id" } + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat( + err, + containsString( + "ENRICH with remote policy can't be executed after " + + "[COMPLETION language_code = \"some prompt\" WITH { \"inference_id\" : \"completion-inference-id\" }]@2:3" + ) + ); + + err = error(""" + FROM test + | RERANK language_code="test" ON languages WITH { "inference_id" : "reranking-inference-id" } + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat( + err, + containsString( + "ENRICH with remote policy can't be executed after " + + "[RERANK language_code=\"test\" ON languages WITH { \"inference_id\" : \"reranking-inference-id\" }]@2:3" + ) + ); + + err = error(""" + FROM test + | CHANGE_POINT salary ON languages + | EVAL language_code = languages + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after [CHANGE_POINT salary ON languages]@2:3")); + } + + public void testRemoteEnrichAfterLookupJoin() { + EnrichResolution enrichResolution = new EnrichResolution(); + loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.REMOTE, + MATCH_TYPE, + "languages", + "language_code", + "languages_idx", + "mapping-languages.json" + ); + var analyzer = AnalyzerTestUtils.analyzer( + loadMapping("mapping-default.json", "test"), + defaultLookupResolution(), + enrichResolution, + TEST_VERIFIER + ); + + String lookupCommand = randomBoolean() ? "LOOKUP JOIN test_lookup ON languages" : "LOOKUP JOIN languages_lookup ON language_code"; + + plan(Strings.format(""" + FROM test + | EVAL language_code = languages + | ENRICH _remote:languages ON language_code + | %s + """, lookupCommand), analyzer); + + String err = error(Strings.format(""" + FROM test + | EVAL language_code = languages + | %s + | ENRICH _remote:languages ON language_code + """, lookupCommand), analyzer); + assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after [" + lookupCommand + "]@3:3")); + + err = error(Strings.format(""" + FROM test + | EVAL language_code = languages + | %s + | ENRICH _remote:languages ON language_code + | %s + """, lookupCommand, lookupCommand), analyzer); + assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after [" + lookupCommand + "]@3:3")); + + err = error(Strings.format(""" + FROM test + | EVAL language_code = languages + | %s + | EVAL x = 1 + | MV_EXPAND language_code + | ENRICH _remote:languages ON language_code + """, lookupCommand), analyzer); + assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after [" + lookupCommand + "]@3:3")); + } + + public void testRemoteLookupJoinWithPipelineBreaker() { + assumeTrue("Remote LOOKUP JOIN not enabled", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()); + var analyzer = AnalyzerTestUtils.analyzer(loadMapping("mapping-default.json", "test,remote:test")); + assertEquals( + "1:92: LOOKUP JOIN with remote indices can't be executed after [STATS c = COUNT(*) by languages]@1:25", + error( + "FROM test,remote:test | STATS c = COUNT(*) by languages " + + "| EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code", + analyzer + ) + ); + + assertEquals( + "1:72: LOOKUP JOIN with remote indices can't be executed after [SORT emp_no]@1:25", + error( + "FROM test,remote:test | SORT emp_no | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code", + analyzer + ) + ); + + assertEquals( + "1:68: LOOKUP JOIN with remote indices can't be executed after [LIMIT 2]@1:25", + error( + "FROM test,remote:test | LIMIT 2 | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code", + analyzer + ) + ); + + assertEquals( + "1:96: LOOKUP JOIN with remote indices can't be executed after [ENRICH _coordinator:languages_coord]@1:58", + error( + "FROM test,remote:test | EVAL language_code = languages | ENRICH _coordinator:languages_coord " + + "| LOOKUP JOIN languages_lookup ON language_code", + analyzer + ) + ); + + plan("FROM test,remote:test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code | LIMIT 2", analyzer); + + // Since FORK, RERANK, COMPLETION and CHANGE_POINT are not supported on remote indices, we can't check them here against the remote + // LOOKUP JOIN + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 3464e2e74217a..5ff6687145798 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -7004,7 +7004,10 @@ public void testAggThenEnrichRemote() { | eval employee_id = to_str(emp_no) | ENRICH _remote:departments """)); - assertThat(error.getMessage(), containsString("line 4:3: ENRICH with remote policy can't be executed after STATS")); + assertThat( + error.getMessage(), + containsString("line 4:3: ENRICH with remote policy can't be executed after [STATS size=count(*) BY emp_no]@2:3") + ); } public void testEnrichBeforeLimit() { @@ -7354,7 +7357,7 @@ public void testRejectRemoteEnrichAfterCoordinatorEnrich() { """)); assertThat( error.getMessage(), - containsString("ENRICH with remote policy can't be executed after another ENRICH with coordinator policy") + containsString("ENRICH with remote policy can't be executed after [ENRICH _coordinator:departments]@3:3") ); }