-
Notifications
You must be signed in to change notification settings - Fork 25.5k
Replace remote Enrich hack with proper handling of remote Enrich #134967
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a first look at the new optimizer rules and left some comments.
...rc/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java
Show resolved
Hide resolved
...rc/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java
Outdated
Show resolved
Hide resolved
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TopN.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimit.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimit.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimit.java
Outdated
Show resolved
Hide resolved
Pinging @elastic/es-analytical-engine (Team:Analytics) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done a first pass - need another one. I'm like halfway through it, but haven't fully thought through the added optimizer rules, yet.
I noticed that this doesn't add new optimizer tests, yet. That's something I think we should really add before this can be merged; I'd go and try really hard to find edge cases where the hoisting mechanism, the new local-only topn/limit etc. may break.
(The generative tests will also do that to some extent (let's make sure they actually create remote-only enrichs!), but it's good to have a human-made baseline.)
.../esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/OptimizerRules.java
Outdated
Show resolved
Hide resolved
); | ||
} | ||
|
||
protected static Batch<LogicalPlan> operators(boolean local) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Hey, we have 3 mechanisms now that convey whether a rule should be run on the coordinator, the data node, or behave differently depending on global/local: this boolean
parameter, the CoordinatorOnly
interface, and the method LocalLogicalPlanOptimizer#localOperators
which does some special filtering on its own.
I like the CoordinatorOnly
interface, but'd prefer we settle on 1 mechanism if possible.
Maybe rather than a CoordinatorOnly
interface, we should have a DifferentOnDataNode
interface (yes, the name is horrible, I didn't have much time to think :D ) with a method localVersion
(or so) that either gives the data node version of the optimizer rule (applies to CombineProjections
and PropagateEmptyRelation
) or null
(applies to all rules marked CoordinatorOnly
in this PR).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made LocalLogicalPlanOptimizer#localOperators
respect the CoordinatorOnly
marker, so that's the same thing now, except for rule that work on both sides, but differently (PropagateEmptyRelation
). I agree that it could be refined further but not sure the best way yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't mind, I'd like to push a small refactoring that consolidates the 3 methods for adjusting rules to the local optimizer into one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 2c67a57. Feel free to revert if you don't like it, though :)
...sql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineLimitTopN.java
Outdated
Show resolved
Hide resolved
...sql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineLimitTopN.java
Outdated
Show resolved
Hide resolved
...gin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
Show resolved
Hide resolved
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java
Outdated
Show resolved
Hide resolved
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TopN.java
Outdated
Show resolved
Hide resolved
import java.util.Objects; | ||
|
||
public class Limit extends UnaryPlan implements TelemetryAware, PipelineBreaker { | ||
public class Limit extends UnaryPlan implements TelemetryAware, PipelineBreaker, ExecutesOn { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a little confusing now that this is PipelineBreaker
but can still execute on ANY
sometimes.
Out of scope, but maybe we'll want to reconcile the two interfaces later; PipelineBreaker
could be another enum variant; so we'd have "executes anywhere", "executes only on data nodes", "breaks the pipeline if we're not already on the coordinator", "runs only on the coordinator".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thing is, commands like LIMIT are "sometimes pipeline breakers" now, but we don't have an interface like that. It may be a good idea to think about how to refine that, but I don't think in this patch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you maybe attempt that in a follow-up, please? Specifically, trying to consolidate the ExecutesOn
and PipelineBreaker
interfaces.
I don't want us to spend too many cycles on this; but the two interfaces have significant overlap in their meaning and this looks like one of those concepts that we best simplify if possible, otherwise new devs that start hacking on ESQL will have a hard time understanding this.
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
Outdated
Show resolved
Hide resolved
...gin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TopNSerializationTests.java
Outdated
Show resolved
Hide resolved
return duplicateLimitAsFirstGrandchild(limit); | ||
// We use withLocal = false because if we have a remote join it will be forced into the fragment by the mapper anyway, | ||
// And the verifier checks that there are no non-synthetic limits before the join. | ||
// TODO: However, this means that the non-remote join will be always forced on the coordinator. We may want to revisit this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Non-remote join on the coordinator is fine and probably more efficient in many (most?) cases. To perform a lookup join, we reach out to the nodes holding a replica of the lookup index (single sharded); that doesn't seem like something where parallelizing it across nodes will make it faster - but it would definitely do more work overall.
// left-hand side, so adding a limit in there would lead to the right-hand side work on incomplete data. | ||
// To avoid repeating this infinitely, we have to set duplicated = true. | ||
return duplicateLimitAsFirstGrandchild(limit); | ||
// We use withLocal = false because if we have a remote join it will be forced into the fragment by the mapper anyway, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be consistent, couldn't we make withLocal
depend on whether this is a remote join or not?
The current remote lookup join is built on trust :) Trust, that our optimizer won't stuff a real pipeline breaker below it if it wasn't there before. Formally, we're violating this when we duplicate a limit past a join, because we know that the mapper will treat this as a local limit anyway. But until it hits a remote lookup join, the mapper will actually behave as if the upstream limit it already found is a pipeline breaker.
We could make this less hacky if the mapper knew that it doesn't have to treat the limit as pipeline breaker to begin with. In fact, this special casing for joins in the mapper might become obsolete if we set the correct local
value on the limit, no? Then, the next case should trigger automatically; and the special handling of duplicated limits would be fully contained in PushDownAndCombineLimits
.
@Override | ||
public void postOptimizationVerification(Failures failures) { | ||
if (this.mode == Mode.REMOTE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we declared Enrich to be PostOptimizationVerificationAware.CoordinatorOnly
, shouldn't this have become obsolete?
LogicalPlan p = plan; | ||
if (plan.child() instanceof OrderBy o) { | ||
p = new TopN(o.source(), o.child(), o.order(), plan.limit()); | ||
p = new TopN(o.source(), o.child(), o.order(), plan.limit(), false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Wouldn't it be more corect if this was
p = new TopN(o.source(), o.child(), o.order(), plan.limit(), false); | |
p = new TopN(o.source(), o.child(), o.order(), plan.limit(), plan.local() && o.local()); |
?
This rule does run on the coordinator and it currently declares any topn it creates as pipeline breaking. That's currently okay because we don't create local SORTs. But when reading this in isolation, I don't know that if I'm not already familiar with this code :)
Maybe let's make this either more correct on its own, or add a comment for future devs that stumble upon this.
import java.util.Objects; | ||
|
||
public class Limit extends UnaryPlan implements TelemetryAware, PipelineBreaker { | ||
public class Limit extends UnaryPlan implements TelemetryAware, PipelineBreaker, ExecutesOn { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you maybe attempt that in a follow-up, please? Specifically, trying to consolidate the ExecutesOn
and PipelineBreaker
interfaces.
I don't want us to spend too many cycles on this; but the two interfaces have significant overlap in their meaning and this looks like one of those concepts that we best simplify if possible, otherwise new devs that start hacking on ESQL will have a hard time understanding this.
); | ||
} | ||
|
||
protected static Batch<LogicalPlan> operators(boolean local) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't mind, I'd like to push a small refactoring that consolidates the 3 methods for adjusting rules to the local optimizer into one.
CardinalityPreserving
interface to mark nodes that preserve cardinalityCoordinatorOnly
interface)TODO: this does not fix the issue of local (not remote) Lookup Join always being forced on the coordinator by Limits, we may address this in the followup, this one is meaty enough as it is.
Closes #115897
Fixes #118531