Skip to content

Commit 6d6a6ec

Browse files
ESQL: LOOKUP JOIN with expressions (#134098)
Add support for LOOKUP JOIN with expressions. FROM index1 | LOOKUP JOIN lookup_index on left_field1 > right_field1 AND left_field2 <= right_field2 AND left_field3 == right_field1 --handle name conflict with rename FROM index1 | RENAME id as id_left | LOOKUP JOIN lookup_index on id_left >=id AND left_field2 <= right_field2 Add support for 1 or more join predicate expressions in the ON clause of the LOOKUP JOIN. If there is more than one predicate expression, the predicate expression must be connected with the AND operator. Each predicate is in the form field1 BINARY_OPERATOR field2. One of them should be from the left and the other from the right. BINARY_OPERATOR is one of ==, >,<, >=, <=, !=. The same field can be used multiple times in the join on condition. If the fields in the join on conditions have the same name, they should be renamed so that the fields in the join on condition have unique names. This commit may include content that was generated or assisted by Cursor, Gemini CLI and/or Github Copilot.
1 parent 659b132 commit 6d6a6ec

File tree

59 files changed

+4270
-1734
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+4270
-1734
lines changed

docs/changelog/134098.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 134098
2+
summary: LOOKUP JOIN with expressions
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

docs/reference/query-languages/esql/_snippets/commands/types/lookup-join.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,17 @@
55
| field from the left index | field from the lookup index |
66
| --- | --- |
77
| boolean | boolean |
8-
| byte | half_float, float, double, scaled_float, byte, short, integer, long |
8+
| byte | byte, short, integer, long, half_float, float, double, scaled_float |
99
| date | date |
1010
| date_nanos | date_nanos |
1111
| double | half_float, float, double, scaled_float, byte, short, integer, long |
1212
| float | half_float, float, double, scaled_float, byte, short, integer, long |
1313
| half_float | half_float, float, double, scaled_float, byte, short, integer, long |
14-
| integer | half_float, float, double, scaled_float, byte, short, integer, long |
14+
| integer | byte, short, integer, long, half_float, float, double, scaled_float |
1515
| ip | ip |
1616
| keyword | keyword |
17-
| long | half_float, float, double, scaled_float, byte, short, integer, long |
17+
| long | byte, short, integer, long, half_float, float, double, scaled_float |
1818
| scaled_float | half_float, float, double, scaled_float, byte, short, integer, long |
19-
| short | half_float, float, double, scaled_float, byte, short, integer, long |
19+
| short | byte, short, integer, long, half_float, float, double, scaled_float |
2020
| text | keyword |
2121

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ static TransportVersion def(int id) {
327327
public static final TransportVersion TIMESERIES_DEFAULT_LIMIT = def(9_160_0_00);
328328
public static final TransportVersion INFERENCE_API_OPENAI_HEADERS = def(9_161_0_00);
329329
public static final TransportVersion NEW_SEMANTIC_QUERY_INTERCEPTORS = def(9_162_0_00);
330+
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_EXPRESSION = def(9_163_0_00);
330331

331332
/*
332333
* STOP! READ THIS FIRST! No, really,

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 83 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,7 @@ private Response query(String query, String filterPath) throws IOException {
566566
.setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(6).millis())).build())
567567
.setWarningsHandler(WarningsHandler.PERMISSIVE)
568568
);
569+
logger.info("Running query:" + query);
569570
return runQuery(() -> client().performRequest(request));
570571
}
571572

@@ -735,28 +736,48 @@ private Map<String, Object> fetchMvLongs() throws IOException {
735736
public void testLookupExplosion() throws IOException {
736737
int sensorDataCount = 400;
737738
int lookupEntries = 10000;
738-
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries);
739+
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries, false);
739740
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
740741
}
741742

742743
public void testLookupExplosionManyFields() throws IOException {
743744
int sensorDataCount = 400;
744745
int lookupEntries = 1000;
745746
int joinFieldsCount = 990;
746-
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries);
747+
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries, false);
748+
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
749+
}
750+
751+
public void testLookupExplosionExpression() throws IOException {
752+
int sensorDataCount = 400;
753+
int lookupEntries = 10000;
754+
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries, true);
755+
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
756+
}
757+
758+
public void testLookupExplosionManyFieldsExpression() throws IOException {
759+
int sensorDataCount = 400;
760+
int lookupEntries = 1000;
761+
int joinFieldsCount = 399;// only join on 399 columns due to max expression size of 400
762+
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries, true);
747763
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
748764
}
749765

750766
public void testLookupExplosionManyMatchesManyFields() throws IOException {
751767
// 1500, 10000 is enough locally, but some CI machines need more.
752768
int lookupEntries = 10000;
753-
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries));
769+
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries, false));
754770
}
755771

756772
public void testLookupExplosionManyMatches() throws IOException {
757773
// 1500, 10000 is enough locally, but some CI machines need more.
758774
int lookupEntries = 10000;
759-
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries));
775+
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries, false));
776+
}
777+
778+
public void testLookupExplosionManyMatchesExpression() throws IOException {
779+
int lookupEntries = 10000;
780+
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries, true));
760781
}
761782

762783
public void testLookupExplosionManyMatchesFiltered() throws IOException {
@@ -768,9 +789,21 @@ public void testLookupExplosionManyMatchesFiltered() throws IOException {
768789
int reductionFactor = 1000; // reduce the number of matches by this factor
769790
// lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value
770791
assertTrue(0 == lookupEntries % reductionFactor);
771-
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor);
792+
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor, false);
772793
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor))));
794+
}
773795

796+
public void testLookupExplosionManyMatchesFilteredExpression() throws IOException {
797+
// This test will only work with the expanding join optimization
798+
// that pushes the filter to the right side of the lookup.
799+
// Without the optimization, it will fail with circuit_breaking_exception
800+
int sensorDataCount = 10000;
801+
int lookupEntries = 10000;
802+
int reductionFactor = 1000; // reduce the number of matches by this factor
803+
// lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value
804+
assertTrue(0 == lookupEntries % reductionFactor);
805+
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor, true);
806+
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor))));
774807
}
775808

776809
public void testLookupExplosionNoFetch() throws IOException {
@@ -797,17 +830,33 @@ public void testLookupExplosionBigStringManyMatches() throws IOException {
797830
assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1));
798831
}
799832

800-
private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount, int lookupEntriesToKeep)
801-
throws IOException {
833+
private Map<String, Object> lookupExplosion(
834+
int sensorDataCount,
835+
int lookupEntries,
836+
int joinFieldsCount,
837+
int lookupEntriesToKeep,
838+
boolean expressionBasedJoin
839+
) throws IOException {
802840
try {
803-
lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount);
841+
lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount, expressionBasedJoin);
804842
StringBuilder query = startQuery();
805843
query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON ");
806-
for (int i = 0; i < joinFieldsCount; i++) {
807-
if (i != 0) {
808-
query.append(",");
844+
if (expressionBasedJoin) {
845+
for (int i = 0; i < joinFieldsCount; i++) {
846+
if (i != 0) {
847+
query.append(" AND ");
848+
}
849+
query.append("id_left").append(i);
850+
query.append("==");
851+
query.append("id_right").append(i);
852+
}
853+
} else {
854+
for (int i = 0; i < joinFieldsCount; i++) {
855+
if (i != 0) {
856+
query.append(",");
857+
}
858+
query.append("id").append(i);
809859
}
810-
query.append("id").append(i);
811860
}
812861
if (lookupEntries != lookupEntriesToKeep) {
813862
// add a filter to reduce the number of matches
@@ -826,7 +875,7 @@ private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntri
826875

827876
private Map<String, Object> lookupExplosionNoFetch(int sensorDataCount, int lookupEntries) throws IOException {
828877
try {
829-
lookupExplosionData(sensorDataCount, lookupEntries, 1);
878+
lookupExplosionData(sensorDataCount, lookupEntries, 1, false);
830879
StringBuilder query = startQuery();
831880
query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(*)\"}");
832881
return responseAsMap(query(query.toString(), null));
@@ -836,14 +885,15 @@ private Map<String, Object> lookupExplosionNoFetch(int sensorDataCount, int look
836885
}
837886
}
838887

839-
private void lookupExplosionData(int sensorDataCount, int lookupEntries, int joinFieldCount) throws IOException {
840-
initSensorData(sensorDataCount, 1, joinFieldCount);
841-
initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484", joinFieldCount);
888+
private void lookupExplosionData(int sensorDataCount, int lookupEntries, int joinFieldCount, boolean expressionBasedJoin)
889+
throws IOException {
890+
initSensorData(sensorDataCount, 1, joinFieldCount, expressionBasedJoin);
891+
initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484", joinFieldCount, expressionBasedJoin);
842892
}
843893

844894
private Map<String, Object> lookupExplosionBigString(int sensorDataCount, int lookupEntries) throws IOException {
845895
try {
846-
initSensorData(sensorDataCount, 1, 1);
896+
initSensorData(sensorDataCount, 1, 1, false);
847897
initSensorLookupString(lookupEntries, 1, i -> {
848898
int target = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes());
849899
StringBuilder str = new StringBuilder(Math.toIntExact(ByteSizeValue.ofMb(2).getBytes()));
@@ -876,7 +926,7 @@ public void testEnrichExplosionManyMatches() throws IOException {
876926

877927
private Map<String, Object> enrichExplosion(int sensorDataCount, int lookupEntries) throws IOException {
878928
try {
879-
initSensorData(sensorDataCount, 1, 1);
929+
initSensorData(sensorDataCount, 1, 1, false);
880930
initSensorEnrich(lookupEntries, 1, i -> "73.9857 40.7484");
881931
try {
882932
StringBuilder query = startQuery();
@@ -1050,7 +1100,7 @@ private void initMvLongsIndex(int docs, int fields, int fieldValues) throws IOEx
10501100
initIndex("mv_longs", bulk.toString());
10511101
}
10521102

1053-
private void initSensorData(int docCount, int sensorCount, int joinFieldCount) throws IOException {
1103+
private void initSensorData(int docCount, int sensorCount, int joinFieldCount, boolean expressionBasedJoin) throws IOException {
10541104
logger.info("loading sensor data");
10551105
// We cannot go over 1000 fields, due to failed on parsing mappings on index creation
10561106
// [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
@@ -1061,8 +1111,9 @@ private void initSensorData(int docCount, int sensorCount, int joinFieldCount) t
10611111
"properties": {
10621112
"@timestamp": { "type": "date" },
10631113
""");
1114+
String suffix = expressionBasedJoin ? "_left" : "";
10641115
for (int i = 0; i < joinFieldCount; i++) {
1065-
createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
1116+
createIndexBuilder.append("\"id").append(suffix).append(i).append("\": { \"type\": \"long\" },");
10661117
}
10671118
createIndexBuilder.append("""
10681119
"value": { "type": "double" }
@@ -1083,7 +1134,7 @@ private void initSensorData(int docCount, int sensorCount, int joinFieldCount) t
10831134
{"create":{}}
10841135
{"timestamp":"%s",""", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate)));
10851136
for (int j = 0; j < joinFieldCount; j++) {
1086-
data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, i % sensorCount));
1137+
data.append(String.format(Locale.ROOT, "\"id%s%d\":%d, ", suffix, j, i % sensorCount));
10871138
}
10881139
data.append(String.format(Locale.ROOT, "\"value\": %f}\n", i * 1.1));
10891140
if (i % docsPerBulk == docsPerBulk - 1) {
@@ -1094,8 +1145,13 @@ private void initSensorData(int docCount, int sensorCount, int joinFieldCount) t
10941145
initIndex("sensor_data", data.toString());
10951146
}
10961147

1097-
private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<String> location, int joinFieldsCount)
1098-
throws IOException {
1148+
private void initSensorLookup(
1149+
int lookupEntries,
1150+
int sensorCount,
1151+
IntFunction<String> location,
1152+
int joinFieldsCount,
1153+
boolean expressionBasedJoin
1154+
) throws IOException {
10991155
logger.info("loading sensor lookup");
11001156
// cannot go over 1000 fields, due to failed on parsing mappings on index creation
11011157
// [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
@@ -1105,8 +1161,9 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
11051161
{
11061162
"properties": {
11071163
""");
1164+
String suffix = expressionBasedJoin ? "_right" : "";
11081165
for (int i = 0; i < joinFieldsCount; i++) {
1109-
createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
1166+
createIndexBuilder.append("\"id").append(suffix).append(i).append("\": { \"type\": \"long\" },");
11101167
}
11111168
createIndexBuilder.append("""
11121169
"location": { "type": "geo_point" },
@@ -1127,7 +1184,7 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
11271184
{"create":{}}
11281185
{"""));
11291186
for (int j = 0; j < joinFieldsCount; j++) {
1130-
data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, sensor));
1187+
data.append(String.format(Locale.ROOT, "\"id%s%d\":%d, ", suffix, j, sensor));
11311188
}
11321189
data.append(String.format(Locale.ROOT, """
11331190
"location": "POINT(%s)", "filter_key": %d}\n""", location.apply(sensor), i));
@@ -1165,7 +1222,7 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct
11651222
}
11661223

11671224
private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction<String> location) throws IOException {
1168-
initSensorLookup(lookupEntries, sensorCount, location, 1);
1225+
initSensorLookup(lookupEntries, sensorCount, location, 1, false);
11691226
logger.info("loading sensor enrich");
11701227

11711228
Request create = new Request("PUT", "/_enrich/policy/sensor");

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public final Query getQuery(int position) {
125125
* Returns the query at the given position.
126126
*/
127127
@Nullable
128-
abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);
128+
public abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);
129129

