From 499cd26f36111a27fc59d871215fcb945981408f Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 22 Sep 2025 13:29:29 -0600 Subject: [PATCH 1/2] Ban Limit + MvExpand before remote Enrich (#135051) * Ban Limit + MvExpand before remote Enrich (cherry picked from commit 7f1d2dc3335f541bb5e70050416cfc585f6e80db) # Conflicts: # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java --- docs/changelog/135051.yaml | 5 ++ .../esql/action/CrossClusterEnrichIT.java | 14 +++++ .../xpack/esql/plan/logical/Enrich.java | 55 ++++++++++++++++--- 3 files changed, 65 insertions(+), 9 deletions(-) create mode 100644 docs/changelog/135051.yaml diff --git a/docs/changelog/135051.yaml b/docs/changelog/135051.yaml new file mode 100644 index 0000000000000..f9b22e0f9d570 --- /dev/null +++ b/docs/changelog/135051.yaml @@ -0,0 +1,5 @@ +pr: 135051 +summary: Ban Limit + `MvExpand` before remote Enrich +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java index d48f8af9c97e3..d1c5ddbcd3cd9 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java @@ -423,6 +423,20 @@ public void testEnrichCoordinatorThenEnrichRemote() { ); } + public void testEnrichAfterMvExpandLimit() { + String query = String.format(Locale.ROOT, """ + FROM *:events,events + | SORT timestamp + | LIMIT 2 + | eval ip= TO_STR(host) + | MV_EXPAND host + | WHERE ip != "" + | %s + """, enrichHosts(Enrich.Mode.REMOTE)); + var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close()); + assertThat(error.getMessage(), containsString("MV_EXPAND after LIMIT is incompatible with remote ENRICH")); + } + private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) { assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); assertTrue(executionInfo.isCrossClusterSearch()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java index 7307fd8efad39..80333a0e0ce7b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java @@ -17,7 +17,8 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware; +import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware; +import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware; import org.elasticsearch.xpack.esql.capabilities.TelemetryAware; import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.capabilities.Resolvables; @@ -50,7 +51,14 @@ import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes; import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public class Enrich extends UnaryPlan implements GeneratingPlan, PostAnalysisPlanVerificationAware, TelemetryAware, SortAgnostic { +public class Enrich extends UnaryPlan + implements + GeneratingPlan, + PostOptimizationVerificationAware, + PostAnalysisVerificationAware, + TelemetryAware, + SortAgnostic, + ExecutesOn { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( LogicalPlan.class, "Enrich", @@ -325,14 +333,43 @@ private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Fail } }); - if (aggregate[0]) { - failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS")); - } - if (coordinatorOnlyEnrich[0]) { - failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")); + fails.forEach(f -> failures.add(fail(this, "ENRICH with remote policy can't be executed after [" + f.text() + "]" + f.source()))); + } + + /** + * Remote ENRICH (and any remote operation in fact) is not compatible with MV_EXPAND + LIMIT. Consider: + * `FROM *:events | SORT @timestamp | LIMIT 2 | MV_EXPAND ip | ENRICH _remote:clientip_policy ON ip` + * Semantically, this must take two top events and then expand them. However, this can not be executed remotely, + * because this means that we have to take top 2 events on each node, then expand them, then apply Enrich, + * then bring them to the coordinator - but then we can not select top 2 of them - because that would be pre-expand! + * We do not know which expanded rows are coming from the true top rows and which are coming from "false" top rows + * which should have been thrown out. This is only possible to execute if MV_EXPAND executes on the coordinator + * - which contradicts remote Enrich. + * This could be fixed by the optimizer by moving MV_EXPAND past ENRICH, at least in some cases, but currently we do not do that. + */ + private void checkMvExpandAfterLimit(Failures failures) { + this.forEachDown(MvExpand.class, u -> { + u.forEachDown(p -> { + if (p instanceof Limit || p instanceof TopN) { + failures.add(fail(this, "MV_EXPAND after LIMIT is incompatible with remote ENRICH")); + } + }); + }); + + } + + @Override + public void postAnalysisVerification(Failures failures) { + if (this.mode == Mode.REMOTE) { + checkMvExpandAfterLimit(failures); } - if (lookupJoin[0]) { - failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN")); + + } + + @Override + public void postOptimizationVerification(Failures failures) { + if (this.mode == Mode.REMOTE) { + checkForPlansForbiddenBeforeRemoteEnrich(failures); } } } From 57696a2cd8d41c4f2550e90d479bbdcbab48e755 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 23 Sep 2025 15:15:09 -0600 Subject: [PATCH 2/2] fix merge --- .../xpack/esql/plan/logical/Enrich.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java index 80333a0e0ce7b..0ab8f23b92f65 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java @@ -17,8 +17,8 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware; import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware; -import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware; import org.elasticsearch.xpack.esql.capabilities.TelemetryAware; import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.capabilities.Resolvables; @@ -54,11 +54,10 @@ public class Enrich extends UnaryPlan implements GeneratingPlan, - PostOptimizationVerificationAware, + PostAnalysisPlanVerificationAware, PostAnalysisVerificationAware, TelemetryAware, - SortAgnostic, - ExecutesOn { + SortAgnostic { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( LogicalPlan.class, "Enrich", @@ -333,7 +332,15 @@ private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Fail } }); - fails.forEach(f -> failures.add(fail(this, "ENRICH with remote policy can't be executed after [" + f.text() + "]" + f.source()))); + if (aggregate[0]) { + failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS")); + } + if (coordinatorOnlyEnrich[0]) { + failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")); + } + if (lookupJoin[0]) { + failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN")); + } } /** @@ -365,11 +372,4 @@ public void postAnalysisVerification(Failures failures) { } } - - @Override - public void postOptimizationVerification(Failures failures) { - if (this.mode == Mode.REMOTE) { - checkForPlansForbiddenBeforeRemoteEnrich(failures); - } - } }