-
Notifications
You must be signed in to change notification settings - Fork 25.7k
ES|QL: fix LIMIT pushdown past MV_EXPAND #115624
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
65096cb
280a4b5
f471d54
b03ef72
5b3fd2e
db86a6f
e171cdf
5755968
3b407e7
2322b04
7ee5811
8f1c2d7
adc4d81
af70d28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| pr: 115624 | ||
| summary: "ES|QL: fix LIMIT pushdown past MV_EXPAND" | ||
| area: ES|QL | ||
| type: bug | ||
| issues: | ||
| - 102084 | ||
| - 102061 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,14 @@ public LogicalPlan rule(Limit limit) { | |
| } else if (limit.child() instanceof UnaryPlan unary) { | ||
| if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof Enrich) { | ||
| return unary.replaceChild(limit.replaceChild(unary.child())); | ||
| } else if (unary instanceof MvExpand mvx) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting that this code previously did not push the limit down past the MvExpand at all, but the bug reports indicate that that was happening. Was it happening somewhere else? If so, should that location be modified?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yeah, good point, I forgot about it... I think we can remove it
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is another rule, AddDefaultTopN, that is intrinsically wrong (it adds an arbitrary limit at the beginning of the query) and that IMHO should be removed as well, but I'm not sure this fix is enough to do it, maybe the problems it tries to solve are not only related to MV_EXPAND. |
||
| var limitSource = limit.limit(); | ||
| var limitVal = (int) limitSource.fold(); | ||
| Integer mvxLimit = mvx.limit(); | ||
| if (mvxLimit == null || mvxLimit < 0 || mvxLimit > limitVal) { | ||
|
||
| mvx = new MvExpand(mvx.source(), mvx.child(), mvx.target(), mvx.expanded(), limitVal); | ||
| } | ||
| return mvx.replaceChild(limit.replaceChild(mvx.child())); | ||
| } | ||
| // check if there's a 'visible' descendant limit lower than the current one | ||
| // and if so, align the current limit since it adds no value | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,21 +27,24 @@ public class MvExpand extends UnaryPlan { | |
|
|
||
| private final NamedExpression target; | ||
| private final Attribute expanded; | ||
| private final Integer limit; | ||
|
|
||
| private List<Attribute> output; | ||
|
|
||
| public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded) { | ||
| public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded, Integer limit) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the limit is "artificial" and not serialized, consider keeping the old constructor which delegates to the extended one and just passes limit as null. |
||
| super(source, child); | ||
| this.target = target; | ||
| this.expanded = expanded; | ||
| this.limit = limit; | ||
| } | ||
|
|
||
| private MvExpand(StreamInput in) throws IOException { | ||
| this( | ||
| Source.readFrom((PlanStreamInput) in), | ||
| in.readNamedWriteable(LogicalPlan.class), | ||
| in.readNamedWriteable(NamedExpression.class), | ||
| in.readNamedWriteable(Attribute.class) | ||
| in.readNamedWriteable(Attribute.class), | ||
| null // we only need this on the coordinator | ||
| ); | ||
| } | ||
|
|
||
|
|
@@ -51,6 +54,7 @@ public void writeTo(StreamOutput out) throws IOException { | |
| out.writeNamedWriteable(child()); | ||
| out.writeNamedWriteable(target()); | ||
| out.writeNamedWriteable(expanded()); | ||
| assert limit == null; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess this never triggers b/c there's currently always going to be a limit below MV_EXPAND, so it'll never be serialized in the fragment, right?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It never happens because we do the serialization after the physical planning, and the mapper will resolve an
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To clarify, we don't need to serialize it now, but we could. It would be yet another transport version bump, and since it's a fix and it will be backported, I preferred to make the backport as smooth as possible
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I get it.
The fragment will contain the logical plan, the "original" one. So theoretically, we might need to serialise an
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Correct, and it's enough to guarantee that limited MV_EXPAND will never make it into the data nodes.
I think you're right |
||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -78,6 +82,10 @@ public Attribute expanded() { | |
| return expanded; | ||
| } | ||
|
|
||
| public Integer limit() { | ||
| return limit; | ||
| } | ||
|
|
||
| @Override | ||
| protected AttributeSet computeReferences() { | ||
| return target.references(); | ||
|
|
@@ -94,7 +102,7 @@ public boolean expressionsResolved() { | |
|
|
||
| @Override | ||
| public UnaryPlan replaceChild(LogicalPlan newChild) { | ||
| return new MvExpand(source(), newChild, target, expanded); | ||
| return new MvExpand(source(), newChild, target, expanded, limit); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -107,19 +115,21 @@ public List<Attribute> output() { | |
|
|
||
| @Override | ||
| protected NodeInfo<? extends LogicalPlan> info() { | ||
| return NodeInfo.create(this, MvExpand::new, child(), target, expanded); | ||
| return NodeInfo.create(this, MvExpand::new, child(), target, expanded, limit); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(super.hashCode(), target, expanded); | ||
| return Objects.hash(super.hashCode(), target, expanded, limit); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object obj) { | ||
| if (false == super.equals(obj)) { | ||
| return false; | ||
| } | ||
| return Objects.equals(target, ((MvExpand) obj).target) && Objects.equals(expanded, ((MvExpand) obj).expanded); | ||
| return Objects.equals(target, ((MvExpand) obj).target) | ||
|
||
| && Objects.equals(expanded, ((MvExpand) obj).expanded) | ||
| && Objects.equals(limit, ((MvExpand) obj).limit); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,9 @@ | |
| import org.elasticsearch.compute.aggregation.AggregatorMode; | ||
| import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; | ||
| import org.elasticsearch.xpack.esql.core.expression.Attribute; | ||
| import org.elasticsearch.xpack.esql.core.expression.Literal; | ||
| import org.elasticsearch.xpack.esql.core.tree.Source; | ||
| import org.elasticsearch.xpack.esql.core.type.DataType; | ||
| import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; | ||
| import org.elasticsearch.xpack.esql.plan.logical.Aggregate; | ||
| import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; | ||
|
|
@@ -228,7 +231,11 @@ private PhysicalPlan map(UnaryPlan p, PhysicalPlan child) { | |
| } | ||
|
|
||
| if (p instanceof MvExpand mvExpand) { | ||
| return new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded()); | ||
| MvExpandExec result = new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded()); | ||
craigtaverner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (mvExpand.limit() != null && mvExpand.limit() >= 0) { | ||
|
||
| return new LimitExec(result.source(), result, new Literal(Source.EMPTY, mvExpand.limit(), DataType.INTEGER)); | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| // | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Unless this PR fixes all issues around limit and mvExpand, add a PARTIAL_ prefix or describe what the PR does - ADD_LIMIT_INSIDE_MV_EXPAND.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it fully addresses the LIMIT pushdown problem, but I also think the name you are suggesting is more descriptive, I'll change it anyway