130130
private Query wrapSingleValueQuery(Query query) {
131131
assert onlySingleValueParams != null : "Requested to wrap single value query without single value params";
@@ -155,17 +155,11 @@ private Query wrapSingleValueQuery(Query query) {
155155
}
156156

157157
/**
158-
* Returns a list of term queries for the given field and the input block
159-
* using only the {@link ElementType} of the {@link Block} to determine the
160-
* query.
158+
* Returns a function that reads values from the given block. The function
159+
* takes the offset of the value to read and returns the value as an {@link Object}.
161160
*/
162-
public static QueryList rawTermQueryList(
163-
MappedFieldType field,
164-
SearchExecutionContext searchExecutionContext,
165-
AliasFilter aliasFilter,
166-
Block block
167-
) {
168-
IntFunction<Object> blockToJavaObject = switch (block.elementType()) {
161+
public static IntFunction<Object> createBlockValueReader(Block block) {
162+
return switch (block.elementType()) {
169163
case BOOLEAN -> {
170164
BooleanBlock booleanBlock = (BooleanBlock) block;
171165
yield booleanBlock::getBoolean;
@@ -196,7 +190,20 @@ public static QueryList rawTermQueryList(
196190
case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't read values from [aggregate metric double] block");
197191
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
198192
};
199-
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, blockToJavaObject);
193+
}
194+
195+
/**
196+
* Returns a list of term queries for the given field and the input block
197+
* using only the {@link ElementType} of the {@link Block} to determine the
198+
* query.
199+
*/
200+
public static QueryList rawTermQueryList(
201+
MappedFieldType field,
202+
SearchExecutionContext searchExecutionContext,
203+
AliasFilter aliasFilter,
204+
Block block
205+
) {
206+
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, createBlockValueReader(block));
200207
}
201208

202209
/**
@@ -297,7 +304,7 @@ public TermQueryList onlySingleValues(Warnings warnings, String multiValueWarnin
297304
}
298305

299306
@Override
300-
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
307+
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
301308
return switch (valueCount) {
302309
case 0 -> null;
303310
case 1 -> field.termQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
@@ -360,7 +367,7 @@ public DateNanosQueryList onlySingleValues(Warnings warnings, String multiValueW
360367
}
361368

362369
@Override
363-
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
370+
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
364371
return switch (valueCount) {
365372
case 0 -> null;
366373
case 1 -> dateFieldType.equalityQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
@@ -412,7 +419,7 @@ public GeoShapeQueryList onlySingleValues(Warnings warnings, String multiValueWa
412419
}
413420

414421
@Override
415-
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
422+
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
416423
return switch (valueCount) {
417424
case 0 -> null;
418425
case 1 -> shapeQuery.apply(firstValueIndex);
@@ -453,5 +460,5 @@ private IntFunction<Query> shapeQuery() {
453460
}
454461
}
455462

456-
protected record OnlySingleValueParams(Warnings warnings, String multiValueWarningMessage) {}
463+
public record OnlySingleValueParams(Warnings warnings, String multiValueWarningMessage) {}
457464
}

x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ protected ByteSizeValue enoughMemoryForSimple() {
9797
* asserting both that this throws a {@link CircuitBreakingException} and releases
9898
* all pages.
9999
*/
100-
public final void testSimpleCircuitBreaking() {
100+
public void testSimpleCircuitBreaking() {
101101
/*
102102
* Build the input before building `simple` to handle the rare
103103
* cases where `simple` need some state from the input - mostly

0 commit comments

Comments
 (0)