Skip to content

Commit 499cd26

Browse files
committed
Ban Limit + MvExpand before remote Enrich (elastic#135051)
* Ban Limit + MvExpand before remote Enrich
(cherry picked from commit 7f1d2dc)
# Conflicts:
#   x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java
1 parent 8552488 commit 499cd26

File tree

3 files changed

+65
-9
lines changed

3 files changed

+65
-9
lines changed

docs/changelog/135051.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 135051
2+
summary: Ban Limit + `MvExpand` before remote Enrich
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,20 @@ public void testEnrichCoordinatorThenEnrichRemote() {
423423
);
424424
}
425425

426+
public void testEnrichAfterMvExpandLimit() {
427+
String query = String.format(Locale.ROOT, """
428+
FROM *:events,events
429+
| SORT timestamp
430+
| LIMIT 2
431+
| eval ip= TO_STR(host)
432+
| MV_EXPAND host
433+
| WHERE ip != ""
434+
| %s
435+
""", enrichHosts(Enrich.Mode.REMOTE));
436+
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
437+
assertThat(error.getMessage(), containsString("MV_EXPAND after LIMIT is incompatible with remote ENRICH"));
438+
}
439+
426440
private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) {
427441
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
428442
assertTrue(executionInfo.isCrossClusterSearch());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
import org.elasticsearch.index.IndexMode;
1818
import org.elasticsearch.transport.RemoteClusterAware;
1919
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
20-
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
20+
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
21+
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
2122
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
2223
import org.elasticsearch.xpack.esql.common.Failures;
2324
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
@@ -50,7 +51,14 @@
5051
import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes;
5152
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
5253

53-
public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAnalysisPlanVerificationAware, TelemetryAware, SortAgnostic {
54+
public class Enrich extends UnaryPlan
55+
implements
56+
GeneratingPlan<Enrich>,
57+
PostOptimizationVerificationAware,
58+
PostAnalysisVerificationAware,
59+
TelemetryAware,
60+
SortAgnostic,
61+
ExecutesOn {
5462
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
5563
LogicalPlan.class,
5664
"Enrich",
@@ -325,14 +333,43 @@ private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Fail
325333
}
326334
});
327335

328-
if (aggregate[0]) {
329-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
330-
}
331-
if (coordinatorOnlyEnrich[0]) {
332-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
336+
fails.forEach(f -> failures.add(fail(this, "ENRICH with remote policy can't be executed after [" + f.text() + "]" + f.source())));
337+
}
338+
339+
/**
340+
* Remote ENRICH (and, in fact, any remote operation) is not compatible with LIMIT followed by MV_EXPAND. Consider:
341+
* `FROM *:events | SORT @timestamp | LIMIT 2 | MV_EXPAND ip | ENRICH _remote:clientip_policy ON ip`
342+
* Semantically, this must take the top two events and then expand them. However, this cannot be executed remotely,
343+
* because this means that we have to take the top 2 events on each node, then expand them, then apply ENRICH,
344+
* then bring them to the coordinator - but then we cannot select the top 2 of them, because that selection would have to happen pre-expand!
345+
* We do not know which expanded rows are coming from the true top rows and which are coming from "false" top rows
346+
* which should have been thrown out. This is only possible to execute if MV_EXPAND executes on the coordinator
347+
* - which contradicts remote Enrich.
348+
* This could be fixed by the optimizer by moving MV_EXPAND past ENRICH, at least in some cases, but currently we do not do that.
349+
*/
350+
private void checkMvExpandAfterLimit(Failures failures) {
351+
this.forEachDown(MvExpand.class, u -> {
352+
u.forEachDown(p -> {
353+
if (p instanceof Limit || p instanceof TopN) {
354+
failures.add(fail(this, "MV_EXPAND after LIMIT is incompatible with remote ENRICH"));
355+
}
356+
});
357+
});
358+
359+
}
360+
361+
@Override
362+
public void postAnalysisVerification(Failures failures) {
363+
if (this.mode == Mode.REMOTE) {
364+
checkMvExpandAfterLimit(failures);
333365
}
334-
if (lookupJoin[0]) {
335-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN"));
366+
367+
}
368+
369+
@Override
370+
public void postOptimizationVerification(Failures failures) {
371+
if (this.mode == Mode.REMOTE) {
372+
checkForPlansForbiddenBeforeRemoteEnrich(failures);
336373
}
337374
}
338375
}

0 commit comments

Comments
 (0)