Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/115624.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 115624
summary: "ES|QL: fix LIMIT pushdown past MV_EXPAND"
area: ES|QL
type: bug
issues:
- 102084
- 102061
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,64 @@ from employees | where emp_no == 10001 | keep * | mv_expand first_name;
avg_worked_seconds:long | birth_date:date | emp_no:integer | first_name:keyword | gender:keyword | height:double | height.float:double | height.half_float:double | height.scaled_float:double | hire_date:date | is_rehired:boolean | job_positions:keyword | languages:integer | languages.byte:integer | languages.long:long | languages.short:integer | last_name:keyword | salary:integer | salary_change:double | salary_change.int:integer | salary_change.keyword:keyword | salary_change.long:long | still_hired:boolean
268728049 | 1953-09-02T00:00:00.000Z | 10001 | Georgi | M | 2.03 | 2.0299999713897705 | 2.029296875 | 2.0300000000000002 | 1986-06-26T00:00:00.000Z | [false, true] | [Accountant, Senior Python Developer] | 2 | 2 | 2 | 2 | Facello | 57305 | 1.19 | 1 | 1.19 | 1 | true
;


// see https://github.com/elastic/elasticsearch/issues/102061
sortMvExpand
required_capability: add_limit_inside_mv_expand
row a = 1 | sort a | mv_expand a;

a:integer
1
;


// see https://github.com/elastic/elasticsearch/issues/102061
limitSortMvExpand
required_capability: add_limit_inside_mv_expand
row a = 1 | limit 1 | sort a | mv_expand a;

a:integer
1
;


// see https://github.com/elastic/elasticsearch/issues/102061
limitSortMultipleMvExpand
required_capability: add_limit_inside_mv_expand
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | mv_expand b | mv_expand c | limit 3;

a:integer | b:integer | c:integer
1 | 2 | 3
2 | 2 | 3
3 | 2 | 3
;


multipleLimitSortMultipleMvExpand
required_capability: add_limit_inside_mv_expand
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | limit 2 | mv_expand b | mv_expand c | limit 3;

a:integer | b:integer | c:integer
1 | 2 | 3
2 | 2 | 3
;


multipleLimitSortMultipleMvExpand2
required_capability: add_limit_inside_mv_expand
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | limit 3 | mv_expand b | mv_expand c | limit 2;

a:integer | b:integer | c:integer
1 | 2 | 3
2 | 2 | 3
;


// see https://github.com/elastic/elasticsearch/issues/102084
whereMvExpand
required_capability: add_limit_inside_mv_expand
row a = 1, b = -15 | where b > 3 | mv_expand b;

