Skip to content

Commit d1d1971

Browse files
committed
Refactor and make For a pipeline breaker
1 parent 98c334d commit d1d1971

File tree

2 files changed

+8
-23
lines changed

2 files changed

+8
-23
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import java.io.IOException;
4141
import java.util.ArrayList;
42+
import java.util.HashSet;
4243
import java.util.List;
4344
import java.util.Locale;
4445
import java.util.Map;
@@ -309,36 +310,20 @@ private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Fail
309310
return;
310311
}
311312

312-
// TODO: shouldn't we also include FORK? Everything downstream from FORK should be coordinator-only.
313-
// https://github.com/elastic/elasticsearch/issues/131445
314-
boolean[] aggregate = { false };
315-
boolean[] coordinatorOnlyEnrich = { false };
316-
boolean[] lookupJoin = { false };
317-
boolean[] fork = { false };
313+
Set<String> badCommands = new HashSet<>();
318314

319315
enrich.forEachUp(LogicalPlan.class, u -> {
320316
if (u instanceof Aggregate) {
321-
aggregate[0] = true;
317+
badCommands.add("STATS");
322318
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
323-
coordinatorOnlyEnrich[0] = true;
319+
badCommands.add("another ENRICH with coordinator policy");
324320
} else if (u instanceof LookupJoin) {
325-
lookupJoin[0] = true;
321+
badCommands.add("LOOKUP JOIN");
326322
} else if (u instanceof Fork) {
327-
fork[0] = true;
323+
badCommands.add("FORK");
328324
}
329325
});
330326

331-
if (aggregate[0]) {
332-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
333-
}
334-
if (coordinatorOnlyEnrich[0]) {
335-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
336-
}
337-
if (lookupJoin[0]) {
338-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN"));
339-
}
340-
if (fork[0]) {
341-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after FORK"));
342-
}
327+
badCommands.forEach(c -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after " + c)));
343328
}
344329
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* A Fork is a n-ary {@code Plan} where each child is a sub plan, e.g.
3434
* {@code FORK [WHERE content:"fox" ] [WHERE content:"dog"] }
3535
*/
36-
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware {
36+
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware, PipelineBreaker {
3737

3838
public static final String FORK_FIELD = "_fork";
3939
public static final int MAX_BRANCHES = 8;

0 commit comments

Comments
 (0)