Skip to content

Commit 843f1b8

Browse files
authored
ESQL: Fix LOOKUP JOIN with limit (#120411)
For queries like `... | LOOKUP JOIN lookup_index ON key | LIMIT 10`, the limit cannot simply be pushed down past the join, because a LOOKUP JOIN increases the number of rows whenever a join key has multiple matches on the right-hand side. The limit can, however, be duplicated past the join. In such cases, leave an explicit Limit plan node downstream from the Join (in addition to pushing down the limit), but mark it as already duplicated so that it is not duplicated over and over again (which would cause infinite loops). Align the logic for MV_EXPAND with this approach; previously, the limit was instead internalized into the MvExpand node.
1 parent 5bcd170 commit 843f1b8

File tree

14 files changed

+707
-223
lines changed

14 files changed

+707
-223
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.xpack.esql.parser.QueryParam;
6767
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
6868
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
69+
import org.elasticsearch.xpack.esql.plan.logical.Limit;
6970
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
7071
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
7172
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
@@ -111,6 +112,7 @@
111112
import static java.util.Collections.emptyList;
112113
import static java.util.Collections.emptyMap;
113114
import static java.util.Collections.unmodifiableMap;
115+
import static org.elasticsearch.test.ESTestCase.assertEquals;
114116
import static org.elasticsearch.test.ESTestCase.between;
115117
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
116118
import static org.elasticsearch.test.ESTestCase.randomBoolean;
@@ -403,6 +405,21 @@ public static <T> T as(Object node, Class<T> type) {
403405
return type.cast(node);
404406
}
405407

408+
public static Limit asLimit(Object node, Integer limitLiteral) {
409+
return asLimit(node, limitLiteral, null);
410+
}
411+
412+
public static Limit asLimit(Object node, Integer limitLiteral, Boolean duplicated) {
413+
Limit limit = as(node, Limit.class);
414+
if (limitLiteral != null) {
415+
assertEquals(as(limit.limit(), Literal.class).value(), limitLiteral);
416+
}
417+
if (duplicated != null) {
418+
assertEquals(limit.duplicated(), duplicated);
419+
}
420+
return limit;
421+
}
422+
406423
public static Map<String, EsField> loadMapping(String name) {
407424
return LoadMapping.loadMapping(name);
408425
}

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,58 @@ emp_no:integer
273273
10001
274274
;
275275

276+
277+
lookupIndexInFromRepeatedRowBug
278+
// Test for https://github.com/elastic/elasticsearch/issues/118852
279+
required_capability: join_lookup_v12
280+
FROM languages_lookup_non_unique_key
281+
| WHERE language_code == 1
282+
| LOOKUP JOIN languages_lookup ON language_code
283+
| KEEP language_code, language_name, country
284+
| SORT language_code, language_name, country
285+
;
286+
287+
language_code:integer | language_name:keyword | country:text
288+
1 | English | Canada
289+
1 | English | United Kingdom
290+
1 | English | United States of America
291+
1 | English | null
292+
;
293+
294+
nonUniqueRightKeyOnTheCoordinatorLateLimit
295+
required_capability: join_lookup_v12
296+
required_capability: join_lookup_fix_limit_pushdown
297+
298+
FROM employees
299+
| SORT emp_no
300+
| EVAL language_code = emp_no % 10
301+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
302+
| KEEP emp_no, language_code, language_name, country
303+
| LIMIT 4
304+
| SORT country
305+
;
306+
307+
emp_no:integer | language_code:integer | language_name:keyword | country:text
308+
10001 | 1 | English | Canada
309+
10001 | 1 | null | United Kingdom
310+
10001 | 1 | English | United States of America
311+
10001 | 1 | English | null
312+
;
313+
314+
nonUniqueRightKeyLateLimitWithEmptyRelation
315+
required_capability: join_lookup_v12
316+
required_capability: join_lookup_fix_limit_pushdown
317+
318+
ROW language_code = 1
319+
| WHERE language_code != 1
320+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
321+
| LIMIT 1
322+
| KEEP language_code, language_name
323+
;
324+
325+
language_code:integer | language_name:keyword
326+
;
327+
276328
###########################################################################
277329
# null and multi-value behavior with languages_lookup_non_unique_key index
278330
###########################################################################
@@ -1278,23 +1330,6 @@ ignoreOrder:true
12781330
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | QA | null
12791331
;
12801332

1281-
lookupIndexInFromRepeatedRowBug
1282-
// Test for https://github.com/elastic/elasticsearch/issues/118852
1283-
required_capability: join_lookup_v12
1284-
FROM languages_lookup_non_unique_key
1285-
| WHERE language_code == 1
1286-
| LOOKUP JOIN languages_lookup ON language_code
1287-
| KEEP language_code, language_name, country
1288-
| SORT language_code, language_name, country
1289-
;
1290-
1291-
language_code:integer | language_name:keyword | country:text
1292-
1 | English | Canada
1293-
1 | English | United Kingdom
1294-
1 | English | United States of America
1295-
1 | English | null
1296-
;
1297-
12981333
lookupIndexQuoting
12991334
required_capability: join_lookup_v12
13001335
FROM languages_lookup_non_unique_key

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,11 @@ public enum Cap {
711711
*/
712712
JOIN_LOOKUP_SKIP_MV_ON_LOOKUP_KEY(JOIN_LOOKUP_V12.isEnabled()),
713713

714+
/**
715+
* Fix pushing down LIMIT past LOOKUP JOIN in case of multiple matching join keys.
716+
*/
717+
JOIN_LOOKUP_FIX_LIMIT_PUSHDOWN(JOIN_LOOKUP_V12.isEnabled()),
718+
714719
/**
715720
* Fix for https://github.com/elastic/elasticsearch/issues/117054
716721
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -549,8 +549,7 @@ private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput)
549549
resolved,
550550
resolved.resolved()
551551
? new ReferenceAttribute(resolved.source(), resolved.name(), resolved.dataType(), resolved.nullable(), null, false)
552-
: resolved,
553-
p.limit()
552+
: resolved
554553
);
555554
}
556555
return p;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/AddDefaultTopN.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
* | sort first_name
3535
* | limit 15
3636
* <p>
37-
* PushDownAndCombineLimits rule will copy the "limit 15" after "sort emp_no" if there is no filter on the expanded values
37+
* {@link PushDownAndCombineLimits} will copy the "limit 15" after "sort emp_no" if there is no filter on the expanded values
3838
* OR if there is no sort between "limit" and "mv_expand".
3939
* But, since this type of query has such a filter, the "sort emp_no" will have no limit when it reaches the current rule.
4040
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimits.java

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
99

10-
import org.elasticsearch.xpack.esql.core.expression.Literal;
1110
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
1211
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1312
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
@@ -21,6 +20,9 @@
2120
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
2221
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
2322

23+
import java.util.ArrayList;
24+
import java.util.List;
25+
2426
public final class PushDownAndCombineLimits extends OptimizerRules.ParameterizedOptimizerRule<Limit, LogicalOptimizerContext> {
2527

2628
public PushDownAndCombineLimits() {
@@ -31,27 +33,18 @@ public PushDownAndCombineLimits() {
3133
public LogicalPlan rule(Limit limit, LogicalOptimizerContext ctx) {
3234
if (limit.child() instanceof Limit childLimit) {
3335
var limitSource = limit.limit();
34-
var l1 = (int) limitSource.fold(ctx.foldCtx());
35-
var l2 = (int) childLimit.limit().fold(ctx.foldCtx());
36-
return new Limit(limit.source(), Literal.of(limitSource, Math.min(l1, l2)), childLimit.child());
36+
var parentLimitValue = (int) limitSource.fold(ctx.foldCtx());
37+
var childLimitValue = (int) childLimit.limit().fold(ctx.foldCtx());
38+
// We want to preserve the duplicated() value of the smaller limit, so we'll use replaceChild.
39+
return parentLimitValue < childLimitValue ? limit.replaceChild(childLimit.child()) : childLimit;
3740
} else if (limit.child() instanceof UnaryPlan unary) {
3841
if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof Enrich) {
3942
return unary.replaceChild(limit.replaceChild(unary.child()));
40-
} else if (unary instanceof MvExpand mvx) {
43+
} else if (unary instanceof MvExpand) {
4144
// MV_EXPAND can increase the number of rows, so we cannot just push the limit down
4245
// (we also have to preserve the LIMIT afterwards)
43-
//
44-
// To avoid infinite loops, ie.
45-
// | MV_EXPAND | LIMIT -> | LIMIT | MV_EXPAND | LIMIT -> ... | MV_EXPAND | LIMIT
46-
// we add an inner limit to MvExpand and just push down the existing limit, ie.
47-
// | MV_EXPAND | LIMIT N -> | LIMIT N | MV_EXPAND (with limit N)
48-
var limitSource = limit.limit();
49-
var limitVal = (int) limitSource.fold(ctx.foldCtx());
50-
Integer mvxLimit = mvx.limit();
51-
if (mvxLimit == null || mvxLimit > limitVal) {
52-
mvx = new MvExpand(mvx.source(), mvx.child(), mvx.target(), mvx.expanded(), limitVal);
53-
}
54-
return mvx.replaceChild(limit.replaceChild(mvx.child()));
46+
// To avoid repeating this infinitely, we have to set duplicated = true.
47+
return duplicateLimitAsFirstGrandchild(limit);
5548
}
5649
// check if there's a 'visible' descendant limit lower than the current one
5750
// and if so, align the current limit since it adds no value
@@ -62,17 +55,15 @@ public LogicalPlan rule(Limit limit, LogicalOptimizerContext ctx) {
6255
var l1 = (int) limit.limit().fold(ctx.foldCtx());
6356
var l2 = (int) descendantLimit.limit().fold(ctx.foldCtx());
6457
if (l2 <= l1) {
65-
return new Limit(limit.source(), Literal.of(limit.limit(), l2), limit.child());
58+
return limit.withLimit(descendantLimit.limit());
6659
}
6760
}
6861
}
69-
} else if (limit.child() instanceof Join join) {
70-
if (join.config().type() == JoinTypes.LEFT) {
71-
// NOTE! This is only correct because our LEFT JOINs preserve the number of rows from the left hand side.
72-
// This deviates from SQL semantics. In SQL, multiple matches on the right hand side lead to multiple rows in the output.
73-
// For us, multiple matches on the right hand side are collected into multi-values.
74-
return join.replaceChildren(limit.replaceChild(join.left()), join.right());
75-
}
62+
} else if (limit.child() instanceof Join join && join.config().type() == JoinTypes.LEFT) {
63+
// Left joins increase the number of rows if any join key has multiple matches from the right hand side.
64+
// Therefore, we cannot simply push down the limit - but we can add another limit before the join.
65+
// To avoid repeating this infinitely, we have to set duplicated = true.
66+
return duplicateLimitAsFirstGrandchild(limit);
7667
}
7768
return limit;
7869
}
@@ -100,4 +91,27 @@ private static Limit descendantLimit(UnaryPlan unary) {
10091
}
10192
return null;
10293
}
94+
95+
/**
96+
* Duplicate the limit past its child if it wasn't duplicated yet. The duplicate is placed on top of its leftmost grandchild.
97+
* Idempotent. (Sets {@link Limit#duplicated()} to {@code true} on the limit that remains at the top.)
98+
*/
99+
private static Limit duplicateLimitAsFirstGrandchild(Limit limit) {
100+
if (limit.duplicated()) {
101+
return limit;
102+
}
103+
104+
List<LogicalPlan> grandChildren = limit.child().children();
105+
LogicalPlan firstGrandChild = grandChildren.getFirst();
106+
LogicalPlan newFirstGrandChild = limit.replaceChild(firstGrandChild);
107+
108+
List<LogicalPlan> newGrandChildren = new ArrayList<>();
109+
newGrandChildren.add(newFirstGrandChild);
110+
for (int i = 1; i < grandChildren.size(); i++) {
111+
newGrandChildren.add(grandChildren.get(i));
112+
}
113+
114+
LogicalPlan newChild = limit.child().replaceChildren(newGrandChildren);
115+
return limit.replaceChild(newChild).withDuplicated(true);
116+
}
103117
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,52 @@ public class Limit extends UnaryPlan {
2121
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Limit", Limit::new);
2222

2323
private final Expression limit;
24-
24+
/**
25+
* Important for optimizations. This should be {@code false} in most cases, which allows this instance to be duplicated past a child
26+
* plan node that increases the number of rows, like for LOOKUP JOIN and MV_EXPAND.
27+
* Needs to be set to {@code true} in {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineLimits} to avoid
28+
* infinite loops from adding a duplicate of the limit past the child over and over again.
29+
*/
30+
private final transient boolean duplicated;
31+
32+
/**
33+
* Default way to create a new instance. Do not use this to copy an existing instance, as this sets {@link Limit#duplicated} to
34+
* {@code false}.
35+
*/
2536
public Limit(Source source, Expression limit, LogicalPlan child) {
37+
this(source, limit, child, false);
38+
}
39+
40+
public Limit(Source source, Expression limit, LogicalPlan child, boolean duplicated) {
2641
super(source, child);
2742
this.limit = limit;
43+
this.duplicated = duplicated;
2844
}
2945

46+
/**
47+
* Omits reading {@link Limit#duplicated}, c.f. {@link Limit#writeTo}.
48+
*/
3049
private Limit(StreamInput in) throws IOException {
31-
this(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(LogicalPlan.class));
50+
this(
51+
Source.readFrom((PlanStreamInput) in),
52+
in.readNamedWriteable(Expression.class),
53+
in.readNamedWriteable(LogicalPlan.class),
54+
false
55+
);
3256
}
3357

58+
/**
59+
* Omits serializing {@link Limit#duplicated} because when sent to a data node, this should always be {@code false}.
60+
* That's because if it's true, this means a copy of this limit was pushed down below an MvExpand or Join, and thus there's
61+
* another pipeline breaker further upstream - we're already on the coordinator node.
62+
*/
3463
@Override
3564
public void writeTo(StreamOutput out) throws IOException {
3665
Source.EMPTY.writeTo(out);
3766
out.writeNamedWriteable(limit());
3867
out.writeNamedWriteable(child());
68+
// Let's make sure we notice during tests if we ever serialize a duplicated Limit.
69+
assert duplicated == false;
3970
}
4071

4172
@Override
@@ -45,18 +76,30 @@ public String getWriteableName() {
4576

4677
@Override
4778
protected NodeInfo<Limit> info() {
48-
return NodeInfo.create(this, Limit::new, limit, child());
79+
return NodeInfo.create(this, Limit::new, limit, child(), duplicated);
4980
}
5081

5182
@Override
5283
public Limit replaceChild(LogicalPlan newChild) {
53-
return new Limit(source(), limit, newChild);
84+
return new Limit(source(), limit, newChild, duplicated);
5485
}
5586

5687
public Expression limit() {
5788
return limit;
5889
}
5990

91+
public Limit withLimit(Expression limit) {
92+
return new Limit(source(), limit, child(), duplicated);
93+
}
94+
95+
public boolean duplicated() {
96+
return duplicated;
97+
}
98+
99+
public Limit withDuplicated(boolean duplicated) {
100+
return new Limit(source(), limit, child(), duplicated);
101+
}
102+
60103
@Override
61104
public String commandName() {
62105
return "LIMIT";
@@ -69,7 +112,7 @@ public boolean expressionsResolved() {
69112

70113
@Override
71114
public int hashCode() {
72-
return Objects.hash(limit, child());
115+
return Objects.hash(limit, child(), duplicated);
73116
}
74117

75118
@Override
@@ -83,6 +126,6 @@ public boolean equals(Object obj) {
83126

84127
Limit other = (Limit) obj;
85128

86-
return Objects.equals(limit, other.limit) && Objects.equals(child(), other.child());
129+
return Objects.equals(limit, other.limit) && Objects.equals(child(), other.child()) && (duplicated == other.duplicated);
87130
}
88131
}

0 commit comments

Comments
 (0)