a:integer | b:integer
;
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,12 @@ public enum Cap {
/**
* Support simplified syntax for named parameters for field and function names.
*/
NAMED_PARAMETER_FOR_FIELD_AND_FUNCTION_NAMES_SIMPLIFIED_SYNTAX(Build.current().isSnapshot());
NAMED_PARAMETER_FOR_FIELD_AND_FUNCTION_NAMES_SIMPLIFIED_SYNTAX(Build.current().isSnapshot()),

/**
* Fix pushdown of LIMIT past MV_EXPAND
*/
ADD_LIMIT_INSIDE_MV_EXPAND;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,8 @@ private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput)
resolved,
resolved.resolved()
? new ReferenceAttribute(resolved.source(), resolved.name(), resolved.dataType(), resolved.nullable(), null, false)
: resolved
: resolved,
p.limit()
);
}
return p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineProjections;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ConstantFolding;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ConvertStringToByteRef;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.DuplicateLimitAfterMvExpand;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.FoldNull;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.LiteralsOnTheRight;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PartiallyFoldCase;
Expand Down Expand Up @@ -174,7 +173,6 @@ protected static Batch<LogicalPlan> operators() {
new PruneColumns(),
new PruneLiteralsInOrderBy(),
new PushDownAndCombineLimits(),
new DuplicateLimitAfterMvExpand(),
new PushDownAndCombineFilters(),
new PushDownEval(),
new PushDownRegexExtract(),
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ public LogicalPlan rule(Limit limit) {
} else if (limit.child() instanceof UnaryPlan unary) {
if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof Enrich) {
return unary.replaceChild(limit.replaceChild(unary.child()));
} else if (unary instanceof MvExpand mvx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting that this code previously did not push the limit down past the MvExpand at all, but the bug reports indicate that that was happening. Was it happening somewhere else? If so, should that location be modified?

Copy link
Contributor Author

@luigidellaquila luigidellaquila Oct 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, good point, I forgot about it...
We have another rule, called DuplicateLimitAfterMvExpand, that does exactly what I describe above:
MV_EXPAND | LIMIT -> LIMIT | MV_EXPAND | LIMIT

I think we can remove it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is another rule, AddDefaultTopN, that is intrinsically wrong (it adds an arbitrary limit at the beginning of the query) and that IMHO should be removed as well. However, I'm not sure this fix is enough to do it; the problems it tries to solve may not be related only to MV_EXPAND.

// MV_EXPAND can increase the number of rows, so we cannot just push the limit down
// (we also have to preserve the LIMIT afterwards)
//
// To avoid infinite loops, i.e.
// | MV_EXPAND | LIMIT -> | LIMIT | MV_EXPAND | LIMIT -> ... | MV_EXPAND | LIMIT
// we add an inner limit to MvExpand and just push down the existing limit, i.e.
// | MV_EXPAND | LIMIT N -> | LIMIT N | MV_EXPAND (with limit N)
var limitSource = limit.limit();
var limitVal = (int) limitSource.fold();
Integer mvxLimit = mvx.limit();
if (mvxLimit == null || mvxLimit < 0 || mvxLimit > limitVal) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here: is the negative check needed?

mvx = new MvExpand(mvx.source(), mvx.child(), mvx.target(), mvx.expanded(), limitVal);
}
return mvx.replaceChild(limit.replaceChild(mvx.child()));
}
// check if there's a 'visible' descendant limit lower than the current one
// and if so, align the current limit since it adds no value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,28 @@ public class MvExpand extends UnaryPlan {

private final NamedExpression target;
private final Attribute expanded;
private final Integer limit;

private List<Attribute> output;

public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded) {
this(source, child, target, expanded, null);
}

public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded, Integer limit) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the limit is "artificial" and not serialized, consider keeping the old constructor, which delegates to the extended one and just passes null as the limit.
This makes the change less disruptive, especially since it looks like null is passed in most cases anyway.

super(source, child);
this.target = target;
this.expanded = expanded;
this.limit = limit;
}

private MvExpand(StreamInput in) throws IOException {
this(
Source.readFrom((PlanStreamInput) in),
in.readNamedWriteable(LogicalPlan.class),
in.readNamedWriteable(NamedExpression.class),
in.readNamedWriteable(Attribute.class)
in.readNamedWriteable(Attribute.class),
null // we only need this on the coordinator
);
}

Expand All @@ -51,6 +58,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(child());
out.writeNamedWriteable(target());
out.writeNamedWriteable(expanded());
assert limit == null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this never triggers b/c there's currently always going to be a limit below MV_EXPAND, so it'll never be serialized in the fragment, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It never happens because we do the serialization after the physical planning, and the mapper will resolve an MvExpand(limit=n) into a LimitExec(n) + MvExpandExec; after physical planning, even if that fragment remains the same, it will be re-translated to Limit(n) + MvExpand and serialized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, we don't need to serialize it now, but we could. It would be yet another transport version bump, and since it's a fix and it will be backported, I preferred to make the backport as smooth as possible

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I get it.

To clarify, we don't need to serialize it now, but we could.

