-
Notifications
You must be signed in to change notification settings - Fork 25.7k
ES|QL: fix LIMIT pushdown past MV_EXPAND #115624
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ES|QL: fix LIMIT pushdown past MV_EXPAND #115624
Conversation
|
Pinging @elastic/es-analytical-engine (Team:Analytics) |
|
Hi @luigidellaquila, I've created a changelog YAML for you. |
|
@elasticmachine update branch |
craigtaverner
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like a good solution, but I did have some comments.
| } else if (limit.child() instanceof UnaryPlan unary) { | ||
| if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof Enrich) { | ||
| return unary.replaceChild(limit.replaceChild(unary.child())); | ||
| } else if (unary instanceof MvExpand mvx) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting that this code previously did not push the limit down past the MvExpand at all, but the bug reports indicate that that was happening. Was it happening somewhere else? If so, should that location be modified?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah, good point, I forgot about it...
We have another rule, called DuplicateLimitAfterMvExpand, that does exactly what I describe above:
MV_EXPAND | LIMIT -> LIMIT | MV_EXPAND | LIMIT
I think we can remove it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is another rule, AddDefaultTopN, that is intrinsically wrong (it adds an arbitrary limit at the beginning of the query) and that IMHO should be removed as well, but I'm not sure this fix is enough to do it, maybe the problems it tries to solve are not only related to MV_EXPAND.
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java
Show resolved
Hide resolved
…_limit_2' into esql/fix_sort_mv_expand_limit_2
| || child instanceof Limit; | ||
|
|
||
| if (shouldSkip == false && child instanceof UnaryPlan unary) { | ||
| MvExpand mvExpand = descendantMvExpand(unary); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This rule seems to cover more cases than simply LIMIT->MvExpand, which the other rule now covers. Presumably the iterative application of the other rules will cover this scenario already, so this removal is of no consequence. And presumably test coverage will pick this up if that is not the case.
costin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An inner limit is going to create its own problems however the original issue hasn't been addressed in over a year (I felt short) and this PR looks to improve the situation.
See my other comments on incorporating the tests (disabled them if they fail) from the other PRs .
| /** | ||
| * Fix pushdown of LIMIT past MV_EXPAND | ||
| */ | ||
| FIX_MV_EXPAND_LIMIT_PUSHDOWN; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Unless this PR fixes all issues around limit and mvExpand, add a PARTIAL_ prefix or describe what the PR does - ADD_LIMIT_INSIDE_MV_EXPAND.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it fully addresses the LIMIT pushdown problem, but I also think the name you are suggesting is more descriptive, I'll change it anyway
| return false; | ||
| } | ||
| return Objects.equals(target, ((MvExpand) obj).target) && Objects.equals(expanded, ((MvExpand) obj).expanded); | ||
| return Objects.equals(target, ((MvExpand) obj).target) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extract the MvExpand casting inside equals.
| private List<Attribute> output; | ||
|
|
||
| public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded) { | ||
| public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded, Integer limit) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the limit is "artificial" and not serialized, consider keeping the old constructor which delegates to the extended one and just passes limit as null.
Makes the change less disruptive especially in most cases looks like null is passed anyway.
Good point, I'm adding some of the tests from these two PRs. |
|
@elasticmachine update branch |
bpintea
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I
| if (p instanceof MvExpand mvExpand) { | ||
| return new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded()); | ||
| MvExpandExec result = new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded()); | ||
| if (mvExpand.limit() != null && mvExpand.limit() >= 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious about the >= 0 bit. Is it needed, do we do it anywhere else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty sure we don't support a LIMIT -1... probably I should remove that check.
| var limitSource = limit.limit(); | ||
| var limitVal = (int) limitSource.fold(); | ||
| Integer mvxLimit = mvx.limit(); | ||
| if (mvxLimit == null || mvxLimit < 0 || mvxLimit > limitVal) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here: is the negative check needed?
| out.writeNamedWriteable(child()); | ||
| out.writeNamedWriteable(target()); | ||
| out.writeNamedWriteable(expanded()); | ||
| assert limit == null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this never triggers b/c there's currently always going to be a limit below MV_EXPAND, so it'll never be serialized in the fragment, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It never happens because we do the serialization after the physical planning, and the mapper will resolve an MvExpand(limit=n) into a LimitExec(n) + MvExpandExec; after physical planning, even if that fragment remains the same, it will be re-translated to Limit(n) + MvExpand and serialized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clarify, we don't need to serialize it now, but we could. It would be yet another transport version bump, and since it's a fix and it will be backported, I preferred to make the backport as smooth as possible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I get it.
To clarify, we don't need to serialize it now, but we could.
The fragment will contain the logical plan, the "original" one. So theoretically, we might need to serialise an MvExpand with a non-null limit. In which case the assertion would trip. But MvExpand's limit would be non-null only if a limit was pushed down over it and since limit is a pipeline breaker, the plan will be latest broken down at the limit, so MvExpand will, in current form, never make it onto the nodes. Right?
The translation to physical and the mapping wouldn't then influence this, I guess.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and since limit is a pipeline breaker, the plan will be latest broken down at the limit, so MvExpand will, in current form, never make it onto the nodes. Right?
Correct, and it's enough to guarantee that limited MV_EXPAND will never make it into the data nodes.
The translation to physical and the mapping wouldn't then influence this, I guess.
I think you're right
bpintea
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm. I guess at some point we might want to review this, if we add multiple reducers, no-limit plans or similar, but now it's a good fix.
…_limit_2' into esql/fix_sort_mv_expand_limit_2
|
@elasticmachine update branch |
|
@elasticmachine update branch |
Fixes #102084
Fixes #102061
Allow proper pushdown of
LIMIT xcommand pastMV_EXPAND, using a two-step strategy:MV_EXPAND | LIMIT xintoLIMIT x | MV_EXPAND(with limit x)LimitExecAll this happens only at logical level, so that physical plan and operators can ignore it (mv_expand operator implementation is unbounded, this PR doesn't change it).
The inner limit is resolved locally:
LimitExec, so the physical plan doesn't have itphysical plan -> logical plan fragmenttranslation for data node routing, MV_EXPAND is created without inner limitso there is no need to serialize it (ie. no impact on bwc problems, no changes to TransportVersions)
Q: Why adding a LIMIT to MV_EXPAND and not trying to just reason on the plan? LIMIT is not strictly related to MV_EXPAND, so it seems out of context there.
A: For many reasons:
MV_EXPAND | LIMIT->LIMIT | MV_EXPAND | LIMIT->... | MV_EXPAND | LIMIT(it was previously #102545)