Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/changelog/103080.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pr: 103080
summary: "ESQL: `mv_expand` default limit fix"
area: ES|QL
type: bug
issues:
- 101266
- 102084
- 102061
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.ql.rule.ParameterizedRule;
import org.elasticsearch.xpack.ql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.ql.rule.Rule;
Expand Down Expand Up @@ -651,7 +653,75 @@ public LogicalPlan apply(LogicalPlan logicalPlan, AnalyzerContext context) {
} else {
limit = context.configuration().resultTruncationMaxSize(); // user provided a limit: cap result entries to the max
}
return new Limit(Source.EMPTY, new Literal(Source.EMPTY, limit, DataTypes.INTEGER), logicalPlan);

Limit l = new Limit(Source.EMPTY, new Literal(Source.EMPTY, limit, DataTypes.INTEGER), logicalPlan);
return maybeAddDefaultLimitForMvExpand(l);
}

/**
* This adds the implicit limit to a plan that has a sort and no limit between EsRelation and the first MvExpand.
* To date, the only known use case that "needs" this is a query of the form
* from test
* | sort emp_no
* | mv_expand first_name
* | rename first_name AS x
* | where x LIKE "*a*"
* | limit 15
*
* or
*
* from test
* | sort emp_no
* | mv_expand first_name
* | sort first_name
* | limit 15
*
* LogicalPlanAnalyzer.PushDownAndCombineLimits rule will copy the "limit 15" after "sort emp_no" if there is no filter
* on the expanded values OR if there is no sort between "limit" and "mv_expand". In these two situations, to be able for
* "sort emp_no" to form a TopN, we need a limit. Since the "limit 15" in the query cannot be pushed down (otherwise, it will change
* the results that reach mv_expand command) we need some kind of value there. For now, this is the implicit limit.
* The second query above becomes:
*
* from test
* | sort emp_no
* | limit 10000
Copy link
Contributor

@luigidellaquila luigidellaquila Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid this doesn't work: adding a limit there changes the query semantics by filtering out candidate records too early.
In this specific case, the | sort emp_no will be overridden by | sort first_name, so it can be completely removed and the query will become

from test
| mv_expand first_name
| sort first_name
| limit 15

In general, the problem is not that we cannot push LIMIT past MV_EXPAND; the following transformation is absolutelly valid:
| mv_expand x | limit 10 -> | limit 10 | mv_expand x | limit 10
and also the default limit is valid as long as it's higher than the original limit:
| mv_expand x | limit 10 -> | limit 10000 | mv_expand x | limit 10

The real problem here is that we cannot completely remove the limit 10 after mv_expand, so we end up trying to push it down it again in an infinite loop.
My biggest concern is that even if we find a way to avoid it now, we'll eventually add new rules to the planner and we'll hardly take this infinite loop into account, so it will eventually happen again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid this doesn't work: adding a limit there changes the query semantics by filtering out candidate records too early.

Agree with you, but as the planner works now - where a TopN is needed - a limit is required at that specific place in the query. As I mentioned in the code comments for now I decided to use the implicit limit.

In this specific case, the | sort emp_no will be overridden by | sort first_name

What do you mean "overriden"? The sorts do not change. If you meant "the sort first_name anyway changes the order of the final documents, so it makes sort emp_no obsolete" then this is not true. sort emp_no followed by mv_expand and then sort first_name has to happen in this specific order for things to make sense.

cannot push LIMIT past MV_EXPAN

I think this is a misunderstanding on your part. The code in the optimizer doesn't push limit, but it makes a copy of limit and pushes the copy. The limit after mv_expand doesn't disappear.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean "overriden"?

I mean that the following operations do not depend on the input order (none of the existing commands do, apart from LIMIT), so the fact that you sort records by emp_no doesn't make any difference: data will be re-sorted by first_name afterwards anyway

The code in the optimizer doesn't push limit, but it makes a copy of limit and pushes the copy

conceptually there is no big difference, you are adding a new limit before mv_expand (that can be considered a pushdown) and you are also preserving the original limit.

* | mv_expand first_name
* | sort first_name
* | limit 15
*/
private Limit maybeAddDefaultLimitForMvExpand(Limit limit) {
LogicalPlan plan = limit.child();
MvExpand mvExpand = null;
UnaryPlan esRelationParent = null;
UnaryPlan orderByParent = null;

// basically, locate the closest to Lucene mv_expand and any potential sort
while (plan instanceof UnaryPlan unaryPlan) {
if (plan instanceof MvExpand mve) {
mvExpand = mve;
orderByParent = null;
} else if (plan instanceof OrderBy && mvExpand != null) {
orderByParent = esRelationParent;
}
plan = unaryPlan.child();
esRelationParent = unaryPlan;
}

// when these two are found, place the default limit before sort
if (mvExpand != null && orderByParent != null && plan instanceof EsRelation) {
var duplicateLimit = new Limit(limit.source(), limit.limit(), orderByParent.child());
return limit.replaceChild(propagateLimitUntilEsRelation(duplicateLimit, orderByParent, (UnaryPlan) limit.child()));
}

return limit;
}