The fragment will contain the logical plan, the "original" one. So theoretically, we might need to serialise an MvExpand with a non-null limit. In which case the assertion would trip. But MvExpand's limit would be non-null only if a limit was pushed down over it and since limit is a pipeline breaker, the plan will be latest broken down at the limit, so MvExpand will, in current form, never make it onto the nodes. Right?
The translation to physical and the mapping wouldn't then influence this, I guess.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and since limit is a pipeline breaker, the plan will be latest broken down at the limit, so MvExpand will, in current form, never make it onto the nodes. Right?

Correct, and it's enough to guarantee that limited MV_EXPAND will never make it into the data nodes.

The translation to physical and the mapping wouldn't then influence this, I guess.

I think you're right

}

@Override
Expand Down Expand Up @@ -78,6 +86,10 @@ public Attribute expanded() {
return expanded;
}

public Integer limit() {
return limit;
}

@Override
protected AttributeSet computeReferences() {
return target.references();
Expand All @@ -94,7 +106,7 @@ public boolean expressionsResolved() {

@Override
public UnaryPlan replaceChild(LogicalPlan newChild) {
return new MvExpand(source(), newChild, target, expanded);
return new MvExpand(source(), newChild, target, expanded, limit);
}

@Override
Expand All @@ -107,19 +119,20 @@ public List<Attribute> output() {

@Override
protected NodeInfo<? extends LogicalPlan> info() {
return NodeInfo.create(this, MvExpand::new, child(), target, expanded);
return NodeInfo.create(this, MvExpand::new, child(), target, expanded, limit);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), target, expanded);
return Objects.hash(super.hashCode(), target, expanded, limit);
}

@Override
public boolean equals(Object obj) {
if (false == super.equals(obj)) {
return false;
}
return Objects.equals(target, ((MvExpand) obj).target) && Objects.equals(expanded, ((MvExpand) obj).expanded);
MvExpand other = ((MvExpand) obj);
return Objects.equals(target, other.target) && Objects.equals(expanded, other.expanded) && Objects.equals(limit, other.limit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
Expand Down Expand Up @@ -228,7 +231,13 @@ private PhysicalPlan map(UnaryPlan p, PhysicalPlan child) {
}

if (p instanceof MvExpand mvExpand) {
return new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded());
MvExpandExec result = new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded());
if (mvExpand.limit() != null && mvExpand.limit() >= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about the >= 0 bit. Is it needed, do we do it anywhere else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure we don't support a LIMIT -1... probably I should remove that check.

// MvExpand could have an inner limit
// see PushDownAndCombineLimits rule
return new LimitExec(result.source(), result, new Literal(Source.EMPTY, mvExpand.limit(), DataType.INTEGER));
}
return result;
}

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -193,11 +194,10 @@ public void testMissingFieldInSort() {

/**
* Expects
* EsqlProject[[first_name{f}#6]]
* \_Limit[1000[INTEGER]]
* \_MvExpand[last_name{f}#9,last_name{r}#15]
* \_Limit[1000[INTEGER]]
* \_EsRelation[test][_meta_field{f}#11, emp_no{f}#5, first_name{f}#6, ge..]
* EsqlProject[[first_name{f}#9, last_name{r}#18]]
* \_MvExpand[last_name{f}#12,last_name{r}#18,1000]
* \_Limit[1000[INTEGER]]
* \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
*/
public void testMissingFieldInMvExpand() {
var plan = plan("""
Expand All @@ -213,11 +213,8 @@ public void testMissingFieldInMvExpand() {
var projections = project.projections();
assertThat(Expressions.names(projections), contains("first_name", "last_name"));

var limit = as(project.child(), Limit.class);
// MvExpand cannot be optimized (yet) because the target NamedExpression cannot be replaced with a NULL literal
// https://github.com/elastic/elasticsearch/issues/109974
// See LocalLogicalPlanOptimizer.ReplaceMissingFieldWithNull
var mvExpand = as(limit.child(), MvExpand.class);
var mvExpand = as(project.child(), MvExpand.class);
assertThat(mvExpand.limit(), equalTo(1000));
var limit2 = as(mvExpand.child(), Limit.class);
as(limit2.child(), EsRelation.class);
}
Expand Down
Loading