Skip to content

Commit e4defca

Browse files
authored
ESQL: Small LOOKUP JOIN cleanups (#117922)
* Simplify InsertFieldExtraction * Fix wrong error message when looping ResolveRefs * Add basic verification tests * Add check for data type mismatch
1 parent 14d9e7c commit e4defca

File tree

10 files changed

+173
-51
lines changed

10 files changed

+173
-51
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,19 @@ left:keyword | client_ip:keyword | right:keyword | env:keyword
120120
left | 172.21.0.5 | right | Development
121121
;
122122

123+
lookupIPFromRowWithShadowingKeepReordered
124+
required_capability: join_lookup_v4
125+
126+
ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
127+
| EVAL client_ip = client_ip::keyword
128+
| LOOKUP JOIN clientips_lookup ON client_ip
129+
| KEEP right, env, client_ip
130+
;
131+
132+
right:keyword | env:keyword | client_ip:keyword
133+
right | Development | 172.21.0.5
134+
;
135+
123136
lookupIPFromIndex
124137
required_capability: join_lookup_v4
125138

@@ -263,6 +276,24 @@ ignoreOrder:true;
263276
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 | Success
264277
;
265278

279+
lookupMessageFromIndexKeepReordered
280+
required_capability: join_lookup_v4
281+
282+
FROM sample_data
283+
| LOOKUP JOIN message_types_lookup ON message
284+
| KEEP type, client_ip, event_duration, message
285+
;
286+
287+
type:keyword | client_ip:ip | event_duration:long | message:keyword
288+
Success | 172.21.3.15 | 1756467 | Connected to 10.1.0.1
289+
Error | 172.21.3.15 | 5033755 | Connection error
290+
Error | 172.21.3.15 | 8268153 | Connection error
291+
Error | 172.21.3.15 | 725448 | Connection error
292+
Disconnected | 172.21.0.5 | 1232382 | Disconnected
293+
Success | 172.21.2.113 | 2764889 | Connected to 10.1.0.2
294+
Success | 172.21.2.162 | 3450233 | Connected to 10.1.0.3
295+
;
296+
266297
lookupMessageFromIndexStats
267298
required_capability: join_lookup_v4
268299

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -633,9 +633,10 @@ private Join resolveLookupJoin(LookupJoin join) {
633633

634634
config = new JoinConfig(coreJoin, leftKeys, leftKeys, rightKeys);
635635
join = new LookupJoin(join.source(), join.left(), join.right(), config);
636-
}
637-
// everything else is unsupported for now
638-
else {
636+
} else if (type != JoinTypes.LEFT) {
637+
// everything else is unsupported for now
638+
// LEFT can only happen by being mapped from a USING above. So we need to exclude this as well because this rule can be run
639+
// more than once.
639640
UnresolvedAttribute errorAttribute = new UnresolvedAttribute(join.source(), "unsupported", "Unsupported join type");
640641
// add error message
641642
return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList()));
@@ -651,7 +652,7 @@ private List<Attribute> resolveUsingColumns(List<Attribute> cols, List<Attribute
651652
if (resolvedCol instanceof UnresolvedAttribute ucol) {
652653
String message = ua.unresolvedMessage();
653654
String match = "column [" + ucol.name() + "]";
654-
resolvedCol = ucol.withUnresolvedMessage(message.replace(match, match + "in " + side + " side of join"));
655+
resolvedCol = ucol.withUnresolvedMessage(message.replace(match, match + " in " + side + " side of join"));
655656
}
656657
resolved.add(resolvedCol);
657658
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/AnalyzerContext.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ public AnalyzerContext(
2626
IndexResolution indexResolution,
2727
EnrichResolution enrichResolution
2828
) {
29-
this(configuration, functionRegistry, indexResolution, IndexResolution.invalid("<none>"), enrichResolution);
29+
this(
30+
configuration,
31+
functionRegistry,
32+
indexResolution,
33+
IndexResolution.invalid("AnalyzerContext constructed without any lookup join resolution"),
34+
enrichResolution
35+
);
3036
}
3137
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.esql.analysis;
99

10-
import org.elasticsearch.index.IndexMode;
1110
import org.elasticsearch.license.XPackLicenseState;
1211
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
1312
import org.elasticsearch.xpack.esql.common.Failure;
@@ -55,7 +54,8 @@
5554
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
5655
import org.elasticsearch.xpack.esql.plan.logical.Row;
5756
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
58-
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
57+
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
58+
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
5959
import org.elasticsearch.xpack.esql.stats.FeatureMetric;
6060
import org.elasticsearch.xpack.esql.stats.Metrics;
6161

@@ -172,20 +172,6 @@ else if (p instanceof Lookup lookup) {
172172
else {
173173
lookup.matchFields().forEach(unresolvedExpressions);
174174
}
175-
} else if (p instanceof LookupJoin lj) {
176-
// expect right side to always be a lookup index
177-
lj.right().forEachUp(EsRelation.class, r -> {
178-
if (r.indexMode() != IndexMode.LOOKUP) {
179-
failures.add(
180-
fail(
181-
r,
182-
"LOOKUP JOIN right side [{}] must be a lookup index (index_mode=lookup, not [{}]",
183-
r.index().name(),
184-
r.indexMode().getName()
185-
)
186-
);
187-
}
188-
});
189175
}
190176

191177
else {
@@ -217,6 +203,7 @@ else if (p instanceof Lookup lookup) {
217203
checkSort(p, failures);
218204

219205
checkFullTextQueryFunctions(p, failures);
206+
checkJoin(p, failures);
220207
});
221208
checkRemoteEnrich(plan, failures);
222209
checkMetadataScoreNameReserved(plan, failures);
@@ -791,6 +778,35 @@ private static void checkNotPresentInDisjunctions(
791778
});
792779
}
793780

