Skip to content

Commit 64107e0

Browse files
authored
Compute output of LookupJoinExec dynamically (#117763)
LookupJoinExec should not assume its output but instead compute it from - Its input fields from the left - The fields added from the lookup index Currently, LookupJoinExec's output is determined when the logical plan is mapped to a physical one, and thereafter the output cannot be changed anymore. This makes it impossible to have late materialization of fields from the left hand side via field extractions, because we are forced to extract *all* fields before the LookupJoinExec, otherwise we do not achieve the prescribed output. Avoid that by tracking only which fields the LookupJoinExec will add from the lookup index instead of tracking the whole output (that was only correct for the logical plan). **Note:** While this PR is a refactoring for the current functionality, it should unblock @craigtaverner 's ongoing work related to field extractions and getting multiple LOOKUP JOIN queries to work correctly without adding hacks.
1 parent 5f045c0 commit 64107e0

File tree

11 files changed

+60
-116
lines changed

11 files changed

+60
-116
lines changed

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
4848
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
4949
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
50-
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V2;
50+
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V3;
5151
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
5252
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
5353
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.SYNC;
@@ -125,7 +125,7 @@ protected void shouldSkipTest(String testName) throws IOException {
125125
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
126126
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
127127
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
128-
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V2.capabilityName()));
128+
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V3.capabilityName()));
129129
}
130130

