Skip to content

Commit 34e8d83

Browse files
WIP POC for expression join
1 parent 242455f commit 34e8d83

File tree

38 files changed

+896
-100
lines changed

38 files changed

+896
-100
lines changed

.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/runConfigurations/Debug_Elasticsearch__node_3_.xml

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ static TransportVersion def(int id) {
358358
public static final TransportVersion ALLOCATION_DECISION_NOT_PREFERRED = def(9_145_0_00);
359359
public static final TransportVersion ESQL_QUALIFIERS_IN_ATTRIBUTES = def(9_146_0_00);
360360
public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_147_0_00);
361+
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_EXPRESSION = def(9_148_0_00);
361362

362363
/*
363364
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public final Query getQuery(int position) {
125125
* Returns the query at the given position.
126126
*/
127127
@Nullable
128-
abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);
128+
public abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);
129129

130130
private Query wrapSingleValueQuery(Query query) {
131131
assert onlySingleValueParams != null : "Requested to wrap single value query without single value params";
@@ -159,13 +159,8 @@ private Query wrapSingleValueQuery(Query query) {
159159
* using only the {@link ElementType} of the {@link Block} to determine the
160160
* query.
161161
*/
162-
public static QueryList rawTermQueryList(
163-
MappedFieldType field,
164-
SearchExecutionContext searchExecutionContext,
165-
AliasFilter aliasFilter,
166-
Block block
167-
) {
168-
IntFunction<Object> blockToJavaObject = switch (block.elementType()) {
162+
public static IntFunction<Object> createBlockValueReader(Block block) {
163+
return switch (block.elementType()) {
169164
case BOOLEAN -> {
170165
BooleanBlock booleanBlock = (BooleanBlock) block;
171166
yield booleanBlock::getBoolean;
@@ -196,7 +191,20 @@ public static QueryList rawTermQueryList(
196191
case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't read values from [aggregate metric double] block");
197192
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
198193
};
199-
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, blockToJavaObject);
194+
}
195+
196+
/**
197+
* Returns a list of term queries for the given field and the input block
198+
* using only the {@link ElementType} of the {@link Block} to determine the
199+
* query.
200+
*/
201+
public static QueryList rawTermQueryList(
202+
MappedFieldType field,
203+
SearchExecutionContext searchExecutionContext,
204+
AliasFilter aliasFilter,
205+
Block block
206+
) {
207+
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, createBlockValueReader(block));
200208
}
201209

202210
/**
@@ -297,7 +305,7 @@ public TermQueryList onlySingleValues(Warnings warnings, String multiValueWarnin
297305
}
298306

299307
@Override
300-
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
308+
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
301309
return switch (valueCount) {
302310
case 0 -> null;
303311
case 1 -> field.termQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
@@ -360,7 +368,7 @@ public DateNanosQueryList onlySingleValues(Warnings warnings, String multiValueW
360368
}
361369

362370
@Override
363-
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
371+
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
364372
return switch (valueCount) {
365373
case 0 -> null;
366374
case 1 -> dateFieldType.equalityQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
@@ -412,7 +420,7 @@ public GeoShapeQueryList onlySingleValues(Warnings warnings, String multiValueWa
412420
}
413421

414422
@Override
415-
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
423+
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
416424
return switch (valueCount) {
417425
case 0 -> null;
418426
case 1 -> shapeQuery.apply(firstValueIndex);
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
//
2+
// CSV spec for LOOKUP JOIN command with expression join
3+
//
4+
5+
lookupWithExpressionEquals
6+
required_capability: join_lookup_v12
7+
required_capability: lookup_join_on_boolean_expression
8+
9+
FROM employees
10+
| WHERE emp_no == 10001
11+
| LOOKUP JOIN languages_lookup ON languages == language_code
12+
| KEEP emp_no, languages, language_code, language_name
13+
;
14+
15+
emp_no:integer | languages:integer | language_code:integer | language_name:keyword
16+
10001 | 2 | 2 | French
17+
;
18+
19+
lookupWithExpressionNotEquals
20+
required_capability: join_lookup_v12
21+
required_capability: lookup_join_on_boolean_expression
22+
23+
FROM employees
24+
| WHERE emp_no == 10001
25+
| LOOKUP JOIN languages_lookup ON languages != language_code
26+
| KEEP emp_no, languages, language_code, language_name
27+
| SORT language_code
28+
;
29+
30+
emp_no:integer | languages:integer | language_code:integer | language_name:keyword
31+
10001 | 2 | 1 | English
32+
10001 | 2 | 3 | Spanish
33+
10001 | 2 | 4 | German
34+
;
35+
36+
lookupWithExpressionGreater
37+
required_capability: join_lookup_v12
38+
required_capability: lookup_join_on_boolean_expression
39+
40+
FROM employees
41+
| WHERE emp_no == 10001
42+
| LOOKUP JOIN languages_lookup ON languages > language_code
43+
| KEEP emp_no, languages, language_code, language_name
44+
;
45+
46+
emp_no:integer | languages:integer | language_code:integer | language_name:keyword
47+
10001 | 2 | 1 | English
48+
;
49+
50+
lookupWithExpressionGreaterOrEquals
51+
required_capability: join_lookup_v12
52+
required_capability: lookup_join_on_boolean_expression
53+
54+
FROM employees
55+
| WHERE emp_no == 10001
56+
| LOOKUP JOIN languages_lookup ON languages >= language_code
57+
| KEEP emp_no, languages, language_code, language_name
58+
| SORT language_code
59+
;
60+
61+
emp_no:integer | languages:integer | language_code:integer | language_name:keyword
62+
10001 | 2 | 1 | English
63+
10001 | 2 | 2 | French
64+
;
65+
66+
lookupWithExpressionLess
67+
required_capability: join_lookup_v12
68+
required_capability: lookup_join_on_boolean_expression
69+
70+
FROM employees
71+
| WHERE emp_no == 10001
72+
| LOOKUP JOIN languages_lookup ON languages < language_code
73+
| KEEP emp_no, languages, language_code, language_name
74+
| SORT language_code
75+
;
76+
77+
emp_no:integer | languages:integer | language_code:integer | language_name:keyword
78+
10001 | 2 | 3 | Spanish
79+
10001 | 2 | 4 | German
80+
;
81+
82+
lookupWithExpressionLessOrEquals
83+
required_capability: join_lookup_v12
84+
required_capability: lookup_join_on_boolean_expression
85+
86+
FROM employees
87+
| WHERE emp_no == 10001
88+
| LOOKUP JOIN languages_lookup ON languages <= language_code
89+
| KEEP emp_no, languages, language_code, language_name
90+
| SORT language_code
91+
;
92+
93+
emp_no:integer | languages:integer | language_code:integer | language_name:keyword
94+
10001 | 2 | 2 | French
95+
10001 | 2 | 3 | Spanish
96+
10001 | 2 | 4 | German
97+
;

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,8 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
385385
"lookup",
386386
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
387387
Source.EMPTY,
388-
filters
388+
filters,
389+
null
389390
);
390391
DriverContext driverContext = driverContext();
391392
try (

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1399,7 +1399,12 @@ public enum Cap {
13991399
/**
14001400
* Allow qualifiers in attribute names.
14011401
*/
1402-
NAME_QUALIFIERS(Build.current().isSnapshot());
1402+
NAME_QUALIFIERS(Build.current().isSnapshot()),
1403+
1404+
/**
1405+
* Allow lookup join on boolean expressions
1406+
*/
1407+
LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION;
14031408

14041409
private final boolean enabled;
14051410

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,10 @@
8383
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToUnsignedLong;
8484
import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
8585
import org.elasticsearch.xpack.esql.expression.function.vector.VectorFunction;
86+
import org.elasticsearch.xpack.esql.expression.predicate.Predicates;
8687
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.DateTimeArithmeticOperation;
8788
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.EsqlArithmeticOperation;
89+
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison;
8890
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.In;
8991
import org.elasticsearch.xpack.esql.index.EsIndex;
9092
import org.elasticsearch.xpack.esql.index.IndexResolution;
@@ -441,7 +443,14 @@ protected LogicalPlan rule(Lookup lookup, AnalyzerContext context) {
441443
// postpone the resolution for ResolveRefs
442444
}
443445

444-
return new Lookup(source, lookup.child(), tableNameExpression, lookup.matchFields(), localRelation);
446+
return new Lookup(
447+
source,
448+
lookup.child(),
449+
tableNameExpression,
450+
lookup.matchFields(),
451+
localRelation,
452+
lookup.getJoinOnConditions()
453+
);
445454
}
446455

447456
private LocalRelation tableMapAsRelation(Source source, Map<String, Column> mapTable) {
@@ -707,15 +716,30 @@ private LogicalPlan resolveLookup(Lookup l, List<Attribute> childrenOutput) {
707716
matchFields.add(matchFieldChildReference);
708717
}
709718
if (modified) {
710-
return new Lookup(l.source(), l.child(), l.tableName(), matchFields, l.localRelation());
719+
return new Lookup(l.source(), l.child(), l.tableName(), matchFields, l.localRelation(), l.getJoinOnConditions());
711720
}
712721
return l;
713722
}
714723

724+
private List<Expression> resolveJoinFilters(List<Expression> filters, List<Attribute> leftOutput, List<Attribute> rightOutput) {
725+
if (filters.isEmpty()) {
726+
return emptyList();
727+
}
728+
List<Attribute> childrenOutput = new ArrayList<>(leftOutput);
729+
childrenOutput.addAll(rightOutput);
730+
731+
List<Expression> resolvedFilters = new ArrayList<>(filters.size());
732+
for (Expression filter : filters) {
733+
resolvedFilters.add(filter.transformUp(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput)));
734+
}
735+
return resolvedFilters;
736+
}
737+
715738
private Join resolveLookupJoin(LookupJoin join) {
716739
JoinConfig config = join.config();
717740
// for now, support only (LEFT) USING clauses
718741
JoinType type = config.type();
742+
719743
// rewrite the join into an equi-join between the field with the same name between left and right
720744
if (type instanceof UsingJoinType using) {
721745
List<Attribute> cols = using.columns();
@@ -733,22 +757,48 @@ private Join resolveLookupJoin(LookupJoin join) {
733757
name,
734758
"Only LEFT join is supported with USING"
735759
);
736-
return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList()));
760+
return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList(), null));
761+
}
762+
List<Attribute> leftKeys = new ArrayList<>();
763+
List<Attribute> rightKeys = new ArrayList<>();
764+
List<Expression> resolvedFilters = new ArrayList<>();
765+
List<Attribute> matchKeys;
766+
if (join.config().joinOnConditions() != null) {
767+
resolvedFilters = resolveJoinFilters(
768+
Predicates.splitAnd(join.config().joinOnConditions()),
769+
join.left().output(),
770+
join.right().output()
771+
);
772+
// build leftKeys and rightKeys using the left side of the resolvedFilters.
773+
for (Expression expression : resolvedFilters) {
774+
if (expression instanceof EsqlBinaryComparison binaryComparison) {
775+
leftKeys.add((Attribute) binaryComparison.left());
776+
rightKeys.add((Attribute) binaryComparison.right());
777+
} else {
778+
throw new EsqlIllegalArgumentException("Unsupported join filter expression: " + expression);
779+
}
780+
}
781+
Set<Attribute> matchKeysSet = new HashSet<>(leftKeys);
782+
matchKeysSet.addAll(rightKeys);
783+
matchKeys = new ArrayList<>(matchKeysSet);
784+
} else {
785+
// resolve the using columns against the left and the right side then assemble the new join config
786+
leftKeys = resolveUsingColumns(cols, join.left().output(), "left");
787+
rightKeys = resolveUsingColumns(cols, join.right().output(), "right");
788+
matchKeys = leftKeys;
737789
}
738-
// resolve the using columns against the left and the right side then assemble the new join config
739-
List<Attribute> leftKeys = resolveUsingColumns(cols, join.left().output(), "left");
740-
List<Attribute> rightKeys = resolveUsingColumns(cols, join.right().output(), "right");
741790

742-
config = new JoinConfig(coreJoin, leftKeys, leftKeys, rightKeys);
743-
join = new LookupJoin(join.source(), join.left(), join.right(), config, join.isRemote());
791+
config = new JoinConfig(coreJoin, matchKeys, leftKeys, rightKeys, Predicates.combineAnd(resolvedFilters));
792+
return new LookupJoin(join.source(), join.left(), join.right(), config, join.isRemote());
744793
} else if (type != JoinTypes.LEFT) {
745794
// everything else is unsupported for now
746795
// LEFT can only happen by being mapped from a USING above. So we need to exclude this as well because this rule can be run
747796
// more than once.
748797
UnresolvedAttribute errorAttribute = new UnresolvedAttribute(join.source(), "unsupported", "Unsupported join type");
749798
// add error message
750-
return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList()));
799+
return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList(), null));
751800
}
801+
752802
return join;
753803
}
754804

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,10 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen
558558
this.source = source;
559559
}
560560

561+
public Page getInputPage() {
562+
return inputPage;
563+
}
564+
561565
@Override
562566
public final String[] indices() {
563567
return new String[] { indexPattern };

0 commit comments

Comments
 (0)