Skip to content

Commit 6a5a562

Browse files
ES|QL: fix LIMIT pushdown past MV_EXPAND (elastic#115624) (elastic#115958)
1 parent dd6d334 commit 6a5a562

File tree

12 files changed

+378
-171
lines changed

12 files changed

+378
-171
lines changed

docs/changelog/115624.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 115624
2+
summary: "ES|QL: fix LIMIT pushdown past MV_EXPAND"
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 102084
7+
- 102061

x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,3 +324,83 @@ from employees | where emp_no == 10001 | keep * | mv_expand first_name;
324324
avg_worked_seconds:long | birth_date:date | emp_no:integer | first_name:keyword | gender:keyword | height:double | height.float:double | height.half_float:double | height.scaled_float:double | hire_date:date | is_rehired:boolean | job_positions:keyword | languages:integer | languages.byte:integer | languages.long:long | languages.short:integer | last_name:keyword | salary:integer | salary_change:double | salary_change.int:integer | salary_change.keyword:keyword | salary_change.long:long | still_hired:boolean
325325
268728049 | 1953-09-02T00:00:00.000Z | 10001 | Georgi | M | 2.03 | 2.0299999713897705 | 2.029296875 | 2.0300000000000002 | 1986-06-26T00:00:00.000Z | [false, true] | [Accountant, Senior Python Developer] | 2 | 2 | 2 | 2 | Facello | 57305 | 1.19 | 1 | 1.19 | 1 | true
326326
;
327+
328+
329+
// see https://github.com/elastic/elasticsearch/issues/102061
330+
sortMvExpand
331+
required_capability: add_limit_inside_mv_expand
332+
row a = 1 | sort a | mv_expand a;
333+
334+
a:integer
335+
1
336+
;
337+
338+
// see https://github.com/elastic/elasticsearch/issues/102061
339+
sortMvExpandFromIndex
340+
required_capability: add_limit_inside_mv_expand
341+
from employees | sort emp_no | mv_expand emp_no | limit 1 | keep emp_no;
342+
343+
emp_no:integer
344+
10001
345+
;
346+
347+
348+
// see https://github.com/elastic/elasticsearch/issues/102061
349+
limitSortMvExpand
350+
required_capability: add_limit_inside_mv_expand
351+
row a = 1 | limit 1 | sort a | mv_expand a;
352+
353+
a:integer
354+
1
355+
;
356+
357+
358+
// see https://github.com/elastic/elasticsearch/issues/102061
359+
limitSortMultipleMvExpand
360+
required_capability: add_limit_inside_mv_expand
361+
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | mv_expand b | mv_expand c | limit 3;
362+
363+
a:integer | b:integer | c:integer
364+
1 | 2 | 3
365+
2 | 2 | 3
366+
3 | 2 | 3
367+
;
368+
369+
370+
multipleLimitSortMultipleMvExpand
371+
required_capability: add_limit_inside_mv_expand
372+
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | limit 2 | mv_expand b | mv_expand c | limit 3;
373+
374+
a:integer | b:integer | c:integer
375+
1 | 2 | 3
376+
2 | 2 | 3
377+
;
378+
379+
380+
multipleLimitSortMultipleMvExpand2
381+
required_capability: add_limit_inside_mv_expand
382+
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | limit 3 | mv_expand b | mv_expand c | limit 2;
383+
384+
a:integer | b:integer | c:integer
385+
1 | 2 | 3
386+
2 | 2 | 3
387+
;
388+
389+
390+
//see https://github.com/elastic/elasticsearch/issues/102084
391+
whereMvExpand
392+
required_capability: add_limit_inside_mv_expand
393+
row a = 1, b = -15 | where b > 3 | mv_expand b;
394+
395+
a:integer | b:integer
396+
;
397+
398+
399+
//see https://github.com/elastic/elasticsearch/issues/102084
400+
whereMvExpandOnIndex
401+
required_capability: add_limit_inside_mv_expand
402+
from employees | where emp_no == 10003 | mv_expand first_name | keep first_name;
403+
404+
first_name:keyword
405+
Parto
406+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,12 @@ public enum Cap {
440440
/**
441441
* Support simplified syntax for named parameters for field and function names.
442442
*/
443-
NAMED_PARAMETER_FOR_FIELD_AND_FUNCTION_NAMES_SIMPLIFIED_SYNTAX(Build.current().isSnapshot());
443+
NAMED_PARAMETER_FOR_FIELD_AND_FUNCTION_NAMES_SIMPLIFIED_SYNTAX(Build.current().isSnapshot()),
444+
445+
/**
446+
* Fix pushdown of LIMIT past MV_EXPAND
447+
*/
448+
ADD_LIMIT_INSIDE_MV_EXPAND;
444449

445450
private final boolean enabled;
446451

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,8 @@ private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput)
507507
resolved,
508508
resolved.resolved()
509509
? new ReferenceAttribute(resolved.source(), resolved.name(), resolved.dataType(), resolved.nullable(), null, false)
510-
: resolved
510+
: resolved,
511+
p.limit()
511512
);
512513
}
513514
return p;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineProjections;
2020
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ConstantFolding;
2121
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ConvertStringToByteRef;
22-
import org.elasticsearch.xpack.esql.optimizer.rules.logical.DuplicateLimitAfterMvExpand;
2322
import org.elasticsearch.xpack.esql.optimizer.rules.logical.FoldNull;
2423
import org.elasticsearch.xpack.esql.optimizer.rules.logical.LiteralsOnTheRight;
2524
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PartiallyFoldCase;
@@ -174,7 +173,6 @@ protected static Batch<LogicalPlan> operators() {
174173
new PruneColumns(),
175174
new PruneLiteralsInOrderBy(),
176175
new PushDownAndCombineLimits(),
177-
new DuplicateLimitAfterMvExpand(),
178176
new PushDownAndCombineFilters(),
179177
new PushDownEval(),
180178
new PushDownRegexExtract(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/DuplicateLimitAfterMvExpand.java

Lines changed: 0 additions & 108 deletions
This file was deleted.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimits.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,21 @@ public LogicalPlan rule(Limit limit) {
3333
} else if (limit.child() instanceof UnaryPlan unary) {
3434
if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof Enrich) {
3535
return unary.replaceChild(limit.replaceChild(unary.child()));
36+
} else if (unary instanceof MvExpand mvx) {
37+
// MV_EXPAND can increase the number of rows, so we cannot just push the limit down
38+
// (we also have to preserve the LIMIT afterwards)
39+
//
40+
// To avoid infinite loops, ie.
41+
// | MV_EXPAND | LIMIT -> | LIMIT | MV_EXPAND | LIMIT -> ... | MV_EXPAND | LIMIT
42+
// we add an inner limit to MvExpand and just push down the existing limit, ie.
43+
// | MV_EXPAND | LIMIT N -> | LIMIT N | MV_EXPAND (with limit N)
44+
var limitSource = limit.limit();
45+
var limitVal = (int) limitSource.fold();
46+
Integer mvxLimit = mvx.limit();
47+
if (mvxLimit == null || mvxLimit > limitVal) {
48+
mvx = new MvExpand(mvx.source(), mvx.child(), mvx.target(), mvx.expanded(), limitVal);
49+
}
50+
return mvx.replaceChild(limit.replaceChild(mvx.child()));
3651
}
3752
// check if there's a 'visible' descendant limit lower than the current one
3853
// and if so, align the current limit since it adds no value

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/MvExpand.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,28 @@ public class MvExpand extends UnaryPlan {
2727

2828
private final NamedExpression target;
2929
private final Attribute expanded;
30+
private final Integer limit;
3031

3132
private List<Attribute> output;
3233

3334
public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded) {
35+
this(source, child, target, expanded, null);
36+
}
37+
38+
public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded, Integer limit) {
3439
super(source, child);
3540
this.target = target;
3641
this.expanded = expanded;
42+
this.limit = limit;
3743
}
3844

3945
private MvExpand(StreamInput in) throws IOException {
4046
this(
4147
Source.readFrom((PlanStreamInput) in),
4248
in.readNamedWriteable(LogicalPlan.class),
4349
in.readNamedWriteable(NamedExpression.class),
44-
in.readNamedWriteable(Attribute.class)
50+
in.readNamedWriteable(Attribute.class),
51+
null // we only need this on the coordinator
4552
);
4653
}
4754

@@ -51,6 +58,7 @@ public void writeTo(StreamOutput out) throws IOException {
5158
out.writeNamedWriteable(child());
5259
out.writeNamedWriteable(target());
5360
out.writeNamedWriteable(expanded());
61+
assert limit == null;
5462
}
5563

5664
@Override
@@ -78,6 +86,10 @@ public Attribute expanded() {
7886
return expanded;
7987
}
8088

89+
public Integer limit() {
90+
return limit;
91+
}
92+
8193
@Override
8294
protected AttributeSet computeReferences() {
8395
return target.references();
@@ -94,7 +106,7 @@ public boolean expressionsResolved() {
94106

95107
@Override
96108
public UnaryPlan replaceChild(LogicalPlan newChild) {
97-
return new MvExpand(source(), newChild, target, expanded);
109+
return new MvExpand(source(), newChild, target, expanded, limit);
98110
}
99111

100112
@Override
@@ -107,19 +119,20 @@ public List<Attribute> output() {
107119

108120
@Override
109121
protected NodeInfo<? extends LogicalPlan> info() {
110-
return NodeInfo.create(this, MvExpand::new, child(), target, expanded);
122+
return NodeInfo.create(this, MvExpand::new, child(), target, expanded, limit);
111123
}
112124

113125
@Override
114126
public int hashCode() {
115-
return Objects.hash(super.hashCode(), target, expanded);
127+
return Objects.hash(super.hashCode(), target, expanded, limit);
116128
}
117129

118130
@Override
119131
public boolean equals(Object obj) {
120132
if (false == super.equals(obj)) {
121133
return false;
122134
}
123-
return Objects.equals(target, ((MvExpand) obj).target) && Objects.equals(expanded, ((MvExpand) obj).expanded);
135+
MvExpand other = ((MvExpand) obj);
136+
return Objects.equals(target, other.target) && Objects.equals(expanded, other.expanded) && Objects.equals(limit, other.limit);
124137
}
125138
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
import org.elasticsearch.compute.aggregation.AggregatorMode;
1212
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
1313
import org.elasticsearch.xpack.esql.core.expression.Attribute;
14+
import org.elasticsearch.xpack.esql.core.expression.Literal;
15+
import org.elasticsearch.xpack.esql.core.tree.Source;
16+
import org.elasticsearch.xpack.esql.core.type.DataType;
1417
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
1518
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1619
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
@@ -228,7 +231,13 @@ private PhysicalPlan map(UnaryPlan p, PhysicalPlan child) {
228231
}
229232

230233
if (p instanceof MvExpand mvExpand) {
231-
return new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded());
234+
MvExpandExec result = new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded());
235+
if (mvExpand.limit() != null) {
236+
// MvExpand could have an inner limit
237+
// see PushDownAndCombineLimits rule
238+
return new LimitExec(result.source(), result, new Literal(Source.EMPTY, mvExpand.limit(), DataType.INTEGER));
239+
}
240+
return result;
232241
}
233242

234243
//

0 commit comments

Comments
 (0)