Skip to content

Commit 1605c8f

Browse files
committed
Add optimization to purge join on null merge key
This adds a new logical optimization rule that purges a Join when its merge key(s) are null. Null detection is based on recognizing a tree pattern in which the join sits atop a Project and/or Eval that aliases a null, with that alias matching the join key. The rule works at the coordinator planning level, but it is most useful locally, after nulls have been inserted into the plan upon detecting missing fields.
1 parent 4494fdc commit 1605c8f

File tree

6 files changed

+226
-3
lines changed

6 files changed

+226
-3
lines changed

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Alias.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public String toString() {
138138

139139
@Override
140140
public String nodeString() {
141-
return child.nodeString() + " AS " + name();
141+
return child.nodeString() + " AS " + name() + "#" + id();
142142
}
143143

144144
/**

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1650,3 +1650,22 @@ event_duration:long
16501650
2764889
16511651
3450233
16521652
;
1653+
1654+
nullifiedJoinKeyToPurgeTheJoin
1655+
required_capability: join_lookup_v12
1656+
1657+
FROM employees
1658+
| RENAME languages AS language_code
1659+
| SORT language_code
1660+
| LIMIT 4
1661+
| EVAL language_code = TO_INTEGER(NULL)
1662+
| LOOKUP JOIN languages_lookup ON language_code
1663+
| KEEP emp_no, language_code, language_name
1664+
;
1665+
1666+
emp_no:integer | language_code:integer | language_name:keyword
1667+
10009 |null |null
1668+
10013 |null |null
1669+
10019 |null |null
1670+
10005 |null |null
1671+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogateExpressions;
6666
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogatePlans;
6767
import org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateTimeSeriesAggregate;
68+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.PruneJoinOnNullMatchingField;
6869
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
6970
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
7071
import org.elasticsearch.xpack.esql.rule.RuleExecutor;
@@ -201,7 +202,8 @@ protected static Batch<LogicalPlan> operators() {
201202
new PushDownEnrich(),
202203
new PushDownAndCombineOrderBy(),
203204
new PruneRedundantOrderBy(),
204-
new PruneRedundantSortClauses()
205+
new PruneRedundantSortClauses(),
206+
new PruneJoinOnNullMatchingField()
205207
);
206208
}
207209

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;
9+
10+
import org.elasticsearch.xpack.esql.core.expression.Alias;
11+
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
12+
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
13+
import org.elasticsearch.xpack.esql.core.expression.Expression;
14+
import org.elasticsearch.xpack.esql.core.expression.Expressions;
15+
import org.elasticsearch.xpack.esql.core.expression.Literal;
16+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
17+
import org.elasticsearch.xpack.esql.plan.logical.Eval;
18+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
19+
import org.elasticsearch.xpack.esql.plan.logical.Project;
20+
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
21+
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
26+
import static org.elasticsearch.xpack.esql.core.expression.Expressions.isGuaranteedNull;
27+
28+
/**
29+
* The rule matches a plan pattern having a Join on top of a Project and/or Eval. It then checks if the join is performed on a field which
30+
* is aliased to null (in type or value); if that's the case, it prunes the join, replacing it with an Eval - returning aliases to null
31+
* for all the fields added in by the right side of the Join - plus a Project on top of it.
32+
* The rule can apply on the coordinator already, but it's more likely to be effective on the data nodes, where null aliasing is inserted
33+
* due to locally missing fields. This rule relies on that behavior -- see {@link ReplaceMissingFieldWithNull}.
34+
*/
35+
public class PruneJoinOnNullMatchingField extends OptimizerRules.OptimizerRule<Join> {
36+
37+
@Override
38+
protected LogicalPlan rule(Join join) {
39+
AttributeMap.Builder<Expression> attributeMapBuilder = AttributeMap.builder();
40+
loop: for (var child = join.left();; child = ((UnaryPlan) child).child()) { // cast is safe as both Project and Eval are UnaryPlans
41+
switch (child) {
42+
case Project project -> project.projections().forEach(projection -> {
43+
if (projection instanceof Alias alias) {
44+
attributeMapBuilder.put(alias.toAttribute(), alias.child());
45+
}
46+
});
47+
case Eval eval -> eval.fields().forEach(alias -> attributeMapBuilder.put(alias.toAttribute(), alias.child()));
48+
default -> {
49+
break loop;
50+
}
51+
}
52+
}
53+
for (var attr : AttributeSet.of(join.config().matchFields())) {
54+
var resolved = attributeMapBuilder.build().resolve(attr);
55+
if (resolved != null && isGuaranteedNull(resolved)) {
56+
return replaceJoin(join);
57+
}
58+
}
59+
return join;
60+
}
61+
62+
private static LogicalPlan replaceJoin(Join join) {
63+
var joinRightOutput = join.rightOutputFields();
64+
if (joinRightOutput.isEmpty()) { // can be empty when the join key is null and the other right side entries pruned (by an agg)
65+
return join.left();
66+
}
67+
List<Alias> aliases = new ArrayList<>(joinRightOutput.size());
68+
// TODO: cache aliases by type, à la ReplaceMissingFieldWithNull#missingToNull (though lookup indices won't have thousands of fields)
69+
joinRightOutput.forEach(a -> aliases.add(new Alias(a.source(), a.name(), Literal.of(a, null), a.id())));
70+
var eval = new Eval(join.source(), join.left(), aliases);
71+
return new Project(join.source(), eval, join.computeOutput(join.left().output(), Expressions.asAttributes(aliases)));
72+
}
73+
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
7070
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
7171
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
72+
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
7273
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
7374
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
7475
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
@@ -111,6 +112,7 @@
111112
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
112113
import static org.elasticsearch.xpack.esql.EsqlTestUtils.unboundLogicalOptimizerContext;
113114
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
115+
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
114116
import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore;
115117
import static org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType;
116118
import static org.hamcrest.Matchers.contains;
@@ -204,7 +206,14 @@ private Analyzer makeAnalyzer(String mappingFileName, EnrichResolution enrichRes
204206
IndexResolution getIndexResult = IndexResolution.valid(test);
205207

206208
return new Analyzer(
207-
new AnalyzerContext(config, new EsqlFunctionRegistry(), getIndexResult, enrichResolution, emptyInferenceResolution()),
209+
new AnalyzerContext(
210+
config,
211+
new EsqlFunctionRegistry(),
212+
getIndexResult,
213+
defaultLookupResolution(),
214+
enrichResolution,
215+
emptyInferenceResolution()
216+
),
208217
new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L))
209218
);
210219
}
@@ -1377,6 +1386,82 @@ public void testMissingFieldsDoNotGetExtracted() {
13771386
assertThat(Expressions.names(fields), contains("_meta_field", "gender", "hire_date", "job", "job.raw", "languages", "long_noidx"));
13781387
}
13791388

