Skip to content

Commit 345cc48

Browse files
authored
[9.1] Restrict remote ENRICH after FORK (#131945) (#132039)
* Restrict remote ENRICH after FORK (#131945) * Restrict remote LOOKUP JOIN after FORK (cherry picked from commit 24aefcc) # Conflicts: # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java * Not for backport
1 parent 8e51512 commit 345cc48

File tree

4 files changed

+24
-17
lines changed

4 files changed

+24
-17
lines changed

docs/changelog/131945.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 131945
2+
summary: Restrict remote ENRICH after FORK
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 131445

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import java.io.IOException;
4141
import java.util.ArrayList;
42+
import java.util.HashSet;
4243
import java.util.List;
4344
import java.util.Locale;
4445
import java.util.Map;
@@ -309,30 +310,20 @@ private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Fail
309310
return;
310311
}
311312

312-
// TODO: shouldn't we also include FORK? Everything downstream from FORK should be coordinator-only.
313-
// https://github.com/elastic/elasticsearch/issues/131445
314-
boolean[] aggregate = { false };
315-
boolean[] coordinatorOnlyEnrich = { false };
316-
boolean[] lookupJoin = { false };
313+
Set<String> badCommands = new HashSet<>();
317314

318315
enrich.forEachUp(LogicalPlan.class, u -> {
319316
if (u instanceof Aggregate) {
320-
aggregate[0] = true;
317+
badCommands.add("STATS");
321318
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
322-
coordinatorOnlyEnrich[0] = true;
319+
badCommands.add("another ENRICH with coordinator policy");
323320
} else if (u instanceof LookupJoin) {
324-
lookupJoin[0] = true;
321+
badCommands.add("LOOKUP JOIN");
322+
} else if (u instanceof Fork) {
323+
badCommands.add("FORK");
325324
}
326325
});
327326

328-
if (aggregate[0]) {
329-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
330-
}
331-
if (coordinatorOnlyEnrich[0]) {
332-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
333-
}
334-
if (lookupJoin[0]) {
335-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN"));
336-
}
327+
badCommands.forEach(c -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after " + c)));
337328
}
338329
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ public static PhysicalPlan reductionPlan(PhysicalPlan plan) {
120120
}
121121
final FragmentExec fragment = (FragmentExec) fragments.getFirst();
122122

123+
// Though FORK is technically a pipeline breaker, it should never show up here.
124+
// See also: https://github.com/elastic/elasticsearch/pull/131945/files#r2235572935
123125
final var pipelineBreakers = fragment.fragment().collectFirstChildren(Mapper::isPipelineBreaker);
124126
if (pipelineBreakers.isEmpty()) {
125127
return null;

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2357,6 +2357,14 @@ public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
23572357
| ENRICH _remote:languages ON language_code
23582358
""", analyzer);
23592359
assertThat(err, containsString("7:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
2360+
2361+
err = error("""
2362+
FROM test
2363+
| FORK (WHERE languages == 1) (WHERE languages == 2)
2364+
| EVAL language_code = languages
2365+
| ENRICH _remote:languages ON language_code
2366+
""", analyzer);
2367+
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after FORK"));
23602368
}
23612369

23622370
private void checkFullTextFunctionsInStats(String functionInvocation) {

0 commit comments

Comments
 (0)