Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/115624.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 115624
summary: "ES|QL: fix LIMIT pushdown past MV_EXPAND"
area: ES|QL
type: bug
issues:
- 102084
- 102061
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,83 @@ from employees | where emp_no == 10001 | keep * | mv_expand first_name;
avg_worked_seconds:long | birth_date:date | emp_no:integer | first_name:keyword | gender:keyword | height:double | height.float:double | height.half_float:double | height.scaled_float:double | hire_date:date | is_rehired:boolean | job_positions:keyword | languages:integer | languages.byte:integer | languages.long:long | languages.short:integer | last_name:keyword | salary:integer | salary_change:double | salary_change.int:integer | salary_change.keyword:keyword | salary_change.long:long | still_hired:boolean
268728049 | 1953-09-02T00:00:00.000Z | 10001 | Georgi | M | 2.03 | 2.0299999713897705 | 2.029296875 | 2.0300000000000002 | 1986-06-26T00:00:00.000Z | [false, true] | [Accountant, Senior Python Developer] | 2 | 2 | 2 | 2 | Facello | 57305 | 1.19 | 1 | 1.19 | 1 | true
;


// see https://github.com/elastic/elasticsearch/issues/102061
sortMvExpand
required_capability: add_limit_inside_mv_expand
row a = 1 | sort a | mv_expand a;

a:integer
1
;

// see https://github.com/elastic/elasticsearch/issues/102061
sortMvExpandFromIndex
required_capability: add_limit_inside_mv_expand
from employees | sort emp_no | mv_expand emp_no | limit 1 | keep emp_no;

emp_no:integer
10001
;


// see https://github.com/elastic/elasticsearch/issues/102061
limitSortMvExpand
required_capability: add_limit_inside_mv_expand
row a = 1 | limit 1 | sort a | mv_expand a;

a:integer
1
;


// see https://github.com/elastic/elasticsearch/issues/102061
limitSortMultipleMvExpand
required_capability: add_limit_inside_mv_expand
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | mv_expand b | mv_expand c | limit 3;

a:integer | b:integer | c:integer
1 | 2 | 3
2 | 2 | 3
3 | 2 | 3
;


multipleLimitSortMultipleMvExpand
required_capability: add_limit_inside_mv_expand
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | limit 2 | mv_expand b | mv_expand c | limit 3;

a:integer | b:integer | c:integer
1 | 2 | 3
2 | 2 | 3
;


multipleLimitSortMultipleMvExpand2
required_capability: add_limit_inside_mv_expand
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | limit 3 | mv_expand b | mv_expand c | limit 2;

a:integer | b:integer | c:integer
1 | 2 | 3
2 | 2 | 3
;


//see https://github.com/elastic/elasticsearch/issues/102084
whereMvExpand
required_capability: add_limit_inside_mv_expand
row a = 1, b = -15 | where b > 3 | mv_expand b;

a:integer | b:integer
;


//see https://github.com/elastic/elasticsearch/issues/102084
whereMvExpandOnIndex
required_capability: add_limit_inside_mv_expand
from employees | where emp_no == 10003 | mv_expand first_name | keep first_name;

