Merged

45 commits
b1ee80e
Lookup Join on Multiple Columns POC WIP
julian-elastic Jul 18, 2025
2c90817
Update docs/changelog/131559.yaml
julian-elastic Jul 18, 2025
4db37fd
Looking join on multiple fields WIP
julian-elastic Jul 21, 2025
ef894f3
Merge branch 'main' into lookupJoin
julian-elastic Jul 21, 2025
e367e8c
Fix more UTs
julian-elastic Jul 21, 2025
744479f
Bugfixes
julian-elastic Jul 21, 2025
c17c993
Bugfixes
julian-elastic Jul 21, 2025
d9462cc
Fix serialization error
julian-elastic Jul 21, 2025
a17fa88
Fix UT error
julian-elastic Jul 22, 2025
3fd48e4
Add more test datasets
julian-elastic Jul 22, 2025
64a07c7
Add UTs for join on 2,3,4 columns
julian-elastic Jul 22, 2025
ea171b1
Merge branch 'main' into lookupJoin
julian-elastic Jul 22, 2025
bae7007
Add handling for remote not supporting LOOKUP JOIN on multiple fields
julian-elastic Jul 23, 2025
6bd2937
Merge branch 'main' into lookupJoin
julian-elastic Jul 23, 2025
71adaa8
Change documentation
julian-elastic Jul 23, 2025
43aa7e1
Fix docs
julian-elastic Jul 24, 2025
7e0d8d7
Add more UTs
julian-elastic Jul 25, 2025
b6c615c
Merge branch 'main' into lookupJoin
julian-elastic Jul 25, 2025
5d9f68f
Address code review feedback
julian-elastic Jul 29, 2025
fc1c63b
Merge branch 'main' into lookupJoin
julian-elastic Jul 29, 2025
c179cea
Add Generative tests for Lookup Join On Multiple Columns
julian-elastic Jul 29, 2025
be2ce94
Merge branch 'main' into lookupJoin
julian-elastic Jul 29, 2025
59c16d9
Remove debugging code
julian-elastic Jul 29, 2025
dd52c02
Address a rare issue in Generative tests
julian-elastic Jul 30, 2025
8b2594b
Address docs issues
julian-elastic Jul 30, 2025
606c099
Merge branch 'main' into lookupJoin
julian-elastic Jul 30, 2025
e585342
Mode docs changes
julian-elastic Jul 30, 2025
72c3ad7
Merge branch 'main' into lookupJoin
julian-elastic Jul 30, 2025
f742160
Address code review feedback
julian-elastic Jul 30, 2025
ed6946b
Enhance LookupFromIndexIT
julian-elastic Jul 30, 2025
806933f
Fix failing UT
julian-elastic Jul 31, 2025
62956af
Merge branch 'main' into lookupJoin
julian-elastic Jul 31, 2025
326cb82
Address more code review comments
julian-elastic Jul 31, 2025
1dbe524
Address more code review comments, part 2
julian-elastic Jul 31, 2025
00b41ed
MatchConfig refactoring and add serialization test
julian-elastic Jul 31, 2025
a036aa6
bugfix
julian-elastic Jul 31, 2025
28d0c7c
Merge branch 'main' into lookupJoin
julian-elastic Jul 31, 2025
ce73957
Add HeapAttackIT cases with join on multiple fields
julian-elastic Aug 1, 2025
cfdb440
Merge branch 'main' into lookupJoin
julian-elastic Aug 1, 2025
96a6891
bugfix
julian-elastic Aug 1, 2025
d90799d
Update docs/changelog/131559.yaml
julian-elastic Aug 12, 2025
ec7f10a
Merge branch 'main' into lookupJoin
julian-elastic Aug 12, 2025
1cf34bc
Address code review comments
julian-elastic Aug 12, 2025
6acb8ef
Merge branch 'main' into lookupJoin
julian-elastic Aug 13, 2025
1102c1b
fix issue with docs
julian-elastic Aug 13, 2025
Conversations
10 changes: 10 additions & 0 deletions docs/changelog/131559.yaml
Contributor
We may want to declare this change as notable. I'm not sure what the bar is for that, @leemthompo?

Contributor @leemthompo Jul 31, 2025

If you and @tylerperk agree sounds like this passes the notable bar 😄 👍

Contributor Author

@leemthompo How do I declare this change as notable? Can you point me to an example? Or do you add it to some other release notes list after it is merged?

Contributor

@alex-spies by notable you mean adding the release highlight label I guess?

Contributor

@leemthompo I saw that we flag important release notes as notable, like here. Is this normally added via the release highlight label?

Both seem to make sense, so I'll go and mark this as release highlight and see what the bot does :)

Contributor @leemthompo Aug 12, 2025

> flag important release notes as notable

TIL 😄

@@ -0,0 +1,10 @@
pr: 131559
summary: Add support for LOOKUP JOIN on multiple fields
area: ES|QL
type: enhancement
issues: [ ]
highlight:
title: Add support for Lookup Join on Multiple Fields
body: "Add support for Lookup Join on Multiple Fields e.g. FROM index1\n| LOOKUP\
\ JOIN lookup_index on field1, field2"
notable: true
@@ -17,13 +17,22 @@ FROM <source_index>
| LOOKUP JOIN <lookup_index> ON <field_name>
```

```esql
FROM <source_index>
| LOOKUP JOIN <lookup_index> ON <field_name1>, <field_name2>, <field_name3>
```

**Parameters**

`<lookup_index>`
: The name of the lookup index. This must be a specific index name - wildcards, aliases, and remote cluster references are not supported. Indices used for lookups must be configured with the [`lookup` index mode](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting).

`<field_name>`
: The field to join on. This field must exist in both your current query results and in the lookup index. If the field contains multi-valued entries, those entries will not match anything (the added fields will contain `null` for those rows).
`<field_name>` or `<field_name1>, <field_name2>, <field_name3>`
: The field(s) to join on. Can be either:
* A single field name
* A comma-separated list of field names {applies_to}`stack: ga 9.2`
: These fields must exist in both your current query results and in the lookup index. If the fields contain multi-valued entries, those entries will not match anything (the added fields will contain `null` for those rows).


**Description**

@@ -32,7 +41,7 @@ results table by finding documents in a lookup index that share the same
join field value as your result rows.

For each row in your results table that matches a document in the lookup
index based on the join field, all fields from the matching document are
index based on the join fields, all fields from the matching document are
added as new columns to that row.
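
For instance, with hypothetical index and field names, a join on two fields looks like:

```esql
FROM network_events
| LOOKUP JOIN hosts_lookup ON host_id, datacenter
```

Here `hosts_lookup` must be a `lookup`-mode index, and both `host_id` and `datacenter` must exist in the query results and in the lookup index.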

If multiple documents in the lookup index match a single row in your
13 changes: 8 additions & 5 deletions docs/reference/query-languages/esql/esql-lookup-join.md
@@ -33,11 +33,14 @@ For example, you can use `LOOKUP JOIN` to:
The `LOOKUP JOIN` command adds fields from the lookup index as new columns to your results table based on matching values in the join field.

The command requires two parameters:
- The name of the lookup index (which must have the `lookup` [`index.mode setting`](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting))
- The name of the field to join on

* The name of the lookup index (which must have the `lookup` [`index.mode setting`](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting))
* The field(s) to join on. Can be either:
* A single field name
* A comma-separated list of field names {applies_to}`stack: ga 9.2`

```esql
LOOKUP JOIN <lookup_index> ON <field_name>
LOOKUP JOIN <lookup_index> ON <field_name> # Join on a single field
LOOKUP JOIN <lookup_index> ON <field_name1>, <field_name2>, <field_name3> # Join on multiple fields
```

:::{image} ../images/esql-lookup-join.png
@@ -200,7 +203,7 @@ The following are the current limitations with `LOOKUP JOIN`:
* Indices in [`lookup` mode](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting) are always single-sharded.
* Cross cluster search is unsupported initially. Both source and lookup indices must be local.
* Currently, only matching on equality is supported.
* `LOOKUP JOIN` can only use a single match field and a single index. Wildcards are not supported.
* In Stack versions `9.0-9.1`, `LOOKUP JOIN` can only use a single match field and a single index. Wildcards are not supported.
* Aliases, datemath, and datastreams are supported, as long as the index pattern matches a single concrete index {applies_to}`stack: ga 9.1.0`.
* The name of the match field in `LOOKUP JOIN lu_idx ON match_field` must match an existing field in the query. This may require `RENAME`s or `EVAL`s to achieve.
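
For example (hypothetical field names), if the query produces a column named `host` but the lookup index's match field is `host_name`, a `RENAME` aligns them first:

```esql
FROM app_logs
| RENAME host AS host_name
| LOOKUP JOIN hosts_lookup ON host_name
```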
* The query will circuit break if there are too many matching documents in the lookup index, or if the documents are too large. More precisely, `LOOKUP JOIN` works in batches of, normally, about 10,000 rows; a large amount of heap space is needed if the matching documents from the lookup index for a batch are multiple megabytes or larger. This is roughly the same as for `ENRICH`.
@@ -363,6 +363,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_TEMPLATE_TRACKING_INFO = def(9_136_0_00);
public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00);

/*
* STOP! READ THIS FIRST! No, really,
@@ -13,6 +13,7 @@
import org.apache.http.client.config.RequestConfig;
import org.apache.http.util.EntityUtils;
import org.apache.lucene.tests.util.TimeUnits;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
@@ -697,13 +698,26 @@ private Map<String, Object> fetchMvLongs() throws IOException {
public void testLookupExplosion() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 10000;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyFields() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 1000;
int joinFieldsCount = 990;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyMatchesManyFields() throws IOException {
// 1500, 10000 is enough locally, but some CI machines need more.
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 30));
}

public void testLookupExplosionManyMatches() throws IOException {
// 1500, 10000 is enough locally, but some CI machines need more.
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000));
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 1));
}

public void testLookupExplosionNoFetch() throws IOException {
@@ -730,11 +744,18 @@ public void testLookupExplosionBigStringManyMatches() throws IOException {
assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1));
}

private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries) throws IOException {
private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount) throws IOException {
try {
lookupExplosionData(sensorDataCount, lookupEntries);
lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount);
StringBuilder query = startQuery();
query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(location)\"}");
query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON ");
for (int i = 0; i < joinFieldsCount; i++) {
if (i != 0) {
query.append(",");
}
query.append("id").append(i);
}
query.append(" | STATS COUNT(location)\"}");
return responseAsMap(query(query.toString(), null));
} finally {
deleteIndex("sensor_data");
@@ -744,24 +765,24 @@ private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntri

private Map<String, Object> lookupExplosionNoFetch(int sensorDataCount, int lookupEntries) throws IOException {
try {
lookupExplosionData(sensorDataCount, lookupEntries);
lookupExplosionData(sensorDataCount, lookupEntries, 1);
StringBuilder query = startQuery();
query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(*)\"}");
query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(*)\"}");
return responseAsMap(query(query.toString(), null));
} finally {
deleteIndex("sensor_data");
deleteIndex("sensor_lookup");
}
}

private void lookupExplosionData(int sensorDataCount, int lookupEntries) throws IOException {
initSensorData(sensorDataCount, 1);
initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484");
private void lookupExplosionData(int sensorDataCount, int lookupEntries, int joinFieldCount) throws IOException {
initSensorData(sensorDataCount, 1, joinFieldCount);
initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484", joinFieldCount);
}

private Map<String, Object> lookupExplosionBigString(int sensorDataCount, int lookupEntries) throws IOException {
try {
initSensorData(sensorDataCount, 1);
initSensorData(sensorDataCount, 1, 1);
initSensorLookupString(lookupEntries, 1, i -> {
int target = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes());
StringBuilder str = new StringBuilder(Math.toIntExact(ByteSizeValue.ofMb(2).getBytes()));
@@ -772,7 +793,7 @@ private Map<String, Object> lookupExplosionBigString(int sensorDataCount, int lo
return str.toString();
});
StringBuilder query = startQuery();
query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(string)\"}");
query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(string)\"}");
return responseAsMap(query(query.toString(), null));
} finally {
deleteIndex("sensor_data");
Expand All @@ -794,11 +815,11 @@ public void testEnrichExplosionManyMatches() throws IOException {

private Map<String, Object> enrichExplosion(int sensorDataCount, int lookupEntries) throws IOException {
try {
initSensorData(sensorDataCount, 1);
initSensorData(sensorDataCount, 1, 1);
initSensorEnrich(lookupEntries, 1, i -> "73.9857 40.7484");
try {
StringBuilder query = startQuery();
query.append("FROM sensor_data | ENRICH sensor ON id | STATS COUNT(*)\"}");
query.append("FROM sensor_data | ENRICH sensor ON id0 | STATS COUNT(*)\"}");
return responseAsMap(query(query.toString(), null));
} finally {
Request delete = new Request("DELETE", "/_enrich/policy/sensor");
@@ -958,25 +979,42 @@ private void initMvLongsIndex(int docs, int fields, int fieldValues) throws IOEx
initIndex("mv_longs", bulk.toString());
}

private void initSensorData(int docCount, int sensorCount) throws IOException {
private void initSensorData(int docCount, int sensorCount, int joinFieldCount) throws IOException {
logger.info("loading sensor data");
createIndex("sensor_data", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
{
"properties": {
"@timestamp": { "type": "date" },
"id": { "type": "long" },
// We cannot go over 1000 fields, or parsing the mappings fails on index creation:
// [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
assertTrue("Too many columns, it will throw an exception later", joinFieldCount <= 990);
StringBuilder createIndexBuilder = new StringBuilder();
createIndexBuilder.append("""
{
"properties": {
"@timestamp": { "type": "date" },
""");
for (int i = 0; i < joinFieldCount; i++) {
createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
}
createIndexBuilder.append("""
"value": { "type": "double" }
}
}""");
CreateIndexResponse response = createIndex(
"sensor_data",
Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(),
createIndexBuilder.toString()
);
assertTrue(response.isAcknowledged());
int docsPerBulk = 1000;
long firstDate = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01T00:00:00Z");

StringBuilder data = new StringBuilder();
for (int i = 0; i < docCount; i++) {
data.append(String.format(Locale.ROOT, """
{"create":{}}
{"timestamp":"%s", "id": %d, "value": %f}
""", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate), i % sensorCount, i * 1.1));
{"timestamp":"%s",""", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate)));
for (int j = 0; j < joinFieldCount; j++) {
data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, i % sensorCount));
}
data.append(String.format(Locale.ROOT, "\"value\": %f}\n", i * 1.1));
if (i % docsPerBulk == docsPerBulk - 1) {
bulk("sensor_data", data.toString());
data.setLength(0);
@@ -985,23 +1023,42 @@ private void initSensorData(int docCount, int sensorCount) throws IOException {
initIndex("sensor_data", data.toString());
}

private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<String> location) throws IOException {
private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<String> location, int joinFieldsCount)
throws IOException {
logger.info("loading sensor lookup");
createIndex("sensor_lookup", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
// We cannot go over 1000 fields, or parsing the mappings fails on index creation:
// [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
assertTrue("Too many join on fields, it will throw an exception later", joinFieldsCount <= 990);
StringBuilder createIndexBuilder = new StringBuilder();
createIndexBuilder.append("""
{
"properties": {
"id": { "type": "long" },
""");
for (int i = 0; i < joinFieldsCount; i++) {
createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
}
createIndexBuilder.append("""
"location": { "type": "geo_point" }
}
}""");
CreateIndexResponse response = createIndex(
"sensor_lookup",
Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(),
createIndexBuilder.toString()
);
assertTrue(response.isAcknowledged());
int docsPerBulk = 1000;
StringBuilder data = new StringBuilder();
for (int i = 0; i < lookupEntries; i++) {
int sensor = i % sensorCount;
data.append(String.format(Locale.ROOT, """
{"create":{}}
{"id": %d, "location": "POINT(%s)"}
""", sensor, location.apply(sensor)));
{"""));
for (int j = 0; j < joinFieldsCount; j++) {
data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, sensor));
}
data.append(String.format(Locale.ROOT, """
"location": "POINT(%s)"}\n""", location.apply(sensor)));
if (i % docsPerBulk == docsPerBulk - 1) {
bulk("sensor_lookup", data.toString());
data.setLength(0);
@@ -1015,7 +1072,7 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct
createIndex("sensor_lookup", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
{
"properties": {
"id": { "type": "long" },
"id0": { "type": "long" },
"string": { "type": "text" }
}
}""");
@@ -1025,7 +1082,7 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct
int sensor = i % sensorCount;
data.append(String.format(Locale.ROOT, """
{"create":{}}
{"id": %d, "string": "%s"}
{"id0": %d, "string": "%s"}
""", sensor, string.apply(sensor)));
if (i % docsPerBulk == docsPerBulk - 1) {
bulk("sensor_lookup", data.toString());
@@ -1036,15 +1093,15 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct
}

private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction<String> location) throws IOException {
initSensorLookup(lookupEntries, sensorCount, location);
initSensorLookup(lookupEntries, sensorCount, location, 1);
logger.info("loading sensor enrich");

Request create = new Request("PUT", "/_enrich/policy/sensor");
create.setJsonEntity("""
{
"match": {
"indices": "sensor_lookup",
"match_field": "id",
"match_field": "id0",
"enrich_fields": ["location"]
}
}
@@ -37,7 +37,7 @@ protected Writeable.Writer<T> instanceWriter() {
* Copy the {@link Writeable} by round tripping it through {@linkplain StreamInput} and {@linkplain StreamOutput}.
*/
@Override
protected final T copyInstance(T instance, TransportVersion version) throws IOException {
protected T copyInstance(T instance, TransportVersion version) throws IOException {
return copyInstance(instance, getNamedWriteableRegistry(), instanceWriter(), instanceReader(), version);
}
}
@@ -37,7 +37,7 @@
*/
public final class EnrichQuerySourceOperator extends SourceOperator {
private final BlockFactory blockFactory;
private final QueryList queryList;
private final LookupEnrichQueryGenerator queryList;
private int queryPosition = -1;
private final ShardContext shardContext;
private final IndexReader indexReader;
@@ -51,7 +51,7 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
public EnrichQuerySourceOperator(
BlockFactory blockFactory,
int maxPageSize,
QueryList queryList,
LookupEnrichQueryGenerator queryList,
ShardContext shardContext,
Warnings warnings
) {