781+
/**
782+
* Checks Joins for invalid usage.
783+
*
784+
* @param plan root plan to check
785+
* @param failures failures found
786+
*/
787+
private static void checkJoin(LogicalPlan plan, Set<Failure> failures) {
788+
if (plan instanceof Join join) {
789+
JoinConfig config = join.config();
790+
for (int i = 0; i < config.leftFields().size(); i++) {
791+
Attribute leftField = config.leftFields().get(i);
792+
Attribute rightField = config.rightFields().get(i);
793+
if (leftField.dataType() != rightField.dataType()) {
794+
failures.add(
795+
fail(
796+
leftField,
797+
"JOIN left field [{}] of type [{}] is incompatible with right field [{}] of type [{}]",
798+
leftField.name(),
799+
leftField.dataType(),
800+
rightField.name(),
801+
rightField.dataType()
802+
)
803+
);
804+
}
805+
}
806+
807+
}
808+
}
809+
794810
/**
795811
* Checks full text query functions for invalid usage.
796812
*

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,12 @@
1111
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1212
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
1313
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
14-
import org.elasticsearch.xpack.esql.core.expression.TypedAttribute;
1514
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
1615
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
1716
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
1817
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
1918
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
2019
import org.elasticsearch.xpack.esql.plan.physical.LeafExec;
21-
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
2220
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
2321
import org.elasticsearch.xpack.esql.rule.Rule;
2422

@@ -102,25 +100,17 @@ private static Set<Attribute> missingAttributes(PhysicalPlan p) {
102100
var missing = new LinkedHashSet<Attribute>();
103101
var input = p.inputSet();
104102

105-
// For LOOKUP JOIN we only need field-extraction on left fields used to match, since the right side is always materialized
106-
if (p instanceof LookupJoinExec join) {
107-
join.leftFields().forEach(f -> {
108-
if (input.contains(f) == false) {
109-
missing.add(f);
110-
}
111-
});
112-
return missing;
113-
}
114-
115-
// collect field attributes used inside expressions
116-
// TODO: Rather than going over all expressions manually, this should just call .references()
117-
p.forEachExpression(TypedAttribute.class, f -> {
103+
// Collect field attributes referenced by this plan but not yet present in the child's output.
104+
// This is also correct for LookupJoinExec, where we only need field extraction on the left fields used to match, since the right
105+
// side is always materialized.
106+
p.references().forEach(f -> {
118107
if (f instanceof FieldAttribute || f instanceof MetadataAttribute) {
119108
if (input.contains(f) == false) {
120109
missing.add(f);
121110
}
122111
}
123112
});
113+
124114
return missing;
125115
}
126116
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -565,21 +565,12 @@ private PhysicalOperation planHashJoin(HashJoinExec join, LocalExecutionPlannerC
565565

566566
private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlannerContext context) {
567567
PhysicalOperation source = plan(join.left(), context);
568-
// TODO: The source builder includes incoming fields including the ones we're going to drop
569568
Layout.Builder layoutBuilder = source.layout.builder();
570569
for (Attribute f : join.addedFields()) {
571570
layoutBuilder.append(f);
572571
}
573572
Layout layout = layoutBuilder.build();
574573

575-
// TODO: this works when the join happens on the coordinator
576-
/*
577-
* But when it happens on the data node we get a
578-
* \_FieldExtractExec[language_code{f}#15, language_name{f}#16]<[]>
579-
* \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#18], limit[], sort[] estimatedRowSize[62]
580-
* Which we'd prefer not to do - at least for now. We already know the fields we're loading
581-
* and don't want any local planning.
582-
*/
583574
EsQueryExec localSourceExec = (EsQueryExec) join.lookup();
584575
if (localSourceExec.indexMode() != IndexMode.LOOKUP) {
585576
throw new IllegalArgumentException("can't plan [" + join + "]");

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,10 +374,11 @@ private void preAnalyzeLookupIndices(List<TableInfo> indices, ListenerResult lis
374374
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
375375
indexResolver.resolveAsMergedMapping(
376376
table.index(),
377-
Set.of("*"), // Current LOOKUP JOIN syntax does not allow for field selection
377+
Set.of("*"), // TODO: for LOOKUP JOIN, this currently declares all lookup index fields relevant and might fetch too many.
378378
null,
379379
listener.map(indexResolution -> listenerResult.withLookupIndexResolution(indexResolution))
380380
);
381+
// TODO: Verify that the resolved index actually has indexMode: "lookup"
381382
} else {
382383
try {
383384
// No lookup indices specified

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public static Analyzer defaultAnalyzer() {
3838
}
3939

4040
public static Analyzer expandedDefaultAnalyzer() {
41-
return analyzer(analyzerExpandedDefaultMapping());
41+
return analyzer(expandedDefaultIndexResolution());
4242
}
4343

4444
public static Analyzer analyzer(IndexResolution indexResolution) {
@@ -47,18 +47,33 @@ public static Analyzer analyzer(IndexResolution indexResolution) {
4747

4848
public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
4949
return new Analyzer(
50-
new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), indexResolution, defaultEnrichResolution()),
50+
new AnalyzerContext(
51+
EsqlTestUtils.TEST_CFG,
52+
new EsqlFunctionRegistry(),
53+
indexResolution,
54+
defaultLookupResolution(),
55+
defaultEnrichResolution()
56+
),
5157
verifier
5258
);
5359
}
5460

5561
public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) {
56-
return new Analyzer(new AnalyzerContext(config, new EsqlFunctionRegistry(), indexResolution, defaultEnrichResolution()), verifier);
62+
return new Analyzer(
63+
new AnalyzerContext(config, new EsqlFunctionRegistry(), indexResolution, defaultLookupResolution(), defaultEnrichResolution()),
64+
verifier
65+
);
5766
}
5867

5968
public static Analyzer analyzer(Verifier verifier) {
6069
return new Analyzer(
61-
new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), analyzerDefaultMapping(), defaultEnrichResolution()),
70+
new AnalyzerContext(
71+
EsqlTestUtils.TEST_CFG,
72+
new EsqlFunctionRegistry(),
73+
analyzerDefaultMapping(),
74+
defaultLookupResolution(),
75+
defaultEnrichResolution()
76+
),
6277
verifier
6378
);
6479
}
@@ -98,10 +113,14 @@ public static IndexResolution analyzerDefaultMapping() {
98113
return loadMapping("mapping-basic.json", "test");
99114
}
100115