1389+
/*
1390+
* LimitExec[1000[INTEGER]]
1391+
* \_AggregateExec[[language_code{r}#6],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#11, language_code{r}#6],FINAL,[language_code{r}#6, $
1392+
* $c$count{r}#25, $$c$seen{r}#26],12]
1393+
* \_ExchangeExec[[language_code{r}#6, $$c$count{r}#25, $$c$seen{r}#26],true]
1394+
* \_AggregateExec[[languages{r}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#11, languages{r}#15 AS language_code#6],INITIAL,[langua
1395+
* ges{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12]
1396+
* \_FieldExtractExec[emp_no{f}#12]<[],[]>
1397+
* \_EvalExec[[null[INTEGER] AS languages#15]]
1398+
* \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#29], limit[], sort[] estimatedRowSize[12]
1399+
*/
1400+
public void testMissingFieldsPurgesTheJoinLocally() {
1401+
var stats = EsqlTestUtils.statsForMissingField("languages");
1402+
1403+
var plan = plannerOptimizer.plan("""
1404+
from test
1405+
| keep emp_no, languages
1406+
| rename languages AS language_code
1407+
| lookup join languages_lookup ON language_code
1408+
| stats c = count(emp_no) by language_code
1409+
""", stats);
1410+
1411+
var limit = as(plan, LimitExec.class);
1412+
var agg = as(limit.child(), AggregateExec.class);
1413+
assertThat(Expressions.names(agg.output()), contains("c", "language_code"));
1414+
1415+
var exchange = as(agg.child(), ExchangeExec.class);
1416+
agg = as(exchange.child(), AggregateExec.class);
1417+
var extract = as(agg.child(), FieldExtractExec.class);
1418+
var eval = as(extract.child(), EvalExec.class);
1419+
var source = as(eval.child(), EsQueryExec.class);
1420+
}
1421+
1422+
/*
1423+
* LimitExec[1000[INTEGER]]
1424+
* \_LookupJoinExec[[language_code{r}#6],[language_code{f}#23],[language_name{f}#24]]
1425+
* |_LimitExec[1000[INTEGER]]
1426+
* | \_AggregateExec[[languages{f}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#10, languages{f}#15 AS language_code#6],FINAL,[language
1427+
* s{f}#15, $$c$count{r}#25, $$c$seen{r}#26],62]
1428+
* | \_ExchangeExec[[languages{f}#15, $$c$count{r}#25, $$c$seen{r}#26],true]
1429+
* | \_AggregateExec[[languages{r}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#10, languages{r}#15 AS language_code#6],INITIAL,
1430+
* [languages{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12]
1431+
* | \_FieldExtractExec[emp_no{f}#12]<[],[]>
1432+
* | \_EvalExec[[null[INTEGER] AS languages#15]]
1433+
* | \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#29], limit[], sort[] estimatedRowSize[12]
1434+
* \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#30], limit[], sort[] estimatedRowSize[4]
1435+
*/
1436+
public void testMissingFieldsDoesNotPurgeTheJoinOnCoordinator() {
1437+
var stats = EsqlTestUtils.statsForMissingField("languages");
1438+
1439+
// same as the query above, but with the last two lines swapped, so that the join is no longer pushed to the data nodes
1440+
var plan = plannerOptimizer.plan("""
1441+
from test
1442+
| keep emp_no, languages
1443+
| rename languages AS language_code
1444+
| stats c = count(emp_no) by language_code
1445+
| lookup join languages_lookup ON language_code
1446+
""", stats);
1447+
1448+
var limit = as(plan, LimitExec.class);
1449+
var join = as(limit.child(), LookupJoinExec.class);
1450+
limit = as(join.left(), LimitExec.class);
1451+
var agg = as(limit.child(), AggregateExec.class);
1452+
var exchange = as(agg.child(), ExchangeExec.class);
1453+
agg = as(exchange.child(), AggregateExec.class);
1454+
var extract = as(agg.child(), FieldExtractExec.class);
1455+
var eval = as(extract.child(), EvalExec.class);
1456+
var source = as(eval.child(), EsQueryExec.class);
1457+
assertThat(source.indexPattern(), is("test"));
1458+
assertThat(source.indexMode(), is(IndexMode.STANDARD));
1459+
1460+
source = as(join.right(), EsQueryExec.class);
1461+
assertThat(source.indexPattern(), is("languages_lookup"));
1462+
assertThat(source.indexMode(), is(IndexMode.LOOKUP));
1463+
}
1464+
13801465
/*
13811466
Checks that match filters are pushed down to Lucene when using no casting, for example:
13821467
WHERE first_name:"Anna")

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2782,6 +2782,50 @@ public void testDescendantLimitLookupJoin() {
27822782
var localRelation = as(limitBefore.child(), LocalRelation.class);
27832783
}
27842784

2785+
/*
2786+
* EsqlProject[[emp_no{f}#9, first_name{f}#10, languages{f}#12, language_code{r}#3, language_name{r}#22]]
2787+
* \_Eval[[null[INTEGER] AS language_code#3, null[KEYWORD] AS language_name#22]]
2788+
* \_Limit[1000[INTEGER],false]
2789+
* \_EsRelation[test][_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..]
2790+
*/
2791+
public void testPruneJoinOnNullMatchingField() {
2792+
var plan = optimizedPlan("""
2793+
from test
2794+
| eval language_code = null::integer
2795+
| keep emp_no, first_name, languages, language_code
2796+
| lookup join languages_lookup on language_code
2797+
""");
2798+
2799+
var project = as(plan, Project.class);
2800+
assertThat(Expressions.names(project.output()), contains("emp_no", "first_name", "languages", "language_code", "language_name"));
2801+
var eval = as(project.child(), Eval.class);
2802+
var limit = asLimit(eval.child(), 1000, false);
2803+
var source = as(limit.child(), EsRelation.class);
2804+
}
2805+
2806+
/*
2807+
* EsqlProject[[emp_no{f}#15, first_name{f}#16, my_null{r}#3 AS language_code#9, language_name{r}#27]]
2808+
* \_Eval[[null[INTEGER] AS my_null#3, null[KEYWORD] AS language_name#27]]
2809+
* \_Limit[1000[INTEGER],false]
2810+
* \_EsRelation[test][_meta_field{f}#21, emp_no{f}#15, first_name{f}#16, ..]
2811+
*/
2812+
public void testPruneJoinOnNullAssignedMatchingField() {
2813+
var plan = optimizedPlan("""
2814+
from test
2815+
| eval my_null = null::integer
2816+
| rename languages as language_code
2817+
| eval language_code = my_null
2818+
| lookup join languages_lookup on language_code
2819+
| keep emp_no, first_name, language_code, language_name
2820+
""");
2821+
2822+
var project = as(plan, EsqlProject.class);
2823+
assertThat(Expressions.names(project.output()), contains("emp_no", "first_name", "language_code", "language_name"));
2824+
var eval = as(project.child(), Eval.class);
2825+
var limit = asLimit(eval.child(), 1000, false);
2826+
var source = as(limit.child(), EsRelation.class);
2827+
}
2828+
27852829
private static List<String> orderNames(TopN topN) {
27862830
return topN.order().stream().map(o -> as(o.child(), NamedExpression.class).name()).toList();
27872831
}

0 commit comments

Comments
 (0)