Commit a995a12
Add support for Lookup Join on Multiple Fields (#131559)
Add support for Lookup Join on Multiple Fields, e.g. `FROM index1 | LOOKUP JOIN lookup_index on field1, field2`.

* Removed some checks to allow lookup join on multiple fields.
* Added a new interface, `LookupEnrichQueryGenerator`, that can be used to get the total number of queries and the query at each position. The rest of the methods from `QueryGenerator` are not needed by `AbstractLookupService`. That allowed the creation of a new class, `ExpressionQueryList implements LookupEnrichQueryGenerator`, which is responsible for creating the AND query across the different fields. We will likely need to enhance it in the future to support expressions that include OR and NOT as well.
* `TransportRequest` is enhanced to now carry `List<MatchConfig> matchFields` instead of `String matchField`. This is how we pass the match fields around now.
* If we are communicating with a cluster that does not support `LookupOnMultipleFields` and the query needs it, we will fail the query. This can happen during a rolling upgrade or with CCS.
1 parent 8cfdb67 commit a995a12
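The pieces described above fit together roughly as follows. This is a stdlib-only sketch, not the actual Elasticsearch code: the names `LookupEnrichQueryGenerator` and `ExpressionQueryList` come from the commit, but the method names are assumed and a "query" is simplified to a predicate over a lookup document, whereas the real implementation builds Lucene queries.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

public class LookupJoinSketch {
    // Sketch of the new interface: AbstractLookupService only needs a query
    // count and a query per position (method names assumed).
    interface LookupEnrichQueryGenerator {
        int getQueryCount();
        Predicate<Map<String, Object>> getQuery(int position);
    }

    // Sketch of ExpressionQueryList: for each input row, AND together one
    // equality check per match field. OR/NOT are future work per the commit.
    static class ExpressionQueryList implements LookupEnrichQueryGenerator {
        private final List<String> matchFields;
        private final List<Map<String, Object>> inputRows;

        ExpressionQueryList(List<String> matchFields, List<Map<String, Object>> inputRows) {
            this.matchFields = matchFields;
            this.inputRows = inputRows;
        }

        @Override
        public int getQueryCount() {
            return inputRows.size();
        }

        @Override
        public Predicate<Map<String, Object>> getQuery(int position) {
            Map<String, Object> row = inputRows.get(position);
            // All match fields must be equal for a lookup document to join.
            return doc -> matchFields.stream().allMatch(f -> Objects.equals(doc.get(f), row.get(f)));
        }
    }

    public static void main(String[] args) {
        var gen = new ExpressionQueryList(
            List.of("field1", "field2"),
            List.of(Map.of("field1", 1, "field2", "a"))
        );
        System.out.println(gen.getQuery(0).test(Map.of("field1", 1, "field2", "a"))); // prints true
        System.out.println(gen.getQuery(0).test(Map.of("field1", 1, "field2", "b"))); // prints false
    }
}
```

The AND combination is why a row only joins when every listed field matches; a single mismatching field is enough to reject the document.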

52 files changed: +1313 −307 lines changed

docs/changelog/131559.yaml

Lines changed: 10 additions & 0 deletions

```diff
@@ -0,0 +1,10 @@
+pr: 131559
+summary: Add support for LOOKUP JOIN on multiple fields
+area: ES|QL
+type: enhancement
+issues: [ ]
+highlight:
+  title: Add support for Lookup Join on Multiple Fields
+  body: "Add support for Lookup Join on Multiple Fields e.g. FROM index1\n| LOOKUP\
+    \ JOIN lookup_index on field1, field2"
+  notable: true
```

docs/reference/query-languages/esql/_snippets/commands/layout/lookup-join.md

Lines changed: 12 additions & 3 deletions

````diff
@@ -17,13 +17,22 @@ FROM <source_index>
 | LOOKUP JOIN <lookup_index> ON <field_name>
 ```

+```esql
+FROM <source_index>
+| LOOKUP JOIN <lookup_index> ON <field_name1>, <field_name2>, <field_name3>
+```
+
 **Parameters**

 `<lookup_index>`
 : The name of the lookup index. This must be a specific index name - wildcards, aliases, and remote cluster references are not supported. Indices used for lookups must be configured with the [`lookup` index mode](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting).

-`<field_name>`
-: The field to join on. This field must exist in both your current query results and in the lookup index. If the field contains multi-valued entries, those entries will not match anything (the added fields will contain `null` for those rows).
+`<field_name>` or `<field_name1>, <field_name2>, <field_name3>`
+: The field(s) to join on. Can be either:
+  * A single field name
+  * A comma-separated list of field names {applies_to}`stack: ga 9.2`
+: These fields must exist in both your current query results and in the lookup index. If the fields contain multi-valued entries, those entries will not match anything (the added fields will contain `null` for those rows).

 **Description**

@@ -32,7 +41,7 @@ results table by finding documents in a lookup index that share the same
 join field value as your result rows.

 For each row in your results table that matches a document in the lookup
-index based on the join field, all fields from the matching document are
+index based on the join fields, all fields from the matching document are
 added as new columns to that row.

 If multiple documents in the lookup index match a single row in your
````
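The matching rule documented above can be illustrated with a small stdlib-only Java sketch. The data model here is hypothetical (plain maps, not ES|QL internals): a source row gains the lookup document's columns only when every join field is equal, multiple matches produce multiple output rows, and a row with no match gets `null` for the added columns.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class LookupJoinSemantics {
    // Hypothetical in-memory model of LOOKUP JOIN ... ON f1, f2.
    static List<Map<String, Object>> lookupJoin(
        List<Map<String, Object>> rows,
        List<Map<String, Object>> lookup,
        List<String> joinFields,
        List<String> addedFields
    ) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            boolean matched = false;
            for (Map<String, Object> doc : lookup) {
                // A document matches only when ALL join fields are equal.
                if (joinFields.stream().allMatch(f -> Objects.equals(row.get(f), doc.get(f)))) {
                    Map<String, Object> joined = new HashMap<>(row);
                    addedFields.forEach(f -> joined.put(f, doc.get(f)));
                    out.add(joined); // multiple matching docs -> multiple rows
                    matched = true;
                }
            }
            if (!matched) {
                Map<String, Object> joined = new HashMap<>(row);
                addedFields.forEach(f -> joined.put(f, null)); // no match: added fields are null
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        var rows = List.of(Map.<String, Object>of("f1", 1, "f2", "a"));
        var lookup = List.of(Map.<String, Object>of("f1", 1, "f2", "a", "color", "red"));
        System.out.println(lookupJoin(rows, lookup, List.of("f1", "f2"), List.of("color")));
    }
}
```

This mirrors the documented behavior, including the `null` columns for unmatched rows; the real engine does the same logically but via per-batch Lucene queries rather than a nested loop.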

docs/reference/query-languages/esql/esql-lookup-join.md

Lines changed: 8 additions & 5 deletions

````diff
@@ -33,11 +33,14 @@ For example, you can use `LOOKUP JOIN` to:
 The `LOOKUP JOIN` command adds fields from the lookup index as new columns to your results table based on matching values in the join field.

 The command requires two parameters:
-- The name of the lookup index (which must have the `lookup` [`index.mode setting`](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting))
-- The name of the field to join on
-
+* The name of the lookup index (which must have the `lookup` [`index.mode setting`](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting))
+* The field(s) to join on. Can be either:
+  * A single field name
+  * A comma-separated list of field names {applies_to}`stack: ga 9.2`
+
 ```esql
-LOOKUP JOIN <lookup_index> ON <field_name>
+LOOKUP JOIN <lookup_index> ON <field_name> # Join on a single field
+LOOKUP JOIN <lookup_index> ON <field_name1>, <field_name2>, <field_name3> # Join on multiple fields
 ```

 :::{image} ../images/esql-lookup-join.png
@@ -200,7 +203,7 @@ The following are the current limitations with `LOOKUP JOIN`:
 * Indices in [`lookup` mode](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting) are always single-sharded.
 * Cross cluster search is unsupported initially. Both source and lookup indices must be local.
 * Currently, only matching on equality is supported.
-* `LOOKUP JOIN` can only use a single match field and a single index. Wildcards are not supported.
+* In Stack versions `9.0-9.1`, `LOOKUP JOIN` can only use a single match field and a single index. Wildcards are not supported.
 * Aliases, datemath, and datastreams are supported, as long as the index pattern matches a single concrete index {applies_to}`stack: ga 9.1.0`.
 * The name of the match field in `LOOKUP JOIN lu_idx ON match_field` must match an existing field in the query. This may require `RENAME`s or `EVAL`s to achieve.
 * The query will circuit break if there are too many matching documents in the lookup index, or if the documents are too large. More precisely, `LOOKUP JOIN` works in batches of, normally, about 10,000 rows; a large amount of heap space is needed if the matching documents from the lookup index for a batch are multiple megabytes or larger. This is roughly the same as for `ENRICH`.
````

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions

```diff
@@ -363,6 +363,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion INDEX_TEMPLATE_TRACKING_INFO = def(9_136_0_00);
     public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
     public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00);
+    public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00);

     /*
      * STOP! READ THIS FIRST! No, really,
```
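The new transport version constant is what lets the coordinating node refuse a multi-field join when a participating node or remote cluster cannot read the new wire format (the rolling-upgrade/CCS case from the commit message). A hedged, stdlib-only sketch of that gate, where the `Version` record, method names, and the `supported` helper are all illustrative stand-ins; only the constant's name and id come from the commit:

```java
public class VersionGateSketch {
    // Hypothetical minimal stand-in for TransportVersion, ordered by integer id.
    record Version(int id) {
        boolean onOrAfter(Version other) {
            return id >= other.id;
        }
    }

    // Id copied from the commit; everything around it is illustrative.
    static final Version ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = new Version(9_139_0_00);

    // A join on more than one field needs every node to understand the new
    // List<MatchConfig> wire format; otherwise the query fails up front.
    static boolean supported(Version remote, int matchFieldCount) {
        return matchFieldCount <= 1 || remote.onOrAfter(ESQL_LOOKUP_JOIN_ON_MANY_FIELDS);
    }

    public static void main(String[] args) {
        System.out.println(supported(new Version(9_138_0_00), 2)); // prints false: old node, multi-field join
        System.out.println(supported(new Version(9_138_0_00), 1)); // prints true: single field still works
    }
}
```

Failing fast at the coordinator is preferable to serializing a request an older node would reject or misparse.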

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 88 additions & 31 deletions

```diff
@@ -13,6 +13,7 @@
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.util.EntityUtils;
 import org.apache.lucene.tests.util.TimeUnits;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
@@ -697,13 +698,26 @@ private Map<String, Object> fetchMvLongs() throws IOException {
     public void testLookupExplosion() throws IOException {
         int sensorDataCount = 400;
         int lookupEntries = 10000;
-        Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries);
+        Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1);
         assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
     }

+    public void testLookupExplosionManyFields() throws IOException {
+        int sensorDataCount = 400;
+        int lookupEntries = 1000;
+        int joinFieldsCount = 990;
+        Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount);
+        assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
+    }
+
+    public void testLookupExplosionManyMatchesManyFields() throws IOException {
+        // 1500, 10000 is enough locally, but some CI machines need more.
+        assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 30));
+    }
+
     public void testLookupExplosionManyMatches() throws IOException {
         // 1500, 10000 is enough locally, but some CI machines need more.
-        assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000));
+        assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 1));
     }

     public void testLookupExplosionNoFetch() throws IOException {
@@ -730,11 +744,18 @@ public void testLookupExplosionBigStringManyMatches() throws IOException {
         assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1));
     }

-    private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries) throws IOException {
+    private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount) throws IOException {
         try {
-            lookupExplosionData(sensorDataCount, lookupEntries);
+            lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount);
             StringBuilder query = startQuery();
-            query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(location)\"}");
+            query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON ");
+            for (int i = 0; i < joinFieldsCount; i++) {
+                if (i != 0) {
+                    query.append(",");
+                }
+                query.append("id").append(i);
+            }
+            query.append(" | STATS COUNT(location)\"}");
             return responseAsMap(query(query.toString(), null));
         } finally {
             deleteIndex("sensor_data");
@@ -744,24 +765,24 @@ private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntri

     private Map<String, Object> lookupExplosionNoFetch(int sensorDataCount, int lookupEntries) throws IOException {
         try {
-            lookupExplosionData(sensorDataCount, lookupEntries);
+            lookupExplosionData(sensorDataCount, lookupEntries, 1);
             StringBuilder query = startQuery();
-            query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(*)\"}");
+            query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(*)\"}");
             return responseAsMap(query(query.toString(), null));
         } finally {
             deleteIndex("sensor_data");
             deleteIndex("sensor_lookup");
         }
     }

-    private void lookupExplosionData(int sensorDataCount, int lookupEntries) throws IOException {
-        initSensorData(sensorDataCount, 1);
-        initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484");
+    private void lookupExplosionData(int sensorDataCount, int lookupEntries, int joinFieldCount) throws IOException {
+        initSensorData(sensorDataCount, 1, joinFieldCount);
+        initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484", joinFieldCount);
     }

     private Map<String, Object> lookupExplosionBigString(int sensorDataCount, int lookupEntries) throws IOException {
         try {
-            initSensorData(sensorDataCount, 1);
+            initSensorData(sensorDataCount, 1, 1);
             initSensorLookupString(lookupEntries, 1, i -> {
                 int target = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes());
                 StringBuilder str = new StringBuilder(Math.toIntExact(ByteSizeValue.ofMb(2).getBytes()));
@@ -772,7 +793,7 @@ private Map<String, Object> lookupExplosionBigString(int sensorDataCount, int lo
                 return str.toString();
             });
             StringBuilder query = startQuery();
-            query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(string)\"}");
+            query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(string)\"}");
             return responseAsMap(query(query.toString(), null));
         } finally {
             deleteIndex("sensor_data");
@@ -794,11 +815,11 @@ public void testEnrichExplosionManyMatches() throws IOException {

     private Map<String, Object> enrichExplosion(int sensorDataCount, int lookupEntries) throws IOException {
         try {
-            initSensorData(sensorDataCount, 1);
+            initSensorData(sensorDataCount, 1, 1);
             initSensorEnrich(lookupEntries, 1, i -> "73.9857 40.7484");
             try {
                 StringBuilder query = startQuery();
-                query.append("FROM sensor_data | ENRICH sensor ON id | STATS COUNT(*)\"}");
+                query.append("FROM sensor_data | ENRICH sensor ON id0 | STATS COUNT(*)\"}");
                 return responseAsMap(query(query.toString(), null));
             } finally {
                 Request delete = new Request("DELETE", "/_enrich/policy/sensor");
@@ -958,25 +979,42 @@ private void initMvLongsIndex(int docs, int fields, int fieldValues) throws IOEx
         initIndex("mv_longs", bulk.toString());
     }

-    private void initSensorData(int docCount, int sensorCount) throws IOException {
+    private void initSensorData(int docCount, int sensorCount, int joinFieldCount) throws IOException {
         logger.info("loading sensor data");
-        createIndex("sensor_data", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
-            {
-                "properties": {
-                    "@timestamp": { "type": "date" },
-                    "id": { "type": "long" },
+        // We cannot go over 1000 fields, due to failed on parsing mappings on index creation
+        // [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
+        assertTrue("Too many columns, it will throw an exception later", joinFieldCount <= 990);
+        StringBuilder createIndexBuilder = new StringBuilder();
+        createIndexBuilder.append("""
+            {
+                "properties": {
+                    "@timestamp": { "type": "date" },
+            """);
+        for (int i = 0; i < joinFieldCount; i++) {
+            createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
+        }
+        createIndexBuilder.append("""
                     "value": { "type": "double" }
                 }
             }""");
+        CreateIndexResponse response = createIndex(
+            "sensor_data",
+            Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(),
+            createIndexBuilder.toString()
+        );
+        assertTrue(response.isAcknowledged());
         int docsPerBulk = 1000;
         long firstDate = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01T00:00:00Z");

         StringBuilder data = new StringBuilder();
         for (int i = 0; i < docCount; i++) {
             data.append(String.format(Locale.ROOT, """
                 {"create":{}}
-                {"timestamp":"%s", "id": %d, "value": %f}
-                """, DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate), i % sensorCount, i * 1.1));
+                {"timestamp":"%s",""", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate)));
+            for (int j = 0; j < joinFieldCount; j++) {
+                data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, i % sensorCount));
+            }
+            data.append(String.format(Locale.ROOT, "\"value\": %f}\n", i * 1.1));
             if (i % docsPerBulk == docsPerBulk - 1) {
                 bulk("sensor_data", data.toString());
                 data.setLength(0);
@@ -985,23 +1023,42 @@ private void initSensorData(int docCount, int sensorCount) throws IOException {
         initIndex("sensor_data", data.toString());
     }

-    private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<String> location) throws IOException {
+    private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<String> location, int joinFieldsCount)
+        throws IOException {
         logger.info("loading sensor lookup");
-        createIndex("sensor_lookup", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
+        // cannot go over 1000 fields, due to failed on parsing mappings on index creation
+        // [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
+        assertTrue("Too many join on fields, it will throw an exception later", joinFieldsCount <= 990);
+        StringBuilder createIndexBuilder = new StringBuilder();
+        createIndexBuilder.append("""
             {
                 "properties": {
-                    "id": { "type": "long" },
+            """);
+        for (int i = 0; i < joinFieldsCount; i++) {
+            createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
+        }
+        createIndexBuilder.append("""
                     "location": { "type": "geo_point" }
                 }
             }""");
+        CreateIndexResponse response = createIndex(
+            "sensor_lookup",
+            Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(),
+            createIndexBuilder.toString()
+        );
+        assertTrue(response.isAcknowledged());
         int docsPerBulk = 1000;
         StringBuilder data = new StringBuilder();
         for (int i = 0; i < lookupEntries; i++) {
             int sensor = i % sensorCount;
             data.append(String.format(Locale.ROOT, """
                 {"create":{}}
-                {"id": %d, "location": "POINT(%s)"}
-                """, sensor, location.apply(sensor)));
+                {"""));
+            for (int j = 0; j < joinFieldsCount; j++) {
+                data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, sensor));
+            }
+            data.append(String.format(Locale.ROOT, """
+                "location": "POINT(%s)"}\n""", location.apply(sensor)));
             if (i % docsPerBulk == docsPerBulk - 1) {
                 bulk("sensor_lookup", data.toString());
                 data.setLength(0);
@@ -1015,7 +1072,7 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct
         createIndex("sensor_lookup", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
             {
                 "properties": {
-                    "id": { "type": "long" },
+                    "id0": { "type": "long" },
                     "string": { "type": "text" }
                 }
             }""");
@@ -1025,7 +1082,7 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct
             int sensor = i % sensorCount;
             data.append(String.format(Locale.ROOT, """
                 {"create":{}}
-                {"id": %d, "string": "%s"}
+                {"id0": %d, "string": "%s"}
                 """, sensor, string.apply(sensor)));
             if (i % docsPerBulk == docsPerBulk - 1) {
                 bulk("sensor_lookup", data.toString());
@@ -1036,15 +1093,15 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct
     }

     private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction<String> location) throws IOException {
-        initSensorLookup(lookupEntries, sensorCount, location);
+        initSensorLookup(lookupEntries, sensorCount, location, 1);
         logger.info("loading sensor enrich");

         Request create = new Request("PUT", "/_enrich/policy/sensor");
         create.setJsonEntity("""
             {
                 "match": {
                     "indices": "sensor_lookup",
-                    "match_field": "id",
+                    "match_field": "id0",
                     "enrich_fields": ["location"]
                 }
             }
```
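The loop the test adds to `lookupExplosion` just string-joins the generated field names `id0…idN-1` into the `ON` clause. As a standalone sketch (the `id` prefix and clause shape are taken from the test; the helper name is hypothetical):

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OnClauseBuilder {
    // Mirrors the loop in lookupExplosion(): produces "id0,id1,...,id{n-1}".
    static String onClause(int joinFieldsCount) {
        return IntStream.range(0, joinFieldsCount)
            .mapToObj(i -> "id" + i)
            .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // prints FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0,id1,id2
        System.out.println("FROM sensor_data | LOOKUP JOIN sensor_lookup ON " + onClause(3));
    }
}
```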

test/framework/src/main/java/org/elasticsearch/test/AbstractWireSerializingTestCase.java

Lines changed: 1 addition & 1 deletion

```diff
@@ -37,7 +37,7 @@ protected Writeable.Writer<T> instanceWriter() {
      * Copy the {@link Writeable} by round tripping it through {@linkplain StreamInput} and {@linkplain StreamOutput}.
      */
     @Override
-    protected final T copyInstance(T instance, TransportVersion version) throws IOException {
+    protected T copyInstance(T instance, TransportVersion version) throws IOException {
         return copyInstance(instance, getNamedWriteableRegistry(), instanceWriter(), instanceReader(), version);
     }
 }
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java

Lines changed: 2 additions & 2 deletions

```diff
@@ -37,7 +37,7 @@
  */
 public final class EnrichQuerySourceOperator extends SourceOperator {
     private final BlockFactory blockFactory;
-    private final QueryList queryList;
+    private final LookupEnrichQueryGenerator queryList;
     private int queryPosition = -1;
     private final ShardContext shardContext;
     private final IndexReader indexReader;
@@ -51,7 +51,7 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
     public EnrichQuerySourceOperator(
         BlockFactory blockFactory,
         int maxPageSize,
-        QueryList queryList,
+        LookupEnrichQueryGenerator queryList,
         ShardContext shardContext,
         Warnings warnings
     ) {
```
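This change works because `EnrichQuerySourceOperator` only ever consumes two operations from its query list: the number of queries and the query at each position, which is exactly the narrowed `LookupEnrichQueryGenerator` contract. A hedged stdlib sketch of that consumption pattern (method names assumed; the real operator executes Lucene queries against the shard and emits pages):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class QueryDriverSketch {
    // Minimal stand-in for the narrowed contract: count plus query-by-position.
    interface Generator<Q> {
        int getQueryCount();
        Q getQuery(int position);
    }

    // The operator walks positions 0..count-1 and executes each query in turn.
    static <Q, R> List<R> drive(Generator<Q> gen, Function<Q, R> execute) {
        List<R> results = new ArrayList<>();
        for (int pos = 0; pos < gen.getQueryCount(); pos++) {
            results.add(execute.apply(gen.getQuery(pos)));
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> queries = List.of("a", "bb", "ccc");
        Generator<String> gen = new Generator<>() {
            public int getQueryCount() { return queries.size(); }
            public String getQuery(int position) { return queries.get(position); }
        };
        System.out.println(drive(gen, String::length)); // prints [1, 2, 3]
    }
}
```

Because the operator depends only on this interface, both the old single-field `QueryList` and the new multi-field `ExpressionQueryList` can feed it without changes.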
