Skip to content

Commit dc7ea9e

Browse files
authored
ESQL: Fix LookupJoin output (#117639)
* Fix output methods related to LookupJoin
* Add tests with subsequent EVAL
* Fix BinaryPlan.computeReferences

  This must not just use the references from its own output. Not only is this wrong, it also leads to failures when we call the .references() method on unresolved plans.
1 parent 79d7068 commit dc7ea9e

File tree

11 files changed

+91
-111
lines changed

11 files changed

+91
-111
lines changed

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
4848
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
4949
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
50-
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP;
50+
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V2;
5151
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
5252
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
5353
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.SYNC;
@@ -125,7 +125,7 @@ protected void shouldSkipTest(String testName) throws IOException {
125125
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
126126
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
127127
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
128-
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP.capabilityName()));
128+
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V2.capabilityName()));
129129
}
130130

131131
private TestFeatureService remoteFeaturesService() throws IOException {

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,22 @@
33
// Reuses the sample dataset and commands from enrich.csv-spec
44
//
55

6-
basicOnTheDataNode
7-
required_capability: join_lookup
6+
//TODO: this sometimes returns null instead of the looked up value (likely related to the execution order)
7+
basicOnTheDataNode-Ignore
8+
required_capability: join_lookup_v2
89

9-
//TODO: this returns different results in CI then locally
10-
// sometimes null, sometimes spanish (likely related to the execution order)
1110
FROM employees
1211
| EVAL language_code = languages
1312
| LOOKUP JOIN languages_lookup ON language_code
14-
| WHERE emp_no < 500
15-
| KEEP emp_no, language_name
13+
| WHERE emp_no >= 10091 AND emp_no < 10094
1614
| SORT emp_no
17-
| LIMIT 1
15+
| KEEP emp_no, language_code, language_name
1816
;
1917

20-
emp_no:integer | language_name:keyword
21-
//10091 | Spanish
18+
emp_no:integer | language_code:integer | language_name:keyword
19+
10091 | 3 | Spanish
20+
10092 | 1 | English
21+
10093 | 3 | Spanish
2222
;
2323

2424
basicRow-Ignore
@@ -33,16 +33,55 @@ language_code:keyword | language_name:keyword
3333
;
3434

3535
basicOnTheCoordinator
36-
required_capability: join_lookup
36+
required_capability: join_lookup_v2
37+
38+
FROM employees
39+
| SORT emp_no
40+
| LIMIT 3
41+
| EVAL language_code = languages
42+
| LOOKUP JOIN languages_lookup ON language_code
43+
| KEEP emp_no, language_code, language_name
44+
;
45+
46+
emp_no:integer | language_code:integer | language_name:keyword
47+
10001 | 2 | French
48+
10002 | 5 | null
49+
10003 | 4 | German
50+
;
51+
52+
//TODO: this sometimes returns null instead of the looked up value (likely related to the execution order)
53+
subsequentEvalOnTheDataNode-Ignore
54+
required_capability: join_lookup_v2
55+
56+
FROM employees
57+
| EVAL language_code = languages
58+
| LOOKUP JOIN languages_lookup ON language_code
59+
| WHERE emp_no >= 10091 AND emp_no < 10094
60+
| SORT emp_no
61+
| KEEP emp_no, language_code, language_name
62+
| EVAL language_name = TO_LOWER(language_name), language_code_x2 = 2*language_code
63+
;
64+
65+
emp_no:integer | language_code:integer | language_name:keyword | language_code_x2:integer
66+
10091 | 3 | spanish | 6
67+
10092 | 1 | english | 2
68+
10093 | 3 | spanish | 6
69+
;
70+
71+
subsequentEvalOnTheCoordinator
72+
required_capability: join_lookup_v2
3773

3874
FROM employees
3975
| SORT emp_no
40-
| LIMIT 1
76+
| LIMIT 3
4177
| EVAL language_code = languages
4278
| LOOKUP JOIN languages_lookup ON language_code
43-
| KEEP emp_no, language_name
79+
| KEEP emp_no, language_code, language_name
80+
| EVAL language_name = TO_LOWER(language_name), language_code_x2 = 2*language_code
4481
;
4582

46-
emp_no:integer | language_name:keyword
47-
10001 | French
83+
emp_no:integer | language_code:integer | language_name:keyword | language_code_x2:integer
84+
10001 | 2 | french | 4
85+
10002 | 5 | null | 10
86+
10003 | 4 | german | 8
4887
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ public enum Cap {
524524
/**
525525
* LOOKUP JOIN
526526
*/
527-
JOIN_LOOKUP(Build.current().isSnapshot()),
527+
JOIN_LOOKUP_V2(Build.current().isSnapshot()),
528528

529529
/**
530530
* Fix for https://github.com/elastic/elasticsearch/issues/117054

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
2222
import org.elasticsearch.xpack.esql.core.expression.Alias;
2323
import org.elasticsearch.xpack.esql.core.expression.Attribute;
24-
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
2524
import org.elasticsearch.xpack.esql.core.expression.EmptyAttribute;
2625
import org.elasticsearch.xpack.esql.core.expression.Expression;
2726
import org.elasticsearch.xpack.esql.core.expression.Expressions;
@@ -609,8 +608,7 @@ private Join resolveLookupJoin(LookupJoin join) {
609608
JoinConfig config = join.config();
610609
// for now, support only (LEFT) USING clauses
611610
JoinType type = config.type();
612-
// rewrite the join into a equi-join between the field with the same name between left and right
613-
// per SQL standard, the USING columns are placed first in the output, followed by the rest of left, then right
611+
// rewrite the join into an equi-join between the field with the same name between left and right
614612
if (type instanceof UsingJoinType using) {
615613
List<Attribute> cols = using.columns();
616614
// the lookup cannot be resolved, bail out
@@ -632,14 +630,9 @@ private Join resolveLookupJoin(LookupJoin join) {
632630
// resolve the using columns against the left and the right side then assemble the new join config
633631
List<Attribute> leftKeys = resolveUsingColumns(cols, join.left().output(), "left");
634632
List<Attribute> rightKeys = resolveUsingColumns(cols, join.right().output(), "right");
635-
List<Attribute> output = new ArrayList<>(join.left().output());
636-
// the order is stable (since the AttributeSet preservers the insertion order)
637-
output.addAll(join.right().outputSet().subtract(new AttributeSet(rightKeys)));
638-
639-
// update the config - pick the left keys as those in the output
640-
type = new UsingJoinType(coreJoin, rightKeys);
641-
config = new JoinConfig(type, leftKeys, leftKeys, rightKeys);
642-
join = new LookupJoin(join.source(), join.left(), join.right(), config, output);
633+
634+
config = new JoinConfig(coreJoin, leftKeys, leftKeys, rightKeys);
635+
join = new LookupJoin(join.source(), join.left(), join.right(), config);
643636
}
644637
// everything else is unsupported for now
645638
else {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/QueryPlan.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ public QueryPlan(Source source, List<PlanType> children) {
3333
super(source, children);
3434
}
3535

36+
/**
37+
* The ordered list of attributes (i.e. columns) this plan produces when executed.
38+
* Must be called only on resolved plans, otherwise may throw an exception or return wrong results.
39+
*/
3640
public abstract List<Attribute> output();
3741

3842
public AttributeSet outputSet() {
@@ -87,6 +91,7 @@ public AttributeSet references() {
8791

8892
/**
8993
* This very likely needs to be overridden for {@link QueryPlan#references} to be correct when inheriting.
94+
* This can be called on unresolved plans and therefore must not rely on calls to {@link QueryPlan#output()}.
9095
*/
9196
protected AttributeSet computeReferences() {
9297
return Expressions.references(expressions());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/BinaryPlan.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
*/
77
package org.elasticsearch.xpack.esql.plan.logical;
88

9-
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
10-
import org.elasticsearch.xpack.esql.core.expression.Expressions;
119
import org.elasticsearch.xpack.esql.core.tree.Source;
1210

1311
import java.util.Arrays;
@@ -45,11 +43,6 @@ public final BinaryPlan replaceRight(LogicalPlan newRight) {
4543
return replaceChildren(left, newRight);
4644
}
4745

48-
protected AttributeSet computeReferences() {
49-
// TODO: this needs to be driven by the join config
50-
return Expressions.references(output());
51-
}
52-
5346
public abstract BinaryPlan replaceChildren(LogicalPlan left, LogicalPlan right);
5447

5548
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,8 @@
1010
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
13-
import org.elasticsearch.common.util.Maps;
1413
import org.elasticsearch.xpack.esql.core.expression.Attribute;
15-
import org.elasticsearch.xpack.esql.core.expression.Nullability;
14+
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1615
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
1716
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1817
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -23,9 +22,11 @@
2322
import java.io.IOException;
2423
import java.util.ArrayList;
2524
import java.util.List;
26-
import java.util.Map;
2725
import java.util.Objects;
26+
import java.util.Set;
27+
import java.util.stream.Collectors;
2828

29+
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
2930
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
3031
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.RIGHT;
3132

@@ -107,37 +108,24 @@ public static List<Attribute> computeOutput(List<Attribute> leftOutput, List<Att
107108
JoinType joinType = config.type();
108109
List<Attribute> output;
109110
// TODO: make the other side nullable
111+
Set<String> matchFieldNames = config.matchFields().stream().map(NamedExpression::name).collect(Collectors.toSet());
110112
if (LEFT.equals(joinType)) {
111-
// right side becomes nullable and overrides left
112-
// output = merge(leftOutput, makeNullable(rightOutput));
113-
output = merge(leftOutput, rightOutput);
113+
// right side becomes nullable and overrides left except for match fields, which we preserve from the left
114+
List<Attribute> rightOutputWithoutMatchFields = rightOutput.stream()
115+
.filter(attr -> matchFieldNames.contains(attr.name()) == false)
116+
.toList();
117+
output = mergeOutputAttributes(rightOutputWithoutMatchFields, leftOutput);
114118
} else if (RIGHT.equals(joinType)) {
115-
// left side becomes nullable and overrides right
116-
// output = merge(makeNullable(leftOutput), rightOutput);
117-
output = merge(leftOutput, rightOutput);
119+
List<Attribute> leftOutputWithoutMatchFields = leftOutput.stream()
120+
.filter(attr -> matchFieldNames.contains(attr.name()) == false)
121+
.toList();
122+
output = mergeOutputAttributes(leftOutputWithoutMatchFields, rightOutput);
118123
} else {
119124
throw new IllegalArgumentException(joinType.joinName() + " unsupported");
120125
}
121126
return output;
122127
}
123128

124-
/**
125-
* Merge the two lists of attributes into one and preserves order.
126-
*/
127-
private static List<Attribute> merge(List<Attribute> left, List<Attribute> right) {
128-
// use linked hash map to preserve order
129-
Map<String, Attribute> nameToAttribute = Maps.newLinkedHashMapWithExpectedSize(left.size() + right.size());
130-
for (Attribute a : left) {
131-
nameToAttribute.put(a.name(), a);
132-
}
133-
for (Attribute a : right) {
134-
// override the existing entry in place
135-
nameToAttribute.compute(a.name(), (name, existing) -> a);
136-
}
137-
138-
return new ArrayList<>(nameToAttribute.values());
139-
}
140-
141129
/**
142130
* Make fields references, so we don't check if they exist in the index.
143131
* We do this for fields that we know don't come from the index.
@@ -161,14 +149,6 @@ public static List<Attribute> makeReference(List<Attribute> output) {
161149
return out;
162150
}
163151

164-
private static List<Attribute> makeNullable(List<Attribute> output) {
165-
List<Attribute> out = new ArrayList<>(output.size());
166-
for (Attribute a : output) {
167-
out.add(a.withNullability(Nullability.TRUE));
168-
}
169-
return out;
170-
}
171-
172152
@Override
173153
public boolean expressionsResolved() {
174154
return config.expressionsResolved();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java

Lines changed: 8 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.UsingJoinType;
1717

1818
import java.util.List;
19-
import java.util.Objects;
2019

2120
import static java.util.Collections.emptyList;
2221
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
@@ -26,10 +25,8 @@
2625
*/
2726
public class LookupJoin extends Join implements SurrogateLogicalPlan {
2827

29-
private final List<Attribute> output;
30-
3128
public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, List<Attribute> joinFields) {
32-
this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList(), emptyList());
29+
this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList());
3330
}
3431

3532
public LookupJoin(
@@ -39,36 +36,28 @@ public LookupJoin(
3936
JoinType type,
4037
List<Attribute> joinFields,
4138
List<Attribute> leftFields,
42-
List<Attribute> rightFields,
43-
List<Attribute> output
39+
List<Attribute> rightFields
4440
) {
45-
this(source, left, right, new JoinConfig(type, joinFields, leftFields, rightFields), output);
41+
this(source, left, right, new JoinConfig(type, joinFields, leftFields, rightFields));
4642
}
4743

48-
public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig joinConfig, List<Attribute> output) {
44+
public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig joinConfig) {
4945
super(source, left, right, joinConfig);
50-
this.output = output;
5146
}
5247

5348
/**
5449
* Translate the expression into a regular join with a Projection on top, to deal with serialization &amp; co.
5550
*/
5651
@Override
5752
public LogicalPlan surrogate() {
58-
JoinConfig cfg = config();
59-
JoinConfig newConfig = new JoinConfig(LEFT, cfg.matchFields(), cfg.leftFields(), cfg.rightFields());
60-
Join normalized = new Join(source(), left(), right(), newConfig);
53+
Join normalized = new Join(source(), left(), right(), config());
6154
// TODO: decide whether to introduce USING or just basic ON semantics - keep the ordering out for now
62-
return new Project(source(), normalized, output);
63-
}
64-
65-
public List<Attribute> output() {
66-
return output;
55+
return new Project(source(), normalized, output());
6756
}
6857

6958
@Override
7059
public Join replaceChildren(LogicalPlan left, LogicalPlan right) {
71-
return new LookupJoin(source(), left, right, config(), output);
60+
return new LookupJoin(source(), left, right, config());
7261
}
7362

7463
@Override
@@ -81,23 +70,7 @@ protected NodeInfo<Join> info() {
8170
config().type(),
8271
config().matchFields(),
8372
config().leftFields(),
84-
config().rightFields(),
85-
output
73+
config().rightFields()
8674
);
8775
}
88-
89-
@Override
90-
public int hashCode() {
91-
return Objects.hash(super.hashCode(), output);
92-
}
93-
94-
@Override
95-
public boolean equals(Object obj) {
96-
if (super.equals(obj) == false) {
97-
return false;
98-
}
99-
100-
LookupJoin other = (LookupJoin) obj;
101-
return Objects.equals(output, other.output);
102-
}
10376
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@
7979
import java.util.List;
8080
import java.util.Map;
8181
import java.util.Set;
82-
import java.util.function.Predicate;
8382
import java.util.stream.Collectors;
8483

8584
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
@@ -466,8 +465,6 @@ static Set<String> fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMatchF
466465
// ie "from test | eval lang = languages + 1 | keep *l" should consider both "languages" and "*l" as valid fields to ask for
467466
AttributeSet keepCommandReferences = new AttributeSet();
468467
AttributeSet keepJoinReferences = new AttributeSet();
469-
List<Predicate<String>> keepMatches = new ArrayList<>();
470-
List<String> keepPatterns = new ArrayList<>();
471468

472469
parsed.forEachDown(p -> {// go over each plan top-down
473470
if (p instanceof RegexExtract re) { // for Grok and Dissect
@@ -501,7 +498,6 @@ static Set<String> fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMatchF
501498
references.add(ua);
502499
if (p instanceof Keep) {
503500
keepCommandReferences.add(ua);
504-
keepMatches.add(up::match);
505501
}
506502
});
507503
if (p instanceof Keep) {

0 commit comments

Comments
 (0)