Skip to content

Commit efbca9b

Browse files
committed
Add optimisation rule to pull OrderBy above InlineJoin
1 parent ccfbf49 commit efbca9b

File tree

5 files changed

+369
-9
lines changed

5 files changed

+369
-9
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec

Lines changed: 98 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
//
2-
// TODO: re-enable the commented tests once the Join functionality stabilizes
3-
//
4-
51
allFieldsReturned
62
required_capability: inlinestats_v9
73

@@ -881,7 +877,6 @@ emp_no:integer | languages:integer | gender:keyword | max_lang:integer | y:keywo
881877
10014 | 5 | null | 5 | null
882878
;
883879

884-
// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
885880
groupByMultipleRenamedColumns_AndOneExpression_Last
886881
required_capability: inlinestats_v9
887882

@@ -905,7 +900,6 @@ emp_no:integer | languages:integer | gender:keyword|first_name:keyword|max_lang:
905900
10010 |4 |null |Duangkaew |4 |null |4 |D
906901
;
907902

908-
// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
909903
groupByMultipleRenamedColumns_AndTwoExpressions
910904
required_capability: inlinestats_v9
911905

@@ -929,7 +923,6 @@ emp_no:integer | languages:integer | gender:keyword|first_name:keyword|max_lang:
929923
10010 |4 |null |Duangkaew |4 |D |null |D |4
930924
;
931925

932-
// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
933926
groupByMultipleRenamedColumns_AndMultipleRenames
934927
required_capability: inlinestats_v9
935928

@@ -954,7 +947,6 @@ emp_no:integer | languages:integer | gender:keyword| f:keyword |max_lang:
954947
10010 |4 |null |Duangkaew |4 |null |4 |D
955948
;
956949

957-
// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
958950
groupByMultipleRenamedColumns_AndSameNameExpressionGroupingOverride
959951
required_capability: inlinestats_v9
960952