private LogicalPlan propagateLimitUntilEsRelation(Limit duplicateLimit, UnaryPlan esRelationParent, UnaryPlan child) {
if (child == esRelationParent) {
return esRelationParent.replaceChild(duplicateLimit);
} else {
return child.replaceChild(propagateLimitUntilEsRelation(duplicateLimit, esRelationParent, (UnaryPlan) child.child()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SetAsOptimized;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SimplifyComparisonsArithmetics;
import org.elasticsearch.xpack.ql.plan.logical.Aggregate;
import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
import org.elasticsearch.xpack.ql.plan.logical.Filter;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
Expand All @@ -62,7 +61,6 @@
import org.elasticsearch.xpack.ql.rule.ParameterizedRule;
import org.elasticsearch.xpack.ql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.ql.rule.Rule;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.type.DataType;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.util.CollectionUtils;
Expand Down Expand Up @@ -155,10 +153,9 @@ protected static List<Batch<LogicalPlan>> rules() {
new PushDownAndCombineLimits(),
new ReplaceLimitAndSortAsTopN()
);
var defaultTopN = new Batch<>("Add default TopN", new AddDefaultTopN());
var label = new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized());

return asList(substitutions, operators, skip, cleanup, defaultTopN, label);
return asList(substitutions, operators, skip, cleanup, label);
}

// TODO: currently this rule only works for aggregate functions (AVG)
Expand Down Expand Up @@ -493,13 +490,16 @@ protected LogicalPlan rule(Limit limit) {
|| child instanceof Limit;

if (shouldSkip == false && child instanceof UnaryPlan unary) {
MvExpand mvExpand = descendantMvExpand(unary);
// in case unary is THE MvExpand, return it right away
MvExpand mvExpand = unary instanceof MvExpand mve ? mve : descendantMvExpand(unary);
if (mvExpand != null) {
Limit limitBeforeMvExpand = limitBeforeMvExpand(mvExpand);
// if there is no "appropriate" limit before mv_expand, then push down a copy of the one after it so that:
// - a possible TopN is properly built as low as possible in the tree (closed to Lucene)
// - the input of mv_expand is as small as possible before it is expanded (less rows to inflate and occupy memory)
if (limitBeforeMvExpand == null) {
// limitBeforeMvExpand > limit is a cheap way of not indefinetely copying the same limit past mv_expand
// ">" will enforce the limit of the mv_expand, limitting the results that come to mv_expand to bare minimum
if (limitBeforeMvExpand == null || (int) limitBeforeMvExpand.limit().fold() > (int) limit.limit().fold()) {
var duplicateLimit = new Limit(limit.source(), limit.limit(), mvExpand.child());
return limit.replaceChild(propagateDuplicateLimitUntilMvExpand(duplicateLimit, mvExpand, unary));
}
Expand All @@ -511,7 +511,7 @@ protected LogicalPlan rule(Limit limit) {
private static MvExpand descendantMvExpand(UnaryPlan unary) {
UnaryPlan plan = unary;
AttributeSet filterReferences = new AttributeSet();
while (plan instanceof Aggregate == false) {
while (plan instanceof UnaryPlan) {
if (plan instanceof MvExpand mve) {
// don't return the mv_expand that has a filter after it which uses the expanded values
// since this will trigger the use of a potentially incorrect (too restrictive) limit further down in the tree
Expand All @@ -532,6 +532,8 @@ private static MvExpand descendantMvExpand(UnaryPlan unary) {
// something like from test | sort emp_no | mv_expand job_positions | sort first_name | limit 5
// (the sort first_name likely changes the order of the docs after sort emp_no, so "limit 5" shouldn't be copied down
return null;
} else if (plan instanceof Limit) {
return null;
}

if (plan.child() instanceof UnaryPlan unaryPlan) {
Expand All @@ -545,7 +547,7 @@ private static MvExpand descendantMvExpand(UnaryPlan unary) {

private static Limit limitBeforeMvExpand(MvExpand mvExpand) {
UnaryPlan plan = mvExpand;
while (plan instanceof Aggregate == false) {
while (plan instanceof UnaryPlan) {
if (plan instanceof Limit limit) {
return limit;
}
Expand Down Expand Up @@ -1016,40 +1018,6 @@ protected LogicalPlan rule(Limit plan) {
}
}

/**
* This adds an explicit TopN node to a plan that only has an OrderBy right before Lucene.
* To date, the only known use case that "needs" this is a query of the form
* from test
* | sort emp_no
* | mv_expand first_name
* | rename first_name AS x
* | where x LIKE "*a*"
* | limit 15
*
* or
*
* from test
* | sort emp_no
* | mv_expand first_name
* | sort first_name
* | limit 15
*
* PushDownAndCombineLimits rule will copy the "limit 15" after "sort emp_no" if there is no filter on the expanded values
* OR if there is no sort between "limit" and "mv_expand".
* But, since this type of query has such a filter, the "sort emp_no" will have no limit when it reaches the current rule.
*/
static class AddDefaultTopN extends ParameterizedOptimizerRule<LogicalPlan, LogicalOptimizerContext> {

@Override
protected LogicalPlan rule(LogicalPlan plan, LogicalOptimizerContext context) {
if (plan instanceof UnaryPlan unary && unary.child() instanceof OrderBy order && order.child() instanceof EsRelation relation) {
var limit = new Literal(Source.EMPTY, context.configuration().resultTruncationMaxSize(), DataTypes.INTEGER);
return unary.replaceChild(new TopN(plan.source(), relation, order.order(), limit));
}
return plan;
}
}

public static class ReplaceRegexMatch extends OptimizerRules.ReplaceRegexMatch {

protected Expression regexToEquals(RegexMatch<?> regexMatch, Literal literal) {
Expand Down
Loading