Skip to content

Commit ebfd896

Browse files
WIP POC for expression join
1 parent 242455f commit ebfd896

37 files changed

+881
-99
lines changed

.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/runConfigurations/Debug_Elasticsearch__node_3_.xml

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ static TransportVersion def(int id) {
358358
public static final TransportVersion ALLOCATION_DECISION_NOT_PREFERRED = def(9_145_0_00);
359359
public static final TransportVersion ESQL_QUALIFIERS_IN_ATTRIBUTES = def(9_146_0_00);
360360
public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_147_0_00);
361+
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_EXPRESSION = def(9_148_0_00);
361362

362363
/*
363364
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public final Query getQuery(int position) {
125125
* Returns the query at the given position.
126126
*/
127127
@Nullable
128-
abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);
128+
public abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);
129129

130130
private Query wrapSingleValueQuery(Query query) {
131131
assert onlySingleValueParams != null : "Requested to wrap single value query without single value params";
@@ -159,13 +159,8 @@ private Query wrapSingleValueQuery(Query query) {
159159
* using only the {@link ElementType} of the {@link Block} to determine the
160160
* query.
161161
*/
162-
public static QueryList rawTermQueryList(
163-
MappedFieldType field,
164-
SearchExecutionContext searchExecutionContext,
165-
AliasFilter aliasFilter,
166-
Block block
167-
) {
168-
IntFunction<Object> blockToJavaObject = switch (block.elementType()) {
162+
public static IntFunction<Object> createBlockValueReader(Block block) {
163+
return switch (block.elementType()) {
169164
case BOOLEAN -> {
170165
BooleanBlock booleanBlock = (BooleanBlock) block;
171166
yield booleanBlock::getBoolean;
@@ -196,7 +191,20 @@ public static QueryList rawTermQueryList(
196191
case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't read values from [aggregate metric double] block");
197192
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
198193
};
199-
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, blockToJavaObject);
194+
}
195+
196+
/**
197+
* Returns a list of term queries for the given field and the input block
198+
* using only the {@link ElementType} of the {@link Block} to determine the
199+
* query.
200+
*/
201+
public static QueryList rawTermQueryList(
202+
MappedFieldType field,
203+
SearchExecutionContext searchExecutionContext,
204+
AliasFilter aliasFilter,
205+
Block block
206+
) {
207+
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, createBlockValueReader(block));
200208
}
201209

202210
/**
@@ -297,7 +305,7 @@ public TermQueryList onlySingleValues(Warnings warnings, String multiValueWarnin
297305
}
298306

299307
@Override
300-
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
308+
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
301309
return switch (valueCount) {
302310
case 0 -> null;
303311
case 1 -> field.termQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
@@ -360,7 +368,7 @@ public DateNanosQueryList onlySingleValues(Warnings warnings, String multiValueW
360368
}
361369

362370
@Override
363-
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
371+
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
364372
return switch (valueCount) {
365373
case 0 -> null;
366374
case 1 -> dateFieldType.equalityQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
@@ -412,7 +420,7 @@ public GeoShapeQueryList onlySingleValues(Warnings warnings, String multiValueWa
412420
}
413421

414422
@Override
415-
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
423+
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
416424
return switch (valueCount) {
417425
case 0 -> null;
418426
case 1 -> shapeQuery.apply(firstValueIndex);

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2006,6 +2006,100 @@ language_code_float:integer | language_name:keyword
20062006
127 | max_byte
20072007
;
20082008

2009+
lookupWithExpressionEquals
2010+
required_capability: join_lookup_v12
2011+
required_capability: lookup_join_on_boolean_expression
2012+
2013+
FROM employees
2014+
| WHERE emp_no == 10001
2015+
| LOOKUP JOIN languages_lookup ON languages == language_code
2016+
| KEEP emp_no, languages, language_code, language_name
2017+
;
2018+
2019+
emp_no:integer | languages:integer | language_code:integer | language_name:keyword
2020+
10001 | 2 | 2 | French
2021+
;
2022+
2023+
lookupWithExpressionNotEquals
2024+
required_capability: join_lookup_v12
2025+
required_capability: lookup_join_on_boolean_expression
2026+
2027+
FROM employees
2028+
| WHERE emp_no == 10001
2029+
| LOOKUP JOIN languages_lookup ON languages != language_code
2030+
| KEEP emp_no, languages, language_code, language_name
2031+
| SORT language_code
2032+
;
2033+
2034+
emp_no:integer | languages:integer | language_code:integer | language_name:keyword
2035+
10001 | 2 | 1 | English
2036+
10001 | 2 | 3 | Spanish
2037+
10001 | 2 | 4 | German
2038+
;
2039+
2040+
lookupWithExpressionGreater
2041+
required_capability: join_lookup_v12
2042+
required_capability: lookup_join_on_boolean_expression
2043+
2044+
FROM employees
2045+
| WHERE emp_no == 10001
2046+
| LOOKUP JOIN languages_lookup ON languages > language_code
2047+
| KEEP emp_no, languages, language_code, language_name
2048+
;
2049+
2050+
emp_no:integer | languages:integer | language_code:integer | language_name:keyword
2051+
10001 | 2 | 1 | English
2052+
;
2053+
2054+
lookupWithExpressionGreaterOrEquals
2055+
required_capability: join_lookup_v12
2056+
required_capability: lookup_join_on_boolean_expression
2057+
2058+
FROM employees
2059+
| WHERE emp_no == 10001
2060+
| LOOKUP JOIN languages_lookup ON languages >= language_code
2061+
| KEEP emp_no, languages, language_code, language_name
2062+
| SORT language_code
2063+
;
2064+
2065+
emp_no:integer | languages:integer | language_code:integer | language_name:keyword
2066+
10001 | 2 | 1 | English
2067+
10001 | 2 | 2 | French
2068+
;
2069+
2070+
lookupWithExpressionLess
2071+
required_capability: join_lookup_v12
2072+
required_capability: lookup_join_on_boolean_expression
2073+
2074+
FROM employees
2075+
| WHERE emp_no == 10001
2076+
| LOOKUP JOIN languages_lookup ON languages < language_code
2077+
| KEEP emp_no, languages, language_code, language_name
2078+
| SORT language_code
2079+
;
2080+
2081+
emp_no:integer | languages:integer | language_code:integer | language_name:keyword
2082+
10001 | 2 | 3 | Spanish
2083+
10001 | 2 | 4 | German
2084+
;
2085+
2086+
lookupWithExpressionLessOrEquals
2087+
required_capability: join_lookup_v12
2088+
required_capability: lookup_join_on_boolean_expression
2089+
2090+
FROM employees
2091+
| WHERE emp_no == 10001
2092+
| LOOKUP JOIN languages_lookup ON languages <= language_code
2093+
| KEEP emp_no, languages, language_code, language_name
2094+
| SORT language_code
2095+
;
2096+
2097+
emp_no:integer | languages:integer | language_code:integer | language_name:keyword
2098+
10001 | 2 | 2 | French
2099+
10001 | 2 | 3 | Spanish
2100+
10001 | 2 | 4 | German
2101+
;
2102+
20092103
byteJoinDouble
20102104
required_capability: join_lookup_v12
20112105
required_capability: lookup_join_on_mixed_numeric_fields

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1399,7 +1399,12 @@ public enum Cap {
13991399
/**
14001400
* Allow qualifiers in attribute names.
14011401
*/
1402-
NAME_QUALIFIERS(Build.current().isSnapshot());
1402+
NAME_QUALIFIERS(Build.current().isSnapshot()),
1403+
1404+
/**
1405+
* Allow lookup join on boolean expressions
1406+
*/
1407+
LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION;
14031408

14041409
private final boolean enabled;
14051410

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,10 @@
8383
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToUnsignedLong;
8484
import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
8585
import org.elasticsearch.xpack.esql.expression.function.vector.VectorFunction;
86+
import org.elasticsearch.xpack.esql.expression.predicate.Predicates;
8687
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.DateTimeArithmeticOperation;
8788
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.EsqlArithmeticOperation;
89+
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison;
8890
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.In;
8991
import org.elasticsearch.xpack.esql.index.EsIndex;
9092
import org.elasticsearch.xpack.esql.index.IndexResolution;
@@ -441,7 +443,14 @@ protected LogicalPlan rule(Lookup lookup, AnalyzerContext context) {
441443
// postpone the resolution for ResolveRefs
442444
}
443445

444-
return new Lookup(source, lookup.child(), tableNameExpression, lookup.matchFields(), localRelation);
446+
return new Lookup(
447+
source,
448+
lookup.child(),
449+
tableNameExpression,
450+
lookup.matchFields(),
451+
localRelation,
452+
lookup.getJoinOnConditions()
453+
);
445454
}
446455

447456
private LocalRelation tableMapAsRelation(Source source, Map<String, Column> mapTable) {
@@ -707,15 +716,30 @@ private LogicalPlan resolveLookup(Lookup l, List<Attribute> childrenOutput) {
707716
matchFields.add(matchFieldChildReference);
708717
}
709718
if (modified) {
710-
return new Lookup(l.source(), l.child(), l.tableName(), matchFields, l.localRelation());
719+
return new Lookup(l.source(), l.child(), l.tableName(), matchFields, l.localRelation(), l.getJoinOnConditions());
711720
}
712721
return l;
713722
}
714723

724+
private List<Expression> resolveJoinFilters(List<Expression> filters, List<Attribute> leftOutput, List<Attribute> rightOutput) {
725+
if (filters.isEmpty()) {
726+
return emptyList();
727+
}
728+
List<Attribute> childrenOutput = new ArrayList<>(leftOutput);
729+
childrenOutput.addAll(rightOutput);
730+
731+
List<Expression> resolvedFilters = new ArrayList<>(filters.size());
732+
for (Expression filter : filters) {
733+
resolvedFilters.add(filter.transformUp(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput)));
734+
}
735+
return resolvedFilters;
736+
}
737+
715738
private Join resolveLookupJoin(LookupJoin join) {
716739
JoinConfig config = join.config();
717740
// for now, support only (LEFT) USING clauses
718741
JoinType type = config.type();
742+
719743
// rewrite the join into an equi-join between the field with the same name between left and right
720744
if (type instanceof UsingJoinType using) {
721745
List<Attribute> cols = using.columns();
@@ -733,22 +757,48 @@ private Join resolveLookupJoin(LookupJoin join) {
733757
name,
734758
"Only LEFT join is supported with USING"
735759
);
736-
return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList()));
760+
return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList(), null));
761+
}
762+
List<Attribute> leftKeys = new ArrayList<>();
763+
List<Attribute> rightKeys = new ArrayList<>();
764+
List<Expression> resolvedFilters = new ArrayList<>();
765+
List<Attribute> matchKeys;
766+
if (join.config().joinOnConditions() != null) {
767+
resolvedFilters = resolveJoinFilters(
768+
Predicates.splitAnd(join.config().joinOnConditions()),
769+
join.left().output(),
770+
join.right().output()
771+
);
772+
// build leftKeys and rightKeys using the left side of the resolvedFilters.
773+
for (Expression expression : resolvedFilters) {
774+
if (expression instanceof EsqlBinaryComparison binaryComparison) {
775+
leftKeys.add((Attribute) binaryComparison.left());
776+
rightKeys.add((Attribute) binaryComparison.right());
777+
} else {
778+
throw new EsqlIllegalArgumentException("Unsupported join filter expression: " + expression);
779+
}
780+
}
781+
Set<Attribute> matchKeysSet = new HashSet<>(leftKeys);
782+
matchKeysSet.addAll(rightKeys);
783+
matchKeys = new ArrayList<>(matchKeysSet);
784+
} else {
785+
// resolve the using columns against the left and the right side then assemble the new join config
786+
leftKeys = resolveUsingColumns(cols, join.left().output(), "left");
787+
rightKeys = resolveUsingColumns(cols, join.right().output(), "right");
788+
matchKeys = leftKeys;
737789
}
738-
// resolve the using columns against the left and the right side then assemble the new join config
739-
List<Attribute> leftKeys = resolveUsingColumns(cols, join.left().output(), "left");
740-
List<Attribute> rightKeys = resolveUsingColumns(cols, join.right().output(), "right");
741790

742-
config = new JoinConfig(coreJoin, leftKeys, leftKeys, rightKeys);
743-
join = new LookupJoin(join.source(), join.left(), join.right(), config, join.isRemote());
791+
config = new JoinConfig(coreJoin, matchKeys, leftKeys, rightKeys, Predicates.combineAnd(resolvedFilters));
792+
return new LookupJoin(join.source(), join.left(), join.right(), config, join.isRemote());
744793
} else if (type != JoinTypes.LEFT) {
745794
// everything else is unsupported for now
746795
// LEFT can only happen by being mapped from a USING above. So we need to exclude this as well because this rule can be run
747796
// more than once.
748797
UnresolvedAttribute errorAttribute = new UnresolvedAttribute(join.source(), "unsupported", "Unsupported join type");
749798
// add error message
750-
return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList()));
799+
return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList(), null));
751800
}
801+
752802
return join;
753803
}
754804

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,10 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen
558558
this.source = source;
559559
}
560560

561+
public Page getInputPage() {
562+
return inputPage;
563+
}
564+
561565
@Override
562566
public final String[] indices() {
563567
return new String[] { indexPattern };

0 commit comments

Comments
 (0)