@@ -1477,3 +1469,101 @@ ROW salary = 12300, emp_no = 5, gender = "F"
14771469
emp_no:integer
14781470
5
14791471
;
1472+
1473+
sortBeforeInlinestats1
1474+
required_capability: inlinestats_v9
1475+
1476+
ROW salary = 12300, emp_no = 5, gender = "F"
1477+
| EVAL salaryK = salary/1000
1478+
| SORT salaryK DESC
1479+
| INLINESTATS sum = SUM(salaryK) BY gender
1480+
| KEEP emp_no
1481+
;
1482+
1483+
emp_no:integer
1484+
5
1485+
;
1486+
1487+
sortBeforeInlinestats2
1488+
required_capability: inlinestats_v9
1489+
1490+
FROM employees
1491+
| SORT emp_no
1492+
| EVAL salaryK = salary/1000
1493+
| INLINESTATS count = COUNT(*) BY salaryK
1494+
| KEEP emp_no, still_hired, count
1495+
| LIMIT 5
1496+
;
1497+
1498+
emp_no:integer |still_hired:boolean|count:long
1499+
10001 |true |1
1500+
10002 |true |3
1501+
10003 |false |2
1502+
10004 |true |2
1503+
10005 |true |1
1504+
;
1505+
1506+
// fails with: java.lang.AssertionError: expected no concrete indices without data node plan
1507+
sortBeforeInlinestats3-Ignore
1508+
required_capability: inlinestats_v9
1509+
1510+
FROM employees
1511+
| SORT languages DESC
1512+
| EVAL salaryK = salary/1000
1513+
| INLINESTATS count = COUNT(*) BY salaryK
1514+
| SORT emp_no
1515+
| INLINESTATS min = MIN(MV_COUNT(languages)) BY salaryK
1516+
| KEEP emp_no, still_hired, count
1517+
| LIMIT 5
1518+
;
1519+
1520+
emp_no:integer |still_hired:boolean|count:long
1521+
10001 |true |1
1522+
10002 |true |3
1523+
10003 |false |2
1524+
10004 |true |2
1525+
10005 |true |1
1526+
;
1527+
1528+
// same as `afterLookup`, swapped SORT position
1529+
sortBeforeInlinestatsAndLookupJoin
1530+
required_capability: inlinestats_v9
1531+
required_capability: join_lookup_v12
1532+
1533+
FROM airports
1534+
| EVAL backup_scalerank = scalerank
1535+
| RENAME scalerank AS language_code
1536+
| SORT abbrev DESC
1537+
| LOOKUP JOIN languages_lookup ON language_code
1538+
| RENAME language_name as scalerank
1539+
| DROP language_code
1540+
| INLINESTATS count=COUNT(*) BY scalerank
1541+
| KEEP abbrev, *scalerank
1542+
| LIMIT 5
1543+
;
1544+
1545+
abbrev:keyword |backup_scalerank:integer| scalerank:keyword
1546+
null |8 |null
1547+
null |8 |null
1548+
null |8 |null
1549+
ZRH |3 |Spanish
1550+
ZNZ |4 |German
1551+
;
1552+
1553+
// same as `shadowingAggregateByNextGrouping`, swapped SORT position
1554+
sortBeforeInlinestats
1555+
required_capability: inlinestats_v9
1556+
1557+
FROM employees
1558+
| KEEP gender, languages, emp_no, salary
1559+
| SORT emp_no
1560+
| INLINESTATS gender = count_distinct(gender) BY languages
1561+
| INLINESTATS avg(salary) BY gender
1562+
| LIMIT 3
1563+
;
1564+
1565+
emp_no:integer |salary:integer |languages:integer|avg(salary):double|gender:long
1566+
10001 |57305 |2 |48248.55 |2
1567+
10002 |56371 |5 |48248.55 |2
1568+
10003 |61805 |4 |48248.55 |2
1569+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantOrderBy;
3535
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantSortClauses;
3636
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneUnusedIndexMode;
37+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PullUpOrderByBeforeInlineJoin;
3738
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineFilters;
3839
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineLimits;
3940
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineOrderBy;
@@ -203,6 +204,7 @@ protected static Batch<LogicalPlan> operators(boolean local) {
203204
new PushDownAndCombineOrderBy(),
204205
new PruneRedundantOrderBy(),
205206
new PruneRedundantSortClauses(),
207+
new PullUpOrderByBeforeInlineJoin(),
206208
new PruneLeftJoinOnNullMatchingField()
207209
);
208210
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
8+
9+
import org.elasticsearch.xpack.esql.plan.logical.Limit;
10+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
11+
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
12+
import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic;
13+
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
14+
15+
/**
16+
* Pulls "up" an {@link OrderBy} node that sits below an {@link InlineJoin} in the plan tree, with no {@link Limit} between the two, so that it ends up directly on top of the join.
17+
* The InlineJoin is {@link SortAgnostic}, so the OrderBy can be pulled up without affecting the semantics of the join.
18+
* This is needed because the OrderBy would otherwise be left to execute unbounded, which isn't supported.
19+
* If a {@link Limit} is met first, the OrderBy is left where it is; it will be merged into a {@link org.elasticsearch.xpack.esql.plan.logical.TopN} later in the
20+
* "cleanup" optimization stage.
21+
*/
22+
public final class PullUpOrderByBeforeInlineJoin extends OptimizerRules.OptimizerRule<LogicalPlan> {
23+
24+
@Override
25+
protected LogicalPlan rule(LogicalPlan plan) {
26+
return plan.transformUp(LogicalPlan.class, PullUpOrderByBeforeInlineJoin::pullUpOrderByBeforeInlineJoin);
27+
}
28+
29+
// Callback handed to transformUp: for an InlineJoin with a reachable OrderBy in its subtree, strips that OrderBy out and re-creates it
// (same source and sort order) on top of the rebuilt join; every other node is returned unchanged.
// Only the first OrderBy found is moved per invocation.
private static LogicalPlan pullUpOrderByBeforeInlineJoin(LogicalPlan plan) {
30+
if (plan instanceof InlineJoin inlineJoin) {
31+
OrderBy orderBy = findOrderByNotPrecededByLimit(inlineJoin);
32+
if (orderBy != null) {
33+
LogicalPlan newInlineJoin = removeOrderBy(inlineJoin, orderBy);
34+
return new OrderBy(orderBy.source(), newInlineJoin, orderBy.order());
35+
}
36+
}
37+
return plan;
38+
}
39+
40+
// Depth-first search for the first OrderBy at or below the provided plan. Returns null when the plan itself is a Limit or when no
// OrderBy is reachable. Only children that are SortAgnostic are descended into.
// NOTE(review): because of that filter, a child that is itself an OrderBy or a Limit is only inspected if its class implements
// SortAgnostic - confirm this matches the intent.
41+
private static OrderBy findOrderByNotPrecededByLimit(LogicalPlan plan) {
42+
if (plan instanceof Limit) {
43+
return null;
44+
}
45+
if (plan instanceof OrderBy orderBy) {
46+
return orderBy;
47+
}
48+
for (LogicalPlan child : plan.children()) {
49+
if (child instanceof SortAgnostic) {
50+
OrderBy found = findOrderByNotPrecededByLimit(child);
51+
if (found != null) {
52+
return found;
53+
}
54+
}
55+
}
56+
return null;
57+
}
58+
59+
// Rebuilds the subtree with the given OrderBy (matched by reference identity, not equals) replaced by its own child.
// Leaf nodes are returned as-is; every non-leaf node has replaceChildren called on it with the recursively processed children.
60+
private static LogicalPlan removeOrderBy(LogicalPlan plan, OrderBy orderBy) {
61+
if (plan == orderBy) {
62+
return orderBy.child();
63+
}
64+
if (plan.children().isEmpty()) {
65+
return plan;
66+
}
67+
return plan.replaceChildren(plan.children().stream().map(child -> removeOrderBy(child, orderBy)).toList());
68+
}
69+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Sample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import java.io.IOException;
1919
import java.util.Objects;
2020

21-
public class Sample extends UnaryPlan implements TelemetryAware {
21+
public class Sample extends UnaryPlan implements SortAgnostic, TelemetryAware {
2222
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Sample", Sample::new);
2323

2424
private final Expression probability;

0 commit comments

Comments
 (0)