-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ESQL: Disallow remote enrich after lu join #131426
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7dd0088
ac18cfb
eb03085
11d3eb3
81463ac
444b8f3
c0e72e1
ade4ffe
ca65578
aeebd76
7ac0248
47dca64
19e77f8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
pr: 131426 | ||
summary: Disallow remote enrich after lu join | ||
area: ES|QL | ||
type: bug | ||
issues: | ||
- 129372 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ | |
import org.elasticsearch.xpack.esql.index.EsIndex; | ||
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; | ||
import org.elasticsearch.xpack.esql.plan.GeneratingPlan; | ||
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
|
@@ -295,23 +296,43 @@ public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() { | |
* retaining the originating cluster and restructuring pages for routing, which might be complicated. | ||
*/ | ||
private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) { | ||
boolean[] agg = { false }; | ||
boolean[] enrichCoord = { false }; | ||
// First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be | ||
// in separate FORK branches which are valid by themselves. | ||
plan.forEachUp(Enrich.class, enrich -> checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures)); | ||
} | ||
|
||
/** | ||
* For a given remote {@link Enrich}, check if there are any forbidden plans upstream. | ||
*/ | ||
private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) { | ||
if (enrich.mode != Mode.REMOTE) { | ||
return; | ||
} | ||
|
||
// TODO: shouldn't we also include FORK? Everything downstream from FORK should be coordinator-only. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not completely sure, but if FORK needs merging on the coordinator, then we definitely want it here. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why does […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since FORK is 9.1+ though, it may be easier to take care of it in a separate pull. And maybe even not mention FORK if we're merging it down to 8.18, otherwise somebody reading 8.18 branch code could be quite confused. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
// https://github.com/elastic/elasticsearch/issues/131445 | ||
boolean[] aggregate = { false }; | ||
boolean[] coordinatorOnlyEnrich = { false }; | ||
boolean[] lookupJoin = { false }; | ||
|
||
plan.forEachUp(UnaryPlan.class, u -> { | ||
enrich.forEachUp(LogicalPlan.class, u -> { | ||
if (u instanceof Aggregate) { | ||
agg[0] = true; | ||
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) { | ||
enrichCoord[0] = true; | ||
} | ||
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) { | ||
if (agg[0]) { | ||
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS")); | ||
} | ||
if (enrichCoord[0]) { | ||
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")); | ||
} | ||
aggregate[0] = true; | ||
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) { | ||
coordinatorOnlyEnrich[0] = true; | ||
} else if (u instanceof LookupJoin) { | ||
lookupJoin[0] = true; | ||
} | ||
}); | ||
|
||
if (aggregate[0]) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we add new commands (FORK?) to this list, maybe we can start considering the creation of a flag interface to identify them (similar to current […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think having an interface that identifies the "not remotable" components would be nice, however it may be tricky because some operations may depend on options (like Enrich itself which can be "coordinator" or "remote" or "any"). Still, it might save some effort for simpler operations. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS")); | ||
} | ||
if (coordinatorOnlyEnrich[0]) { | ||
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")); | ||
} | ||
if (lookupJoin[0]) { | ||
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN")); | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.