131131
private TestFeatureService remoteFeaturesService() throws IOException {

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
//TODO: this sometimes returns null instead of the looked up value (likely related to the execution order)
77
basicOnTheDataNode-Ignore
8-
required_capability: join_lookup_v2
8+
required_capability: join_lookup_v3
99

1010
FROM employees
1111
| EVAL language_code = languages
@@ -22,7 +22,7 @@ emp_no:integer | language_code:integer | language_name:keyword
2222
;
2323

2424
basicRow-Ignore
25-
required_capability: join_lookup
25+
required_capability: join_lookup_v3
2626

2727
ROW language_code = 1
2828
| LOOKUP JOIN languages_lookup ON language_code
@@ -33,7 +33,7 @@ language_code:keyword | language_name:keyword
3333
;
3434

3535
basicOnTheCoordinator
36-
required_capability: join_lookup_v2
36+
required_capability: join_lookup_v3
3737

3838
FROM employees
3939
| SORT emp_no
@@ -51,7 +51,7 @@ emp_no:integer | language_code:integer | language_name:keyword
5151

5252
//TODO: this sometimes returns null instead of the looked up value (likely related to the execution order)
5353
subsequentEvalOnTheDataNode-Ignore
54-
required_capability: join_lookup_v2
54+
required_capability: join_lookup_v3
5555

5656
FROM employees
5757
| EVAL language_code = languages
@@ -69,7 +69,7 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x
6969
;
7070

7171
subsequentEvalOnTheCoordinator
72-
required_capability: join_lookup_v2
72+
required_capability: join_lookup_v3
7373

7474
FROM employees
7575
| SORT emp_no

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ public enum Cap {
521521
/**
522522
* LOOKUP JOIN
523523
*/
524-
JOIN_LOOKUP_V2(Build.current().isSnapshot()),
524+
JOIN_LOOKUP_V3(Build.current().isSnapshot()),
525525

526526
/**
527527
* Fix for https://github.com/elastic/elasticsearch/issues/117054

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalOptimizerRules.java

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
package org.elasticsearch.xpack.esql.optimizer;
99

10-
import org.elasticsearch.xpack.esql.core.expression.Expression;
11-
import org.elasticsearch.xpack.esql.core.util.ReflectionUtils;
1210
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules.TransformDirection;
1311
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1412
import org.elasticsearch.xpack.esql.rule.ParameterizedRule;
@@ -62,34 +60,4 @@ public final PhysicalPlan apply(PhysicalPlan plan) {
6260

6361
protected abstract PhysicalPlan rule(SubPlan plan);
6462
}
65-
66-
public abstract static class OptimizerExpressionRule<E extends Expression> extends Rule<PhysicalPlan, PhysicalPlan> {
67-
68-
private final TransformDirection direction;
69-
// overriding type token which returns the correct class but does an uncheck cast to LogicalPlan due to its generic bound
70-
// a proper solution is to wrap the Expression rule into a Plan rule but that would affect the rule declaration
71-
// so instead this is hacked here
72-
private final Class<E> expressionTypeToken = ReflectionUtils.detectSuperTypeForRuleLike(getClass());
73-
74-
public OptimizerExpressionRule(TransformDirection direction) {
75-
this.direction = direction;
76-
}
77-
78-
@Override
79-
public final PhysicalPlan apply(PhysicalPlan plan) {
80-
return direction == TransformDirection.DOWN
81-
? plan.transformExpressionsDown(expressionTypeToken, this::rule)
82-
: plan.transformExpressionsUp(expressionTypeToken, this::rule);
83-
}
84-
85-
protected PhysicalPlan rule(PhysicalPlan plan) {
86-
return plan;
87-
}
88-
89-
protected abstract Expression rule(E e);
90-
91-
public Class<E> expressionToken() {
92-
return expressionTypeToken;
93-
}
94-
}
9563
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,15 @@ private static Set<Attribute> missingAttributes(PhysicalPlan p) {
104104
var missing = new LinkedHashSet<Attribute>();
105105
var inputSet = p.inputSet();
106106

107-
// FIXME: the extractors should work on the right side as well
107+
// TODO: We need to extract whatever fields are missing from the left hand side.
108108
// skip the lookup join since the right side is always materialized and a projection
109109
if (p instanceof LookupJoinExec join) {
110-
// collect fields used in the join condition
111110
return Collections.emptySet();
112111
}
113112

114113
var input = inputSet;
115114
// collect field attributes used inside expressions
115+
// TODO: Rather than going over all expressions manually, this should just call .references()
116116
p.forEachExpression(TypedAttribute.class, f -> {
117117
if (f instanceof FieldAttribute || f instanceof MetadataAttribute) {
118118
if (input.contains(f) == false) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
1313
import org.elasticsearch.xpack.esql.core.expression.Attribute;
14-
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
14+
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1515
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
1616
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1717
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -23,12 +23,9 @@
2323
import java.util.ArrayList;
2424
import java.util.List;
2525
import java.util.Objects;
26-
import java.util.Set;
27-
import java.util.stream.Collectors;
2826

2927
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
3028
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
31-
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.RIGHT;
3229

3330
public class Join extends BinaryPlan {
3431
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Join", Join::new);
@@ -100,6 +97,19 @@ public List<Attribute> output() {
10097
return lazyOutput;
10198
}
10299

100+
public List<Attribute> rightOutputFields() {
101+
AttributeSet leftInputs = left().outputSet();
102+
103+
List<Attribute> rightOutputFields = new ArrayList<>();
104+
for (Attribute attr : output()) {
105+
if (leftInputs.contains(attr) == false) {
106+
rightOutputFields.add(attr);
107+
}
108+
}
109+
110+
return rightOutputFields;
111+
}
112+
103113
/**
104114
* Combine the two lists of attributes into one.
105115
* In case of (name) conflicts, specify which sides wins, that is overrides the other column - the left or the right.
@@ -108,18 +118,11 @@ public static List<Attribute> computeOutput(List<Attribute> leftOutput, List<Att
108118
JoinType joinType = config.type();
109119
List<Attribute> output;
110120
// TODO: make the other side nullable
111-
Set<String> matchFieldNames = config.matchFields().stream().map(NamedExpression::name).collect(Collectors.toSet());
112121
if (LEFT.equals(joinType)) {
113-
// right side becomes nullable and overrides left except for match fields, which we preserve from the left
114-
List<Attribute> rightOutputWithoutMatchFields = rightOutput.stream()
115-
.filter(attr -> matchFieldNames.contains(attr.name()) == false)
116-
.toList();
122+
// right side becomes nullable and overrides left except for join keys, which we preserve from the left
123+
AttributeSet rightKeys = new AttributeSet(config.rightFields());
124+
List<Attribute> rightOutputWithoutMatchFields = rightOutput.stream().filter(attr -> rightKeys.contains(attr) == false).toList();
117125
output = mergeOutputAttributes(rightOutputWithoutMatchFields, leftOutput);
118-
} else if (RIGHT.equals(joinType)) {
119-
List<Attribute> leftOutputWithoutMatchFields = leftOutput.stream()
120-
.filter(attr -> matchFieldNames.contains(attr.name()) == false)
121-
.toList();
122-
output = mergeOutputAttributes(leftOutputWithoutMatchFields, rightOutput);
123126
} else {
124127
throw new IllegalArgumentException(joinType.joinName() + " unsupported");
125128
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java

Lines changed: 27 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.io.IOException;
2121
import java.util.ArrayList;
22-
import java.util.Comparator;
2322
import java.util.List;
2423
import java.util.Objects;
2524

@@ -30,43 +29,43 @@ public class LookupJoinExec extends BinaryExec implements EstimatesRowSize {
3029
LookupJoinExec::new
3130
);
3231

33-
private final List<Attribute> matchFields;
3432
private final List<Attribute> leftFields;
3533
private final List<Attribute> rightFields;
36-
private final List<Attribute> output;
37-
private List<Attribute> lazyAddedFields;
34+
/**
35+
* These cannot be computed from the left + right outputs, because
36+
* {@link org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ReplaceSourceAttributes} will replace the {@link EsSourceExec} on
37+
* the right hand side by a {@link EsQueryExec}, and thus lose the information of which fields we'll get from the lookup index.
38+
*/
39+
private final List<Attribute> addedFields;
40+
private List<Attribute> lazyOutput;
3841

3942
public LookupJoinExec(
4043
Source source,
4144
PhysicalPlan left,
4245
PhysicalPlan lookup,
43-
List<Attribute> matchFields,
4446
List<Attribute> leftFields,
4547
List<Attribute> rightFields,
46-
List<Attribute> output
48+
List<Attribute> addedFields
4749
) {
4850
super(source, left, lookup);
49-
this.matchFields = matchFields;
5051
this.leftFields = leftFields;
5152
this.rightFields = rightFields;
52-
this.output = output;
53+
this.addedFields = addedFields;
5354
}
5455

5556
private LookupJoinExec(StreamInput in) throws IOException {
5657
super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(PhysicalPlan.class), in.readNamedWriteable(PhysicalPlan.class));
57-
this.matchFields = in.readNamedWriteableCollectionAsList(Attribute.class);
5858
this.leftFields = in.readNamedWriteableCollectionAsList(Attribute.class);
5959
this.rightFields = in.readNamedWriteableCollectionAsList(Attribute.class);
60-
this.output = in.readNamedWriteableCollectionAsList(Attribute.class);
60+
this.addedFields = in.readNamedWriteableCollectionAsList(Attribute.class);
6161
}
6262

6363
@Override
6464
public void writeTo(StreamOutput out) throws IOException {
6565
super.writeTo(out);
66-
out.writeNamedWriteableCollection(matchFields);
6766
out.writeNamedWriteableCollection(leftFields);
6867
out.writeNamedWriteableCollection(rightFields);
69-
out.writeNamedWriteableCollection(output);
68+
out.writeNamedWriteableCollection(addedFields);
7069
}
7170

7271
@Override
@@ -78,10 +77,6 @@ public PhysicalPlan lookup() {
7877
return right();
7978
}
8079

81-
public List<Attribute> matchFields() {
82-
return matchFields;
83-
}
84-
8580
public List<Attribute> leftFields() {
8681
return leftFields;
8782
}
@@ -91,29 +86,26 @@ public List<Attribute> rightFields() {
9186
}
9287

9388
public List<Attribute> addedFields() {
94-
if (lazyAddedFields == null) {
95-
AttributeSet set = outputSet();
96-
set.removeAll(left().output());
97-
for (Attribute m : matchFields) {
98-
set.removeIf(a -> a.name().equals(m.name()));
89+
return addedFields;
90+
}
91+
92+
@Override
93+
public List<Attribute> output() {
94+
if (lazyOutput == null) {
95+
lazyOutput = new ArrayList<>(left().output());
96+
for (Attribute attr : addedFields) {
97+
lazyOutput.add(attr);
9998
}
100-
lazyAddedFields = new ArrayList<>(set);
101-
lazyAddedFields.sort(Comparator.comparing(Attribute::name));
10299
}
103-
return lazyAddedFields;
100+
return lazyOutput;
104101
}
105102

106103
@Override
107104
public PhysicalPlan estimateRowSize(State state) {
108-
state.add(false, output);
105+
state.add(false, output());
109106
return this;
110107
}
111108

112-
@Override
113-
public List<Attribute> output() {
114-
return output;
115-
}
116-
117109
@Override
118110
public AttributeSet inputSet() {
119111
// TODO: this is a hack since the right side is always materialized - instead this should
@@ -129,12 +121,12 @@ protected AttributeSet computeReferences() {
129121

130122
@Override
131123
public LookupJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) {
132-
return new LookupJoinExec(source(), left, right, matchFields, leftFields, rightFields, output);
124+
return new LookupJoinExec(source(), left, right, leftFields, rightFields, addedFields);
133125
}
134126

135127
@Override
136128
protected NodeInfo<? extends PhysicalPlan> info() {
137-
return NodeInfo.create(this, LookupJoinExec::new, left(), right(), matchFields, leftFields, rightFields, output);
129+
return NodeInfo.create(this, LookupJoinExec::new, left(), right(), leftFields, rightFields, addedFields);
138130
}
139131

140132
@Override
@@ -148,15 +140,12 @@ public boolean equals(Object o) {
148140
if (super.equals(o) == false) {
149141
return false;
150142
}
151-
LookupJoinExec hash = (LookupJoinExec) o;
152-
return matchFields.equals(hash.matchFields)
153-
&& leftFields.equals(hash.leftFields)
154-
&& rightFields.equals(hash.rightFields)
155-
&& output.equals(hash.output);
143+
LookupJoinExec other = (LookupJoinExec) o;
144+
return leftFields.equals(other.leftFields) && rightFields.equals(other.rightFields) && addedFields.equals(other.addedFields);
156145
}
157146

158147
@Override
159148
public int hashCode() {
160-
return Objects.hash(super.hashCode(), matchFields, leftFields, rightFields, output);
149+
return Objects.hash(super.hashCode(), leftFields, rightFields, addedFields);
161150
}
162151
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -583,8 +583,8 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
583583
if (localSourceExec.indexMode() != IndexMode.LOOKUP) {
584584
throw new IllegalArgumentException("can't plan [" + join + "]");
585585
}
586-
List<Layout.ChannelAndType> matchFields = new ArrayList<>(join.matchFields().size());
587-
for (Attribute m : join.matchFields()) {
586+
List<Layout.ChannelAndType> matchFields = new ArrayList<>(join.leftFields().size());
587+
for (Attribute m : join.leftFields()) {
588588
Layout.ChannelAndType t = source.layout.get(m.id());
589589
if (t == null) {
590590
throw new IllegalArgumentException("can't plan [" + join + "][" + m + "]");
@@ -604,7 +604,7 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
604604
lookupFromIndexService,
605605
matchFields.getFirst().type(),
606606
localSourceExec.index().name(),
607-
join.matchFields().getFirst().name(),
607+
join.leftFields().getFirst().name(),
608608
join.addedFields().stream().map(f -> (NamedExpression) f).toList(),
609609
join.source()
610610
),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,7 @@ private PhysicalPlan mapBinary(BinaryPlan binary) {
120120
);
121121
}
122122
if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) {
123-
return new LookupJoinExec(
124-
join.source(),
125-
left,
126-
right,
127-
config.matchFields(),
128-
config.leftFields(),
129-
config.rightFields(),
130-
join.output()
131-
);
123+
return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields());
132124
}
133125
}
134126

0 commit comments

Comments
 (0)