Skip to content

Commit 809dab1

Browse files
authored
ESQL: Pushdown Lookup Join past Project (elastic#129503)
Add a new logical plan optimization: When there is a Project (KEEP/DROP/RENAME/renaming EVALs) in a LOOKUP JOIN's left child (the "main" side), perform the Project after the LOOKUP JOIN. This prevents premature field extractions when the lookup join happens on data nodes.
1 parent 2502a36 commit 809dab1

File tree

17 files changed

+933
-399
lines changed

17 files changed

+933
-399
lines changed

docs/changelog/129503.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 129503
2+
summary: Pushdown Lookup Join past Project
3+
area: ES|QL
4+
type: enhancement
5+
issues:
6+
- 119082

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,66 @@ emp_no:integer | language_code:integer | language_name:keyword
144144
10030 |0 | null
145145
;
146146

147+
multipleLookupsAndProjects
148+
required_capability: join_lookup_v12
149+
150+
FROM employees
151+
| KEEP languages, emp_no
152+
| EVAL language_code = languages
153+
| LOOKUP JOIN languages_lookup ON language_code
154+
| RENAME language_name AS foo
155+
| LOOKUP JOIN languages_lookup ON language_code
156+
| DROP foo
157+
| SORT emp_no
158+
| LIMIT 5
159+
;
160+
161+
languages:integer | emp_no:integer | language_code:integer | language_name:keyword
162+
2 | 10001 | 2 | French
163+
5 | 10002 | 5 | null
164+
4 | 10003 | 4 | German
165+
5 | 10004 | 5 | null
166+
1 | 10005 | 1 | English
167+
;
168+
169+
multipleLookupsAndProjectsNonUnique
170+
required_capability: join_lookup_v12
171+
172+
FROM employees
173+
| KEEP languages, emp_no
174+
| EVAL language_code = languages
175+
# this lookup contributes 0 fields in the end, but it contributes rows via multiple matches
176+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
177+
| RENAME language_name AS foo
178+
| LOOKUP JOIN languages_lookup ON language_code
179+
| DROP foo, country, country.keyword
180+
| SORT emp_no
181+
| LIMIT 5
182+
;
183+
184+
languages:integer | emp_no:integer | language_code:integer | language_name:keyword
185+
2 | 10001 | 2 | French
186+
2 | 10001 | 2 | French
187+
2 | 10001 | 2 | French
188+
5 | 10002 | 5 | null
189+
4 | 10003 | 4 | German
190+
;
191+
192+
projectsWithShadowingAfterPushdown
193+
required_capability: join_lookup_v12
194+
195+
FROM languages_lookup
196+
| WHERE language_code == 4
197+
# original_language_name is not shadowed by the lookup's language_name - it would be if the rename happened after the lookup
198+
| RENAME language_name AS original_language_name
199+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
200+
| DROP country*
201+
;
202+
203+
language_code:integer | original_language_name:keyword | language_name:keyword
204+
4 | German | Quenya
205+
;
206+
147207
###########################################################################
148208
# multiple match behavior with languages_lookup_non_unique_key index
149209
###########################################################################

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownEnrich;
4242
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownEval;
4343
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownInferencePlan;
44+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownJoinPastProject;
4445
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownRegexExtract;
4546
import org.elasticsearch.xpack.esql.optimizer.rules.logical.RemoveStatsOverride;
4647
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceAggregateAggExpressionWithEval;
@@ -196,6 +197,7 @@ protected static Batch<LogicalPlan> operators() {
196197
new PushDownEval(),
197198
new PushDownRegexExtract(),
198199
new PushDownEnrich(),
200+
new PushDownJoinPastProject(),
199201
new PushDownAndCombineOrderBy(),
200202
new PruneRedundantOrderBy(),
201203
new PruneRedundantSortClauses(),
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
9+
10+
import org.elasticsearch.xpack.esql.core.expression.Alias;
11+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
12+
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
13+
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
14+
import org.elasticsearch.xpack.esql.core.expression.Expression;
15+
import org.elasticsearch.xpack.esql.core.expression.Expressions;
16+
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
17+
import org.elasticsearch.xpack.esql.plan.logical.Eval;
18+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
19+
import org.elasticsearch.xpack.esql.plan.logical.Project;
20+
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
21+
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
22+
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
23+
24+
import java.util.ArrayList;
25+
import java.util.HashSet;
26+
import java.util.List;
27+
import java.util.Set;
28+
29+
/**
30+
* If a {@link Project} is found in the left child of a left {@link Join}, perform it after the join instead. Because the projected
31+
* attributes are then only required later, field extractions can also happen later, making joins cheaper to execute on data nodes.
32+
* E.g. {@code ... | RENAME field AS otherfield | LOOKUP JOIN lu_idx ON key}
33+
* becomes {@code ... | LOOKUP JOIN lu_idx ON key | RENAME field AS otherfield }.
34+
* When a {@code LOOKUP JOIN}'s lookup fields shadow the previous fields, we need to leave an {@link Eval} in place of the {@link Project}
35+
* to assign a temporary name. Assume that {@code field} is also the name of a lookup-added field; then
36+
* {@code ... | RENAME field AS otherfield | LOOKUP JOIN lu_idx ON key}
37+
* becomes something like
38+
* {@code ... | EVAL $$field = field | LOOKUP JOIN lu_idx ON key | RENAME $$field AS otherfield}.
39+
* Leaving {@code EVAL $$field = field} in place of the original projection, rather than a Project, avoids infinite loops.
40+
*/
41+
public final class PushDownJoinPastProject extends OptimizerRules.OptimizerRule<Join> {
42+
/**
* Rewrites a {@code LEFT} join whose left child is a {@link Project} so that the projection happens on top of the join.
* Returns the join unchanged for {@link InlineJoin}s, for other join types, and when the left child is not a {@link Project}.
*
* @param join the join to optimize
* @return a {@link Project} on top of the rewritten join (with an {@link Eval} below the join if names conflict), or {@code join} itself
*/
@Override
43+
protected LogicalPlan rule(Join join) {
44+
if (join instanceof InlineJoin) {
45+
// Do not apply to INLINESTATS; this rule could be expanded to include INLINESTATS, but the StubRelation refers to the left
46+
// child - so pulling out a Project from the left child would require us to also update the StubRelation (and the Aggregate
47+
// on top of it)
48+
// TODO: figure out how to push down in case of INLINESTATS
49+
return join;
50+
}
51+
52+
if (join.left() instanceof Project project && join.config().type() == JoinTypes.LEFT) {
53+
// Map each aliased attribute of the Project to the expression it is an alias for.
AttributeMap.Builder<Expression> aliasBuilder = AttributeMap.builder();
54+
project.forEachExpression(Alias.class, a -> aliasBuilder.put(a.toAttribute(), a.child()));
55+
var aliasesFromProject = aliasBuilder.build();
56+
57+
// Propagate any renames into the Join, as we will remove the upstream Project.
58+
// E.g. `RENAME field AS key | LOOKUP JOIN idx ON key` -> `LOOKUP JOIN idx ON field | ...`
59+
Join updatedJoin = PushDownUtils.resolveRenamesFromMap(join, aliasesFromProject);
60+
61+
// Construct the expressions for the new downstream Project using the Join's output.
62+
// We need to carry over RENAMEs/aliases from the original upstream Project.
63+
List<Attribute> originalOutput = join.output();
64+
List<NamedExpression> newProjections = new ArrayList<>(originalOutput.size());
65+
for (Attribute attr : originalOutput) {
66+
Attribute resolved = (Attribute) aliasesFromProject.resolve(attr, attr);
67+
if (attr.semanticEquals(resolved)) {
68+
newProjections.add(attr);
69+
} else {
70+
// Re-create the alias with the original name and id, so that the plan's output attributes stay the same.
Alias renamed = new Alias(attr.source(), attr.name(), resolved, attr.id(), attr.synthetic());
71+
newProjections.add(renamed);
72+
}
73+
}
74+
75+
// The projections built above don't deal with name conflicts yet. Any name shadowed by a lookup field from the `LOOKUP JOIN`
76+
// could still have been used in the original Project; any such conflict needs to be resolved by copying the attribute under a
77+
// temporary name via an Eval - and using the attribute from said Eval in the new downstream Project.
78+
Set<String> lookupFieldNames = new HashSet<>(Expressions.names(join.rightOutputFields()));
79+
List<NamedExpression> finalProjections = new ArrayList<>(newProjections.size());
80+
AttributeMap.Builder<Alias> aliasesForReplacedAttributesBuilder = AttributeMap.builder();
81+
AttributeSet leftOutput = project.child().outputSet();
82+
83+
for (NamedExpression newProj : newProjections) {
84+
Attribute coreAttr = (Attribute) Alias.unwrap(newProj);
85+
// Only fields from the left need to be protected from conflicts - because fields from the right shadow them.
86+
if (leftOutput.contains(coreAttr) && lookupFieldNames.contains(coreAttr.name())) {
87+
// Conflict - the core attribute will be shadowed by the `LOOKUP JOIN` and we need to alias it in an upstream Eval.
88+
Alias renaming = aliasesForReplacedAttributesBuilder.computeIfAbsent(coreAttr, a -> {
89+
String tempName = TemporaryNameUtils.locallyUniqueTemporaryName(a.name());
90+
return new Alias(a.source(), tempName, a, null, true);
91+
});
92+
93+
Attribute renamedAttribute = renaming.toAttribute();
94+
Alias renamedBack;
95+
if (newProj instanceof Alias as) {
96+
renamedBack = new Alias(as.source(), as.name(), renamedAttribute, as.id(), as.synthetic());
97+
} else {
98+
// no alias - that means newProj == coreAttr
99+
renamedBack = new Alias(coreAttr.source(), coreAttr.name(), renamedAttribute, coreAttr.id(), coreAttr.synthetic());
100+
}
101+
finalProjections.add(renamedBack);
102+
} else {
103+
finalProjections.add(newProj);
104+
}
105+
}
106+
107+
if (aliasesForReplacedAttributesBuilder.isEmpty()) {
108+
// No name conflicts, so no eval needed.
109+
return new Project(project.source(), updatedJoin.replaceLeft(project.child()), newProjections);
110+
}
111+
112+
// Name conflicts: assign the temporary names in an Eval below the Join; the downstream Project renames them back.
List<Alias> renamesForEval = new ArrayList<>(aliasesForReplacedAttributesBuilder.build().values());
113+
Eval eval = new Eval(project.source(), project.child(), renamesForEval);
114+
Join finalJoin = new Join(join.source(), eval, updatedJoin.right(), updatedJoin.config());
115+
116+
return new Project(project.source(), finalJoin, finalProjections);
117+
}
118+
119+
return join;
120+
}
121+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownUtils.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.ArrayList;
2727
import java.util.HashMap;
2828
import java.util.HashSet;
29-
import java.util.LinkedHashSet;
3029
import java.util.List;
3130
import java.util.Map;
3231
import java.util.Set;
@@ -55,10 +54,10 @@ class PushDownUtils {
5554
public static <Plan extends UnaryPlan & GeneratingPlan<Plan>> LogicalPlan pushGeneratingPlanPastProjectAndOrderBy(Plan generatingPlan) {
5655
LogicalPlan child = generatingPlan.child();
5756
if (child instanceof OrderBy orderBy) {
58-
Set<String> evalFieldNames = new LinkedHashSet<>(Expressions.names(generatingPlan.generatedAttributes()));
57+
Set<String> generatedFieldNames = new HashSet<>(Expressions.names(generatingPlan.generatedAttributes()));
5958

6059
// Look for attributes in the OrderBy's expressions and create aliases with temporary names for them.
61-
AttributeReplacement nonShadowedOrders = renameAttributesInExpressions(evalFieldNames, orderBy.order());
60+
AttributeReplacement nonShadowedOrders = renameAttributesInExpressions(generatedFieldNames, orderBy.order());
6261

6362
AttributeMap<Alias> aliasesForShadowedOrderByAttrs = nonShadowedOrders.replacedAttributes;
6463
@SuppressWarnings("unchecked")
@@ -91,8 +90,7 @@ public static <Plan extends UnaryPlan & GeneratingPlan<Plan>> LogicalPlan pushGe
9190

9291
List<Attribute> generatedAttributes = generatingPlan.generatedAttributes();
9392

94-
@SuppressWarnings("unchecked")
95-
Plan generatingPlanWithResolvedExpressions = (Plan) resolveRenamesFromProject(generatingPlan, project);
93+
Plan generatingPlanWithResolvedExpressions = resolveRenamesFromProject(generatingPlan, project);
9694

9795
Set<String> namesReferencedInRenames = new HashSet<>();
9896
for (NamedExpression ne : project.projections()) {
@@ -156,7 +154,7 @@ private static AttributeReplacement renameAttributesInExpressions(
156154
rewrittenExpressions.add(expr.transformUp(Attribute.class, attr -> {
157155
if (attributeNamesToRename.contains(attr.name())) {
158156
Alias renamedAttribute = aliasesForReplacedAttributesBuilder.computeIfAbsent(attr, a -> {
159-
String tempName = TemporaryNameUtils.locallyUniqueTemporaryName(a.name(), "temp_name");
157+
String tempName = TemporaryNameUtils.locallyUniqueTemporaryName(a.name());
160158
return new Alias(a.source(), tempName, a, null, true);
161159
});
162160
return renamedAttribute.toAttribute();
@@ -181,7 +179,7 @@ private static Map<String, String> newNamesForConflictingAttributes(
181179
for (Attribute attr : potentiallyConflictingAttributes) {
182180
String name = attr.name();
183181
if (reservedNames.contains(name)) {
184-
renameAttributeTo.putIfAbsent(name, TemporaryNameUtils.locallyUniqueTemporaryName(name, "temp_name"));
182+
renameAttributeTo.putIfAbsent(name, TemporaryNameUtils.locallyUniqueTemporaryName(name));
185183
}
186184
}
187185

@@ -198,12 +196,17 @@ public static Project pushDownPastProject(UnaryPlan parent) {
198196
}
199197
}
200198

201-
private static UnaryPlan resolveRenamesFromProject(UnaryPlan plan, Project project) {
199+
private static <P extends LogicalPlan> P resolveRenamesFromProject(P plan, Project project) {
202200
AttributeMap.Builder<Expression> aliasBuilder = AttributeMap.builder();
203201
project.forEachExpression(Alias.class, a -> aliasBuilder.put(a.toAttribute(), a.child()));
204202
var aliases = aliasBuilder.build();
205203

206-
return (UnaryPlan) plan.transformExpressionsOnly(ReferenceAttribute.class, r -> aliases.resolve(r, r));
204+
return resolveRenamesFromMap(plan, aliases);
205+
}
206+
207+
/**
* Rewrites the expressions of {@code plan}, replacing each {@link ReferenceAttribute} with whatever {@code map} resolves it to.
* The attribute itself is passed as the second argument to {@code resolve} - presumably the fallback when the map has no entry
* for it; verify against {@code AttributeMap#resolve}.
* NOTE(review): the unchecked cast assumes that {@code transformExpressionsOnly} returns a plan of the same type as its input - confirm.
*
* @param plan the plan whose expressions should be rewritten
* @param map  maps attributes to the expressions they should be replaced with
* @return the plan with the rewritten expressions
*/
@SuppressWarnings("unchecked")
208+
public static <P extends LogicalPlan> P resolveRenamesFromMap(P plan, AttributeMap<Expression> map) {
209+
return (P) plan.transformExpressionsOnly(ReferenceAttribute.class, r -> map.resolve(r, r));
207210
}
208211

209212
private record AttributeReplacement(List<Expression> rewrittenExpressions, AttributeMap<Alias> replacedAttributes) {}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TemporaryNameUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ public static String temporaryName(Expression inner, Expression outer, int suffi
2222
return Attribute.rawTemporaryName(in, out, String.valueOf(suffix));
2323
}
2424

25-
public static String locallyUniqueTemporaryName(String inner, String outer) {
26-
return Attribute.rawTemporaryName(inner, outer, (new NameId()).toString());
25+
public static String locallyUniqueTemporaryName(String inner) {
26+
return Attribute.rawTemporaryName(inner, "temp_name", (new NameId()).toString());
2727
}
2828

2929
static String toString(Expression ex) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Project.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
import org.elasticsearch.common.io.stream.StreamInput;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
1212
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
13+
import org.elasticsearch.xpack.esql.core.expression.Alias;
1314
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1415
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1516
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
17+
import org.elasticsearch.xpack.esql.core.expression.UnresolvedNamedExpression;
1618
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1719
import org.elasticsearch.xpack.esql.core.tree.Source;
1820
import org.elasticsearch.xpack.esql.expression.function.Functions;
@@ -23,7 +25,7 @@
2325
import java.util.Objects;
2426

2527
/**
26-
* A {@code Project} is a {@code Plan} with one child. In {@code SELECT x FROM y}, the "SELECT" statement is a Project.
28+
* A {@code Project} is a {@code Plan} with one child. In {@code FROM idx | KEEP x, y}, the {@code KEEP} statement is a Project.
2729
*/
2830
public class Project extends UnaryPlan implements SortAgnostic {
2931
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Project", Project::new);
@@ -33,6 +35,20 @@ public class Project extends UnaryPlan implements SortAgnostic {
3335
public Project(Source source, LogicalPlan child, List<? extends NamedExpression> projections) {
3436
super(source, child);
3537
this.projections = projections;
38+
assert validateProjections(projections);
39+
}
40+
41+
/**
* Checks the shape of the given projections; only called from an {@code assert} in the constructor.
* A projection is accepted if it is an {@link Alias} whose child is an {@link Attribute}, or if it is itself an {@link Attribute}
* or an {@link UnresolvedNamedExpression}.
*
* @param projections the projections to check
* @return {@code true} if all projections are accepted, {@code false} as soon as one is not
*/
private boolean validateProjections(List<? extends NamedExpression> projections) {
42+
for (NamedExpression ne : projections) {
43+
if (ne instanceof Alias as) {
44+
// Aliases may only rename attributes, not wrap arbitrary expressions.
if (as.child() instanceof Attribute == false) {
45+
return false;
46+
}
47+
} else if (ne instanceof Attribute == false && ne instanceof UnresolvedNamedExpression == false) {
48+
return false;
49+
}
50+
}
51+
return true;
3652
}
3753

3854
private Project(StreamInput in) throws IOException {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/EsqlProject.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121
import java.io.IOException;
2222
import java.util.List;
2323

24+
/**
25+
* A projection when first parsed, i.e. obtained from {@code KEEP, DROP, RENAME}. After the analysis step, we use {@link Project}.
26+
*/
27+
// TODO: Consolidate with Project. We don't need the pre-/post-analysis distinction for other logical plans.
28+
// https://github.com/elastic/elasticsearch/issues/109195
2429
public class EsqlProject extends Project {
2530
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
2631
LogicalPlan.class,

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,12 @@ public static IndexResolution expandedDefaultIndexResolution() {
161161
}
162162

163163
public static Map<String, IndexResolution> defaultLookupResolution() {
164-
return Map.of("languages_lookup", loadMapping("mapping-languages.json", "languages_lookup", IndexMode.LOOKUP));
164+
return Map.of(
165+
"languages_lookup",
166+
loadMapping("mapping-languages.json", "languages_lookup", IndexMode.LOOKUP),
167+
"test_lookup",
168+
loadMapping("mapping-basic.json", "test_lookup", IndexMode.LOOKUP)
169+
);
165170
}
166171

167172
public static EnrichResolution defaultEnrichResolution() {

0 commit comments

Comments
 (0)