101-
public static IndexResolution analyzerExpandedDefaultMapping() {
116+
public static IndexResolution expandedDefaultIndexResolution() {
102117
return loadMapping("mapping-default.json", "test");
103118
}
104119

120+
public static IndexResolution defaultLookupResolution() {
121+
return loadMapping("mapping-languages.json", "languages_lookup");
122+
}
123+
105124
public static EnrichResolution defaultEnrichResolution() {
106125
EnrichResolution enrichResolution = new EnrichResolution();
107126
loadEnrichPolicyResolution(enrichResolution, MATCH_TYPE, "languages", "language_code", "languages_idx", "mapping-languages.json");

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.index.IndexMode;
1515
import org.elasticsearch.index.analysis.IndexAnalyzers;
1616
import org.elasticsearch.test.ESTestCase;
17+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1718
import org.elasticsearch.xpack.esql.LoadMapping;
1819
import org.elasticsearch.xpack.esql.VerificationException;
1920
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
@@ -73,6 +74,8 @@
7374
import static org.elasticsearch.xpack.esql.analysis.Analyzer.NO_FIELDS;
7475
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyze;
7576
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzer;
77+
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzerDefaultMapping;
78+
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultEnrichResolution;
7679
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping;
7780
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.tsdbIndexResolution;
7881
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
@@ -83,6 +86,7 @@
8386
import static org.hamcrest.Matchers.hasSize;
8487
import static org.hamcrest.Matchers.instanceOf;
8588
import static org.hamcrest.Matchers.matchesRegex;
89+
import static org.hamcrest.Matchers.not;
8690
import static org.hamcrest.Matchers.startsWith;
8791

8892
//@TestLogging(value = "org.elasticsearch.xpack.esql.analysis:TRACE", reason = "debug")
@@ -2002,6 +2006,58 @@ public void testLookupMatchTypeWrong() {
20022006
assertThat(e.getMessage(), containsString("column type mismatch, table column was [integer] and original column was [keyword]"));
20032007
}
20042008

2009+
public void testLookupJoinUnknownIndex() {
2010+
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled());
2011+
2012+
String errorMessage = "Unknown index [foobar]";
2013+
IndexResolution missingLookupIndex = IndexResolution.invalid(errorMessage);
2014+
2015+
Analyzer analyzerMissingLookupIndex = new Analyzer(
2016+
new AnalyzerContext(
2017+
EsqlTestUtils.TEST_CFG,
2018+
new EsqlFunctionRegistry(),
2019+
analyzerDefaultMapping(),
2020+
missingLookupIndex,
2021+
defaultEnrichResolution()
2022+
),
2023+
TEST_VERIFIER
2024+
);
2025+
2026+
String query = "FROM test | LOOKUP JOIN foobar ON last_name";
2027+
2028+
VerificationException e = expectThrows(VerificationException.class, () -> analyze(query, analyzerMissingLookupIndex));
2029+
assertThat(e.getMessage(), containsString("1:25: " + errorMessage));
2030+
2031+
String query2 = "FROM test | LOOKUP JOIN foobar ON missing_field";
2032+
2033+
e = expectThrows(VerificationException.class, () -> analyze(query2, analyzerMissingLookupIndex));
2034+
assertThat(e.getMessage(), containsString("1:25: " + errorMessage));
2035+
assertThat(e.getMessage(), not(containsString("[missing_field]")));
2036+
}
2037+
2038+
public void testLookupJoinUnknownField() {
2039+
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled());
2040+
2041+
String query = "FROM test | LOOKUP JOIN languages_lookup ON last_name";
2042+
String errorMessage = "1:45: Unknown column [last_name] in right side of join";
2043+
2044+
VerificationException e = expectThrows(VerificationException.class, () -> analyze(query));
2045+
assertThat(e.getMessage(), containsString(errorMessage));
2046+
2047+
String query2 = "FROM test | LOOKUP JOIN languages_lookup ON language_code";
2048+
String errorMessage2 = "1:45: Unknown column [language_code] in left side of join";
2049+
2050+
e = expectThrows(VerificationException.class, () -> analyze(query2));
2051+
assertThat(e.getMessage(), containsString(errorMessage2));
2052+
2053+
String query3 = "FROM test | LOOKUP JOIN languages_lookup ON missing_altogether";
2054+
String errorMessage3 = "1:45: Unknown column [missing_altogether] in ";
2055+
2056+
e = expectThrows(VerificationException.class, () -> analyze(query3));
2057+
assertThat(e.getMessage(), containsString(errorMessage3 + "left side of join"));
2058+
assertThat(e.getMessage(), containsString(errorMessage3 + "right side of join"));
2059+
}
2060+
20052061
public void testImplicitCasting() {
20062062
var e = expectThrows(VerificationException.class, () -> analyze("""
20072063
from test | eval x = concat("2024", "-04", "-01") + 1 day

0 commit comments

Comments
 (0)