first_name:keyword
Parto
;
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,12 @@ public enum Cap {
/**
* Support simplified syntax for named parameters for field and function names.
*/
NAMED_PARAMETER_FOR_FIELD_AND_FUNCTION_NAMES_SIMPLIFIED_SYNTAX(Build.current().isSnapshot());
NAMED_PARAMETER_FOR_FIELD_AND_FUNCTION_NAMES_SIMPLIFIED_SYNTAX(Build.current().isSnapshot()),

/**
* Fix pushdown of LIMIT past MV_EXPAND
*/
ADD_LIMIT_INSIDE_MV_EXPAND;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,8 @@ private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput)
resolved,
resolved.resolved()
? new ReferenceAttribute(resolved.source(), resolved.name(), resolved.dataType(), resolved.nullable(), null, false)
: resolved
: resolved,
p.limit()
);
}
return p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineProjections;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ConstantFolding;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ConvertStringToByteRef;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.DuplicateLimitAfterMvExpand;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.FoldNull;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.LiteralsOnTheRight;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PartiallyFoldCase;
Expand Down Expand Up @@ -174,7 +173,6 @@ protected static Batch<LogicalPlan> operators() {
new PruneColumns(),
new PruneLiteralsInOrderBy(),
new PushDownAndCombineLimits(),
new DuplicateLimitAfterMvExpand(),
new PushDownAndCombineFilters(),
new PushDownEval(),
new PushDownRegexExtract(),
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ public LogicalPlan rule(Limit limit) {
} else if (limit.child() instanceof UnaryPlan unary) {
if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof Enrich) {
return unary.replaceChild(limit.replaceChild(unary.child()));
} else if (unary instanceof MvExpand mvx) {
// MV_EXPAND can increase the number of rows, so we cannot just push the limit down
// (we also have to preserve the LIMIT afterwards)
//
// To avoid infinite loops, ie.
// | MV_EXPAND | LIMIT -> | LIMIT | MV_EXPAND | LIMIT -> ... | MV_EXPAND | LIMIT
// we add an inner limit to MvExpand and just push down the existing limit, ie.
// | MV_EXPAND | LIMIT N -> | LIMIT N | MV_EXPAND (with limit N)
var limitSource = limit.limit();
var limitVal = (int) limitSource.fold();
Integer mvxLimit = mvx.limit();
if (mvxLimit == null || mvxLimit > limitVal) {
mvx = new MvExpand(mvx.source(), mvx.child(), mvx.target(), mvx.expanded(), limitVal);
}
return mvx.replaceChild(limit.replaceChild(mvx.child()));
}
// check if there's a 'visible' descendant limit lower than the current one
// and if so, align the current limit since it adds no value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,28 @@ public class MvExpand extends UnaryPlan {

private final NamedExpression target;
private final Attribute expanded;
private final Integer limit;

private List<Attribute> output;

public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded) {
this(source, child, target, expanded, null);
}

public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded, Integer limit) {
super(source, child);
this.target = target;
this.expanded = expanded;
this.limit = limit;
}

private MvExpand(StreamInput in) throws IOException {
this(
Source.readFrom((PlanStreamInput) in),
in.readNamedWriteable(LogicalPlan.class),
in.readNamedWriteable(NamedExpression.class),
in.readNamedWriteable(Attribute.class)
in.readNamedWriteable(Attribute.class),
null // we only need this on the coordinator
);
}

Expand All @@ -51,6 +58,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(child());
out.writeNamedWriteable(target());
out.writeNamedWriteable(expanded());
assert limit == null;
}

@Override
Expand Down Expand Up @@ -78,6 +86,10 @@ public Attribute expanded() {
return expanded;
}

public Integer limit() {
return limit;
}

@Override
protected AttributeSet computeReferences() {
return target.references();
Expand All @@ -94,7 +106,7 @@ public boolean expressionsResolved() {

@Override
public UnaryPlan replaceChild(LogicalPlan newChild) {
return new MvExpand(source(), newChild, target, expanded);
return new MvExpand(source(), newChild, target, expanded, limit);
}

@Override
Expand All @@ -107,19 +119,20 @@ public List<Attribute> output() {

@Override
protected NodeInfo<? extends LogicalPlan> info() {
return NodeInfo.create(this, MvExpand::new, child(), target, expanded);
return NodeInfo.create(this, MvExpand::new, child(), target, expanded, limit);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), target, expanded);
return Objects.hash(super.hashCode(), target, expanded, limit);
}

@Override
public boolean equals(Object obj) {
if (false == super.equals(obj)) {
return false;
}
return Objects.equals(target, ((MvExpand) obj).target) && Objects.equals(expanded, ((MvExpand) obj).expanded);
MvExpand other = ((MvExpand) obj);
return Objects.equals(target, other.target) && Objects.equals(expanded, other.expanded) && Objects.equals(limit, other.limit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
Expand Down Expand Up @@ -228,7 +231,13 @@ private PhysicalPlan map(UnaryPlan p, PhysicalPlan child) {
}

if (p instanceof MvExpand mvExpand) {
return new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded());
MvExpandExec result = new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded());
if (mvExpand.limit() != null) {
// MvExpand could have an inner limit
// see PushDownAndCombineLimits rule
return new LimitExec(result.source(), result, new Literal(Source.EMPTY, mvExpand.limit(), DataType.INTEGER));
}
return result;
}

//
Expand Down
Loading