Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/131945.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 131945
summary: Restrict remote ENRICH after FORK
area: ES|QL
type: bug
issues:
- 131445
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -309,30 +310,20 @@ private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Fail
return;
}

// TODO: shouldn't we also include FORK? Everything downstream from FORK should be coordinator-only.
// https://github.com/elastic/elasticsearch/issues/131445
boolean[] aggregate = { false };
boolean[] coordinatorOnlyEnrich = { false };
boolean[] lookupJoin = { false };
Set<String> badCommands = new HashSet<>();

enrich.forEachUp(LogicalPlan.class, u -> {
if (u instanceof Aggregate) {
aggregate[0] = true;
badCommands.add("STATS");
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
coordinatorOnlyEnrich[0] = true;
badCommands.add("another ENRICH with coordinator policy");
} else if (u instanceof LookupJoin) {
lookupJoin[0] = true;
badCommands.add("LOOKUP JOIN");
} else if (u instanceof Fork) {
badCommands.add("FORK");
}
});

if (aggregate[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
}
if (coordinatorOnlyEnrich[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
}
if (lookupJoin[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN"));
}
badCommands.forEach(c -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after " + c)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* A Fork is a n-ary {@code Plan} where each child is a sub plan, e.g.
* {@code FORK [WHERE content:"fox" ] [WHERE content:"dog"] }
*/
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware {
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware, PipelineBreaker {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, pipeline breakers are also being used in node-/cluster-level reduction here; I don't think this applies to FORK, as FORK cannot be applied in multiple steps (yet?); each FORK branch seems to be hooked up to the same exchange sink on the coordinator. @ioanatia , please correct me if I'm wrong.

We may be introducing a subtle bug here, or maybe not; in any case, it indicates that the PipelineBreaker interface needs further refinement, because for validation purposes FORK definitely is a pipeline breaker, even if it can't be used for node-/cluster-level reduction.

Maybe we should just add a method PipelineBreaker#isReducer or similar? It could default to true and return false for Fork.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure adding PipelineBreaker has any effect at all.

PipelineBreaker is indeed used here - but I don't see in which case we would ever get to a point where a FragmentExec contains a logical plan that contains a Fork plan. Let me know if that's not the case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for confirming @ioanatia, that makes sense ! If we're never pulling FORKS out of fragments, then this change should be safe.

If we're marking Fork as PipelineBreaker, maybe we could at least add a comment here to remind future us that Fork's never going to show up here.


public static final String FORK_FIELD = "_fork";
public static final int MAX_BRANCHES = 8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2457,6 +2457,14 @@ public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("7:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));

err = error("""
FROM test
| FORK (WHERE languages == 1) (WHERE languages == 2)
| EVAL language_code = languages
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after FORK"));
}

private void checkFullTextFunctionsInStats(String functionInvocation) {
Expand Down
Loading