diff --git a/docs/changelog/131559.yaml b/docs/changelog/131559.yaml new file mode 100644 index 0000000000000..976781b5e4341 --- /dev/null +++ b/docs/changelog/131559.yaml @@ -0,0 +1,10 @@ +pr: 131559 +summary: Add support for LOOKUP JOIN on multiple fields +area: ES|QL +type: enhancement +issues: [ ] +highlight: + title: Add support for Lookup Join on Multiple Fields + body: "Add support for Lookup Join on Multiple Fields e.g. FROM index1\n| LOOKUP\ + \ JOIN lookup_index on field1, field2" + notable: true diff --git a/docs/reference/query-languages/esql/_snippets/commands/layout/lookup-join.md b/docs/reference/query-languages/esql/_snippets/commands/layout/lookup-join.md index 7b4f2d794ac22..95b0bbbae84de 100644 --- a/docs/reference/query-languages/esql/_snippets/commands/layout/lookup-join.md +++ b/docs/reference/query-languages/esql/_snippets/commands/layout/lookup-join.md @@ -17,13 +17,22 @@ FROM | LOOKUP JOIN ON ``` +```esql +FROM +| LOOKUP JOIN ON , , +``` + **Parameters** `` : The name of the lookup index. This must be a specific index name - wildcards, aliases, and remote cluster references are not supported. Indices used for lookups must be configured with the [`lookup` index mode](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting). -`` -: The field to join on. This field must exist in both your current query results and in the lookup index. If the field contains multi-valued entries, those entries will not match anything (the added fields will contain `null` for those rows). +`` or `, , ` +: The field(s) to join on. Can be either: + * A single field name + * A comma-separated list of field names {applies_to}`stack: ga 9.2` +: These fields must exist in both your current query results and in the lookup index. If the fields contain multi-valued entries, those entries will not match anything (the added fields will contain `null` for those rows). 
+ **Description** @@ -32,7 +41,7 @@ results table by finding documents in a lookup index that share the same join field value as your result rows. For each row in your results table that matches a document in the lookup -index based on the join field, all fields from the matching document are +index based on the join fields, all fields from the matching document are added as new columns to that row. If multiple documents in the lookup index match a single row in your diff --git a/docs/reference/query-languages/esql/esql-lookup-join.md b/docs/reference/query-languages/esql/esql-lookup-join.md index 826de488e5897..d8df215576bac 100644 --- a/docs/reference/query-languages/esql/esql-lookup-join.md +++ b/docs/reference/query-languages/esql/esql-lookup-join.md @@ -33,11 +33,14 @@ For example, you can use `LOOKUP JOIN` to: The `LOOKUP JOIN` command adds fields from the lookup index as new columns to your results table based on matching values in the join field. The command requires two parameters: -- The name of the lookup index (which must have the `lookup` [`index.mode setting`](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting)) -- The name of the field to join on - +* The name of the lookup index (which must have the `lookup` [`index.mode setting`](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting)) +* The field(s) to join on. Can be either: + * A single field name + * A comma-separated list of field names {applies_to}`stack: ga 9.2` + ```esql -LOOKUP JOIN ON +LOOKUP JOIN ON # Join on a single field +LOOKUP JOIN ON , , # Join on multiple fields ``` :::{image} ../images/esql-lookup-join.png @@ -200,7 +203,7 @@ The following are the current limitations with `LOOKUP JOIN`: * Indices in [`lookup` mode](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting) are always single-sharded. * Cross cluster search is unsupported initially. Both source and lookup indices must be local. 
* Currently, only matching on equality is supported. -* `LOOKUP JOIN` can only use a single match field and a single index. Wildcards are not supported. +* In Stack versions `9.0-9.1`, `LOOKUP JOIN` can only use a single match field and a single index. Wildcards are not supported. * Aliases, datemath, and datastreams are supported, as long as the index pattern matches a single concrete index {applies_to}`stack: ga 9.1.0`. * The name of the match field in `LOOKUP JOIN lu_idx ON match_field` must match an existing field in the query. This may require `RENAME`s or `EVAL`s to achieve. * The query will circuit break if there are too many matching documents in the lookup index, or if the documents are too large. More precisely, `LOOKUP JOIN` works in batches of, normally, about 10,000 rows; a large amount of heap space is needed if the matching documents from the lookup index for a batch are multiple megabytes or larger. This is roughly the same as for `ENRICH`. diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index f09633ac24629..baf301147c5fd 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -363,6 +363,7 @@ static TransportVersion def(int id) { public static final TransportVersion INDEX_TEMPLATE_TRACKING_INFO = def(9_136_0_00); public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00); public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00); + public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00); /* * STOP! READ THIS FIRST! 
No, really, diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index 82419d03d4259..b463202221dc1 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -13,6 +13,7 @@ import org.apache.http.client.config.RequestConfig; import org.apache.http.util.EntityUtils; import org.apache.lucene.tests.util.TimeUnits; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; @@ -697,13 +698,26 @@ private Map fetchMvLongs() throws IOException { public void testLookupExplosion() throws IOException { int sensorDataCount = 400; int lookupEntries = 10000; - Map map = lookupExplosion(sensorDataCount, lookupEntries); + Map map = lookupExplosion(sensorDataCount, lookupEntries, 1); assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); } + public void testLookupExplosionManyFields() throws IOException { + int sensorDataCount = 400; + int lookupEntries = 1000; + int joinFieldsCount = 990; + Map map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount); + assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); + } + + public void testLookupExplosionManyMatchesManyFields() throws IOException { + // 1500, 10000 is enough locally, but some CI machines need more. 
+ assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 30)); + } + public void testLookupExplosionManyMatches() throws IOException { // 1500, 10000 is enough locally, but some CI machines need more. - assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000)); + assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 1)); } public void testLookupExplosionNoFetch() throws IOException { @@ -730,11 +744,18 @@ public void testLookupExplosionBigStringManyMatches() throws IOException { assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1)); } - private Map lookupExplosion(int sensorDataCount, int lookupEntries) throws IOException { + private Map lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount) throws IOException { try { - lookupExplosionData(sensorDataCount, lookupEntries); + lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount); StringBuilder query = startQuery(); - query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(location)\"}"); + query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON "); + for (int i = 0; i < joinFieldsCount; i++) { + if (i != 0) { + query.append(","); + } + query.append("id").append(i); + } + query.append(" | STATS COUNT(location)\"}"); return responseAsMap(query(query.toString(), null)); } finally { deleteIndex("sensor_data"); @@ -744,9 +765,9 @@ private Map lookupExplosion(int sensorDataCount, int lookupEntri private Map lookupExplosionNoFetch(int sensorDataCount, int lookupEntries) throws IOException { try { - lookupExplosionData(sensorDataCount, lookupEntries); + lookupExplosionData(sensorDataCount, lookupEntries, 1); StringBuilder query = startQuery(); - query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(*)\"}"); + query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(*)\"}"); return responseAsMap(query(query.toString(), null)); } finally { 
deleteIndex("sensor_data"); @@ -754,14 +775,14 @@ private Map lookupExplosionNoFetch(int sensorDataCount, int look } } - private void lookupExplosionData(int sensorDataCount, int lookupEntries) throws IOException { - initSensorData(sensorDataCount, 1); - initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484"); + private void lookupExplosionData(int sensorDataCount, int lookupEntries, int joinFieldCount) throws IOException { + initSensorData(sensorDataCount, 1, joinFieldCount); + initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484", joinFieldCount); } private Map lookupExplosionBigString(int sensorDataCount, int lookupEntries) throws IOException { try { - initSensorData(sensorDataCount, 1); + initSensorData(sensorDataCount, 1, 1); initSensorLookupString(lookupEntries, 1, i -> { int target = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes()); StringBuilder str = new StringBuilder(Math.toIntExact(ByteSizeValue.ofMb(2).getBytes())); @@ -772,7 +793,7 @@ private Map lookupExplosionBigString(int sensorDataCount, int lo return str.toString(); }); StringBuilder query = startQuery(); - query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(string)\"}"); + query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(string)\"}"); return responseAsMap(query(query.toString(), null)); } finally { deleteIndex("sensor_data"); @@ -794,11 +815,11 @@ public void testEnrichExplosionManyMatches() throws IOException { private Map enrichExplosion(int sensorDataCount, int lookupEntries) throws IOException { try { - initSensorData(sensorDataCount, 1); + initSensorData(sensorDataCount, 1, 1); initSensorEnrich(lookupEntries, 1, i -> "73.9857 40.7484"); try { StringBuilder query = startQuery(); - query.append("FROM sensor_data | ENRICH sensor ON id | STATS COUNT(*)\"}"); + query.append("FROM sensor_data | ENRICH sensor ON id0 | STATS COUNT(*)\"}"); return responseAsMap(query(query.toString(), null)); } finally { Request delete = new 
Request("DELETE", "/_enrich/policy/sensor"); @@ -958,16 +979,30 @@ private void initMvLongsIndex(int docs, int fields, int fieldValues) throws IOEx initIndex("mv_longs", bulk.toString()); } - private void initSensorData(int docCount, int sensorCount) throws IOException { + private void initSensorData(int docCount, int sensorCount, int joinFieldCount) throws IOException { logger.info("loading sensor data"); - createIndex("sensor_data", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """ - { - "properties": { - "@timestamp": { "type": "date" }, - "id": { "type": "long" }, + // We cannot go over 1000 fields, due to failed on parsing mappings on index creation + // [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded + assertTrue("Too many columns, it will throw an exception later", joinFieldCount <= 990); + StringBuilder createIndexBuilder = new StringBuilder(); + createIndexBuilder.append(""" + { + "properties": { + "@timestamp": { "type": "date" }, + """); + for (int i = 0; i < joinFieldCount; i++) { + createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },"); + } + createIndexBuilder.append(""" "value": { "type": "double" } } }"""); + CreateIndexResponse response = createIndex( + "sensor_data", + Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), + createIndexBuilder.toString() + ); + assertTrue(response.isAcknowledged()); int docsPerBulk = 1000; long firstDate = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01T00:00:00Z"); @@ -975,8 +1010,11 @@ private void initSensorData(int docCount, int sensorCount) throws IOException { for (int i = 0; i < docCount; i++) { data.append(String.format(Locale.ROOT, """ {"create":{}} - {"timestamp":"%s", "id": %d, "value": %f} - """, DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate), i % sensorCount, i * 1.1)); + {"timestamp":"%s",""", 
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate))); + for (int j = 0; j < joinFieldCount; j++) { + data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, i % sensorCount)); + } + data.append(String.format(Locale.ROOT, "\"value\": %f}\n", i * 1.1)); if (i % docsPerBulk == docsPerBulk - 1) { bulk("sensor_data", data.toString()); data.setLength(0); @@ -985,23 +1023,42 @@ private void initSensorData(int docCount, int sensorCount) throws IOException { initIndex("sensor_data", data.toString()); } - private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction location) throws IOException { + private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction location, int joinFieldsCount) + throws IOException { logger.info("loading sensor lookup"); - createIndex("sensor_lookup", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """ + // cannot go over 1000 fields, due to failed on parsing mappings on index creation + // [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded + assertTrue("Too many join on fields, it will throw an exception later", joinFieldsCount <= 990); + StringBuilder createIndexBuilder = new StringBuilder(); + createIndexBuilder.append(""" { "properties": { - "id": { "type": "long" }, + """); + for (int i = 0; i < joinFieldsCount; i++) { + createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },"); + } + createIndexBuilder.append(""" "location": { "type": "geo_point" } } }"""); + CreateIndexResponse response = createIndex( + "sensor_lookup", + Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), + createIndexBuilder.toString() + ); + assertTrue(response.isAcknowledged()); int docsPerBulk = 1000; StringBuilder data = new StringBuilder(); for (int i = 0; i < lookupEntries; i++) { int sensor = i % sensorCount; data.append(String.format(Locale.ROOT, """ 
{"create":{}} - {"id": %d, "location": "POINT(%s)"} - """, sensor, location.apply(sensor))); + {""")); + for (int j = 0; j < joinFieldsCount; j++) { + data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, sensor)); + } + data.append(String.format(Locale.ROOT, """ + "location": "POINT(%s)"}\n""", location.apply(sensor))); if (i % docsPerBulk == docsPerBulk - 1) { bulk("sensor_lookup", data.toString()); data.setLength(0); @@ -1015,7 +1072,7 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct createIndex("sensor_lookup", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """ { "properties": { - "id": { "type": "long" }, + "id0": { "type": "long" }, "string": { "type": "text" } } }"""); @@ -1025,7 +1082,7 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct int sensor = i % sensorCount; data.append(String.format(Locale.ROOT, """ {"create":{}} - {"id": %d, "string": "%s"} + {"id0": %d, "string": "%s"} """, sensor, string.apply(sensor))); if (i % docsPerBulk == docsPerBulk - 1) { bulk("sensor_lookup", data.toString()); @@ -1036,7 +1093,7 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct } private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction location) throws IOException { - initSensorLookup(lookupEntries, sensorCount, location); + initSensorLookup(lookupEntries, sensorCount, location, 1); logger.info("loading sensor enrich"); Request create = new Request("PUT", "/_enrich/policy/sensor"); @@ -1044,7 +1101,7 @@ private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction instanceWriter() { * Copy the {@link Writeable} by round tripping it through {@linkplain StreamInput} and {@linkplain StreamOutput}. 
*/ @Override - protected final T copyInstance(T instance, TransportVersion version) throws IOException { + protected T copyInstance(T instance, TransportVersion version) throws IOException { return copyInstance(instance, getNamedWriteableRegistry(), instanceWriter(), instanceReader(), version); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java index 214e7197b2c84..c4b1d098bb30e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java @@ -37,7 +37,7 @@ */ public final class EnrichQuerySourceOperator extends SourceOperator { private final BlockFactory blockFactory; - private final QueryList queryList; + private final LookupEnrichQueryGenerator queryList; private int queryPosition = -1; private final ShardContext shardContext; private final IndexReader indexReader; @@ -51,7 +51,7 @@ public final class EnrichQuerySourceOperator extends SourceOperator { public EnrichQuerySourceOperator( BlockFactory blockFactory, int maxPageSize, - QueryList queryList, + LookupEnrichQueryGenerator queryList, ShardContext shardContext, Warnings warnings ) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/ExpressionQueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/ExpressionQueryList.java new file mode 100644 index 0000000000000..af5467de55936 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/ExpressionQueryList.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.lookup; + +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.Query; + +import java.util.List; + +/** + * A {@link LookupEnrichQueryGenerator} that combines multiple {@link QueryList}s into a single query. + * Each query in the resulting query will be a conjunction of all queries from the input lists at the same position. + * In the future we can extend this to support more complex expressions, such as disjunctions or negations. + */ +public class ExpressionQueryList implements LookupEnrichQueryGenerator { + private final List queryLists; + + public ExpressionQueryList(List queryLists) { + if (queryLists.size() < 2) { + throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists"); + } + this.queryLists = queryLists; + } + + @Override + public Query getQuery(int position) { + BooleanQuery.Builder builder = new BooleanQuery.Builder(); + for (QueryList queryList : queryLists) { + Query q = queryList.getQuery(position); + if (q == null) { + // if any of the matchFields are null, it means there is no match for this position + // A AND NULL is always NULL, so we can skip this position + return null; + } + builder.add(q, BooleanClause.Occur.FILTER); + } + return builder.build(); + } + + @Override + public int getPositionCount() { + int positionCount = queryLists.get(0).getPositionCount(); + for (QueryList queryList : queryLists) { + if (queryList.getPositionCount() != positionCount) { + throw new IllegalStateException( + "All QueryLists must have the same position count, expected: " + + positionCount + + ", but got: " + + queryList.getPositionCount() + ); + } + } + return positionCount; + } +} diff --git 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java new file mode 100644 index 0000000000000..cf581d9e83b43 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.lookup; + +import org.apache.lucene.search.Query; +import org.elasticsearch.core.Nullable; + +/** + * An interface to generate queries for the lookup and enrich operators. + * This interface is used to retrieve queries based on a position index. + */ +public interface LookupEnrichQueryGenerator { + + /** + * Returns the query at the given position. + */ + @Nullable + Query getQuery(int position); + + /** + * Returns the number of queries in this generator. + */ + int getPositionCount(); + +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java index 56b857ad90a82..4d93ab63aaa5d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java @@ -49,7 +49,7 @@ /** * Generates a list of Lucene queries based on the input block. 
*/ -public abstract class QueryList { +public abstract class QueryList implements LookupEnrichQueryGenerator { protected final SearchExecutionContext searchExecutionContext; protected final AliasFilter aliasFilter; protected final MappedFieldType field; @@ -74,7 +74,8 @@ protected QueryList( /** * Returns the number of positions in this query list */ - int getPositionCount() { + @Override + public int getPositionCount() { return block.getPositionCount(); } @@ -87,7 +88,8 @@ int getPositionCount() { */ public abstract QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage); - final Query getQuery(int position) { + @Override + public final Query getQuery(int position) { final int valueCount = block.getValueCount(position); if (onlySingleValueParams != null && valueCount != 1) { if (valueCount > 1) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunctionTests.java index d3b374a4d487f..ce5148b78379f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunctionTests.java @@ -14,7 +14,7 @@ import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.SourceOperator; -import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import java.util.List; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java 
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java index d0dcf39029d83..de7baef944243 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java @@ -15,7 +15,7 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.LongDoubleTupleBlockSourceOperator; import org.elasticsearch.compute.operator.SourceOperator; -import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import java.util.List; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunctionTests.java index 0c34e36b93e17..c3be35d6e3793 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunctionTests.java @@ -14,7 +14,7 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.SourceOperator; -import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import java.util.List; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunctionTests.java 
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunctionTests.java index 27d6d0177ef49..f01871e6a1d04 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunctionTests.java @@ -14,7 +14,7 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.SourceOperator; -import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import java.util.List; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxLongGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxLongGroupingAggregatorFunctionTests.java index b6223e36597d5..b831895d2fa6d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxLongGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxLongGroupingAggregatorFunctionTests.java @@ -12,7 +12,7 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.SourceOperator; -import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import java.util.List; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationLongGroupingAggregatorFunctionTests.java 
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationLongGroupingAggregatorFunctionTests.java index fbd41d8ab06be..9d671e7566bf0 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationLongGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationLongGroupingAggregatorFunctionTests.java @@ -13,7 +13,7 @@ import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.SourceOperator; -import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import java.util.ArrayList; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinLongGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinLongGroupingAggregatorFunctionTests.java index 82095553fdd58..10720471fa4c6 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinLongGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinLongGroupingAggregatorFunctionTests.java @@ -12,7 +12,7 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.SourceOperator; -import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import java.util.List; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileLongGroupingAggregatorFunctionTests.java 
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileLongGroupingAggregatorFunctionTests.java index 74f6b20a9f9fb..150df32cf7928 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileLongGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileLongGroupingAggregatorFunctionTests.java @@ -13,7 +13,7 @@ import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.SourceOperator; -import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import org.elasticsearch.search.aggregations.metrics.TDigestState; import org.junit.Before; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunctionTests.java index f39df0071aab2..1d57de4e14f5d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunctionTests.java @@ -12,7 +12,7 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.SourceOperator; -import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import java.util.List; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesLongGroupingAggregatorFunctionTests.java 
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesLongGroupingAggregatorFunctionTests.java index bb00541f24fe5..3247ece5af51f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesLongGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesLongGroupingAggregatorFunctionTests.java @@ -12,7 +12,7 @@ import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.SourceOperator; -import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import java.util.Arrays; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/EvalOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/EvalOperatorTests.java index 189ccdb402f94..99b88135be4af 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/EvalOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/EvalOperatorTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.compute.operator.EvalOperator.EvalOperatorFactory; import org.elasticsearch.compute.test.CannedSourceOperator; import org.elasticsearch.compute.test.OperatorTestCase; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import org.hamcrest.Matcher; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FilterOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FilterOperatorTests.java index fb1f7b5422308..32fc62dbac902 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FilterOperatorTests.java +++ 
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FilterOperatorTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.test.CannedSourceOperator; import org.elasticsearch.compute.test.OperatorTestCase; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import org.hamcrest.Matcher; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java index a4072754fae10..8bd272fd502ad 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.test.BlockTestUtils; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import org.hamcrest.Matcher; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java index 88b664533dbbb..05668d8beb910 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.test.OperatorTestCase; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import 
org.hamcrest.Matcher; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/RowInTableLookupOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/RowInTableLookupOperatorTests.java index 441d125c5608a..47f14dc1f899c 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/RowInTableLookupOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/RowInTableLookupOperatorTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.compute.test.OperatorTestCase; import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator; import org.elasticsearch.compute.test.TestBlockFactory; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.Tuple; import org.hamcrest.Matcher; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleDocLongBlockSourceOperator.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleDocLongBlockSourceOperator.java index 26e84fe46d012..5a170285fe1db 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleDocLongBlockSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleDocLongBlockSourceOperator.java @@ -11,6 +11,7 @@ import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.DocBlock; +import org.elasticsearch.compute.test.TupleAbstractBlockSourceOperator; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.mapper.BlockLoader; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index 1180cdca64569..0d3ef89bfd03c 100644 --- 
a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -30,15 +30,15 @@ import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.PageConsumerOperator; import org.elasticsearch.compute.operator.SourceOperator; -import org.elasticsearch.compute.operator.TupleAbstractBlockSourceOperator; import org.elasticsearch.compute.operator.TupleDocLongBlockSourceOperator; -import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator; import org.elasticsearch.compute.test.CannedSourceOperator; import org.elasticsearch.compute.test.OperatorTestCase; import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator; import org.elasticsearch.compute.test.TestBlockBuilder; import org.elasticsearch.compute.test.TestBlockFactory; import org.elasticsearch.compute.test.TestDriverFactory; +import org.elasticsearch.compute.test.TupleAbstractBlockSourceOperator; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.SimpleRefCounted; import org.elasticsearch.core.Tuple; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleAbstractBlockSourceOperator.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TupleAbstractBlockSourceOperator.java similarity index 96% rename from x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleAbstractBlockSourceOperator.java rename to x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TupleAbstractBlockSourceOperator.java index 739c54e6e8eec..d29f6f0904237 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleAbstractBlockSourceOperator.java +++ 
b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TupleAbstractBlockSourceOperator.java @@ -5,13 +5,12 @@ * 2.0. */ -package org.elasticsearch.compute.operator; +package org.elasticsearch.compute.test; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.test.AbstractBlockSourceOperator; import org.elasticsearch.core.Tuple; import java.util.List; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleLongLongBlockSourceOperator.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TupleLongLongBlockSourceOperator.java similarity index 97% rename from x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleLongLongBlockSourceOperator.java rename to x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TupleLongLongBlockSourceOperator.java index ae5045f04c9b9..a8c421ad710e5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleLongLongBlockSourceOperator.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TupleLongLongBlockSourceOperator.java @@ -5,7 +5,7 @@ * 2.0. 
*/ -package org.elasticsearch.compute.operator; +package org.elasticsearch.compute.test; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 5c6d0186cdf77..b140f7ded44f0 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -113,6 +113,8 @@ public MultiClusterSpecIT( "SortEvalBeforeLookup", "SortBeforeAndAfterMultipleJoinAndMvExpand", "LookupJoinAfterTopNAndRemoteEnrich", + "LookupJoinOnTwoFieldsAfterTop", + "LookupJoinOnTwoFieldsMultipleTimes", // Lookup join after LIMIT is not supported in CCS yet "LookupJoinAfterLimitAndRemoteEnrich" ); diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java index 6bedc96fd5a59..b1b82814e8716 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java @@ -226,13 +226,24 @@ private List availableIndices() throws IOException { .toList(); } - public record LookupIdx(String idxName, String key, String keyType) {} + public record LookupIdxColumn(String name, String type) {} + + public record LookupIdx(String idxName, List keys) {} private List lookupIndices() { List result = new ArrayList<>(); // we don't have key info from the dataset loader, 
let's hardcode it for now - result.add(new LookupIdx("languages_lookup", "language_code", "integer")); - result.add(new LookupIdx("message_types_lookup", "message", "keyword")); + result.add(new LookupIdx("languages_lookup", List.of(new LookupIdxColumn("language_code", "integer")))); + result.add(new LookupIdx("message_types_lookup", List.of(new LookupIdxColumn("message", "keyword")))); + List multiColumnJoinableLookupKeys = List.of( + new LookupIdxColumn("id_int", "integer"), + new LookupIdxColumn("name_str", "keyword"), + new LookupIdxColumn("is_active_bool", "boolean"), + new LookupIdxColumn("ip_addr", "ip"), + new LookupIdxColumn("other1", "keyword"), + new LookupIdxColumn("other2", "integer") + ); + result.add(new LookupIdx("multi_column_joinable_lookup", multiColumnJoinableLookupKeys)); return result; } diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/command/pipe/LookupJoinGenerator.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/command/pipe/LookupJoinGenerator.java index 85dceef6a24cc..b1348491e73c3 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/command/pipe/LookupJoinGenerator.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/command/pipe/LookupJoinGenerator.java @@ -11,10 +11,15 @@ import org.elasticsearch.xpack.esql.qa.rest.generative.GenerativeRestTest; import org.elasticsearch.xpack.esql.qa.rest.generative.command.CommandGenerator; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.test.ESTestCase.randomFrom; +import static org.elasticsearch.test.ESTestCase.randomInt; +import static org.elasticsearch.test.ESTestCase.randomSubsetOf; public class LookupJoinGenerator implements CommandGenerator { @@ -29,15 +34,47 @@ public CommandDescription 
generate( ) { GenerativeRestTest.LookupIdx lookupIdx = randomFrom(schema.lookupIndices()); String lookupIdxName = lookupIdx.idxName(); - String idxKey = lookupIdx.key(); - String keyType = lookupIdx.keyType(); + int joinColumnsCount = randomInt(lookupIdx.keys().size() - 1) + 1; // at least one column must be used for the join + List joinColumns = randomSubsetOf(joinColumnsCount, lookupIdx.keys()); + List keyNames = new ArrayList<>(); + List joinOn = new ArrayList<>(); + Set usedColumns = new HashSet<>(); + for (GenerativeRestTest.LookupIdxColumn joinColumn : joinColumns) { + String idxKey = joinColumn.name(); + String keyType = joinColumn.type(); - var candidateKeys = previousOutput.stream().filter(x -> x.type().equals(keyType)).toList(); - if (candidateKeys.isEmpty()) { + var candidateKeys = previousOutput.stream().filter(x -> x.type().equals(keyType)).toList(); + if (candidateKeys.isEmpty()) { + continue; // no candidate keys of the right type, skip this column + } + EsqlQueryGenerator.Column key = randomFrom(candidateKeys); + if (usedColumns.contains(key.name()) || usedColumns.contains(idxKey)) { + continue; // already used this column from the lookup index, or will discard the main index column by RENAME'ing below, skip + } else { + usedColumns.add(key.name()); + usedColumns.add(idxKey); + } + keyNames.add(key.name()); + joinOn.add(idxKey); + } + if (keyNames.isEmpty()) { return EMPTY_DESCRIPTION; } - EsqlQueryGenerator.Column key = randomFrom(candidateKeys); - String cmdString = "| rename " + key.name() + " as " + idxKey + " | lookup join " + lookupIdxName + " on " + idxKey; + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < keyNames.size(); i++) { + stringBuilder.append("| rename "); + stringBuilder.append(keyNames.get(i)); + stringBuilder.append(" as "); + stringBuilder.append(joinOn.get(i)); + } + stringBuilder.append(" | lookup join ").append(lookupIdxName).append(" on "); + for (int i = 0; i < keyNames.size(); i++) { + 
stringBuilder.append(joinOn.get(i)); + if (i < keyNames.size() - 1) { + stringBuilder.append(", "); + } + } + String cmdString = stringBuilder.toString(); return new CommandDescription(LOOKUP_JOIN, this, cmdString, Map.of()); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java index c01141837fae3..c56ed4d489843 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java @@ -63,6 +63,16 @@ public class CsvTestsDataLoader { private static final TestDataset HOSTS = new TestDataset("hosts"); private static final TestDataset APPS = new TestDataset("apps"); private static final TestDataset APPS_SHORT = APPS.withIndex("apps_short").withTypeMapping(Map.of("id", "short")); + private static final TestDataset MULTI_COLUMN_JOINABLE = new TestDataset( + "multi_column_joinable", + "mapping-multi_column_joinable.json", + "multi_column_joinable.csv" + ); + private static final TestDataset MULTI_COLUMN_JOINABLE_LOOKUP = new TestDataset( + "multi_column_joinable_lookup", + "mapping-multi_column_joinable_lookup.json", + "multi_column_joinable_lookup.csv" + ).withSetting("lookup-settings.json"); private static final TestDataset LANGUAGES = new TestDataset("languages"); private static final TestDataset LANGUAGES_LOOKUP = LANGUAGES.withIndex("languages_lookup").withSetting("lookup-settings.json"); private static final TestDataset LANGUAGES_LOOKUP_NON_UNIQUE_KEY = LANGUAGES_LOOKUP.withIndex("languages_lookup_non_unique_key") @@ -219,7 +229,9 @@ public class CsvTestsDataLoader { Map.entry(LOGS.indexName, LOGS), Map.entry(MV_TEXT.indexName, MV_TEXT), Map.entry(DENSE_VECTOR.indexName, DENSE_VECTOR), - Map.entry(COLORS.indexName, COLORS) + 
Map.entry(COLORS.indexName, COLORS), + Map.entry(MULTI_COLUMN_JOINABLE.indexName, MULTI_COLUMN_JOINABLE), + Map.entry(MULTI_COLUMN_JOINABLE_LOOKUP.indexName, MULTI_COLUMN_JOINABLE_LOOKUP) ); private static final EnrichConfig LANGUAGES_ENRICH = new EnrichConfig("languages_policy", "enrich-policy-languages.json"); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable.csv b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable.csv new file mode 100644 index 0000000000000..9f205c4ac5822 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable.csv @@ -0,0 +1,18 @@ +id_int,name_str,is_active_bool,ip_addr,extra1,extra2 +1,Alice,true,192.168.1.1,foo,100 +2,Bob,false,192.168.1.2,bar,200 +3,Charlie,true,192.168.1.3,baz,300 +4,David,false,192.168.1.4,qux,400 +5,Eve,true,192.168.1.5,quux,500 +6,,true,192.168.1.6,corge,600 +7,Grace,false,,grault,700 +8,Hank,true,192.168.1.8,garply,800 +9,Ivy,false,192.168.1.9,waldo,900 +10,John,true,192.168.1.10,fred,1000 +,Kate,false,192.168.1.11,plugh,1100 +[12],Liam,true,192.168.1.12,xyzzy,1200 +13,Mia,false,192.168.1.13,thud,1300 +[14],Nina,true,192.168.1.14,foo2,1400 +15,Oscar,false,192.168.1.15,bar2,1500 +[17,18],Olivia,true,192.168.1.17,xyz,17000 +[1,19,21],Sophia,true,192.168.1.21,zyx,21000 diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable_lookup.csv b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable_lookup.csv new file mode 100644 index 0000000000000..88dbbb891e641 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable_lookup.csv @@ -0,0 +1,19 @@ +id_int,name_str,is_active_bool,ip_addr,other1,other2 +1,Alice,true,192.168.1.1,alpha,1000 +1,Alice,true,192.168.1.2,beta,2000 +2,Bob,false,192.168.1.3,gamma,3000 +3,Charlie,true,192.168.1.3,delta,4000 +3,Charlie,false,192.168.1.3,epsilon,5000 
+4,David,false,192.168.1.4,zeta,6000 +5,Eve,true,192.168.1.5,eta,7000 +5,Eve,true,192.168.1.5,theta,8000 +6,,true,192.168.1.6,iota,9000 +7,Grace,false,,kappa,10000 +8,Hank,true,192.168.1.8,lambda,11000 +,Kate,false,192.168.1.11,mu,12000 +12,Liam,true,192.168.1.12,nu,13000 +13,Mia,false,192.168.1.13,xi,14000 +[14],Nina,true,192.168.1.14,omicron,15000 +16,Paul,true,192.168.1.16,pi,16000 +[17,18],Olivia,true,192.168.1.17,rho,17000 +[1,19,20],Sophia,true,192.168.1.21,sigma,21000 diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 77f678b28acc2..6d89963e8c612 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -4871,3 +4871,281 @@ Connected to 10.1.0.1 | English | English | U Connected to 10.1.0.1 | English | English | null Connected to 10.1.0.1 | English | null | United Kingdom ; + +lookupJoinOnTwoFields +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +1 | Alice | foo | alpha | 1000 +1 | Alice | foo | beta | 2000 +[1, 19, 21] | Sophia | zyx | null | null +2 | Bob | bar | gamma | 3000 +3 | Charlie | baz | delta | 4000 +3 | Charlie | baz | epsilon | 5000 +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | null | null +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +9 | Ivy | waldo | null | null +10 | John | fred | null | null +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +15 | Oscar | bar2 | null | null +[17, 18] | Olivia | xyz | null | null +null | Kate | plugh | null | null +; + +lookupJoinOnTwoFieldsSelfJoin +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable_lookup +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str +| KEEP id_int, name_str, other1, other2 +| SORT id_int, name_str, other1, other2 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | other1:keyword | other2:integer +1 | Alice | alpha | 1000 +1 | Alice | alpha | 1000 +1 | Alice | beta | 2000 +1 | Alice | beta | 2000 +[1, 19, 20] | Sophia | null | null +2 | Bob | gamma | 3000 +3 | Charlie | delta | 4000 +3 | Charlie | delta | 4000 +3 | Charlie | epsilon | 5000 +3 | Charlie | epsilon | 5000 +4 | David | zeta | 6000 +5 | Eve | eta | 7000 +5 | Eve | eta | 7000 +5 | Eve | theta | 8000 +5 | Eve | theta | 8000 +6 | null | null | null +7 | Grace | kappa | 10000 +8 | Hank | lambda | 11000 +12 | Liam | nu | 13000 +13 | Mia | xi | 14000 +14 | Nina | omicron | 15000 +16 | Paul | pi | 16000 +[17, 18] | Olivia | null | null +null | Kate | null | null +; + +lookupJoinOnThreeFields +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str, is_active_bool +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str, is_active_bool] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +1 | Alice | foo | alpha | 1000 +1 | Alice | foo | beta | 2000 +[1, 19, 21] | Sophia | zyx | null | null +2 | Bob | bar | gamma | 3000 +3 | Charlie | baz | delta | 4000 +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | null | null +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +9 | Ivy | waldo | null | null +10 | John | fred | null | null +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +15 | Oscar | bar2 | null | null +[17, 18] | Olivia | xyz | null | null +null | Kate | plugh | null | null +; + + +lookupJoinOnFourFields +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str, is_active_bool, ip_addr +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str, is_active_bool, ip_addr] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +1 | Alice | foo | alpha | 1000 +[1, 19, 21] | Sophia | zyx | null | null +2 | Bob | bar | null | null +3 | Charlie | baz | delta | 4000 +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | null | null +7 | Grace | grault | null | null +8 | Hank | garply | lambda | 11000 +9 | Ivy | waldo | null | null +10 | John | fred | null | null +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +15 | Oscar | bar2 | null | null +[17, 18] | Olivia | xyz | null | null +null | Kate | plugh | null | null +; + + +lookupJoinOnTwoOtherFields +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON is_active_bool, ip_addr +| KEEP id_int, name_str, extra1, other1, other2, is_active_bool, ip_addr +| SORT id_int, name_str, extra1, other1, other2, is_active_bool, ip_addr +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer | is_active_bool:boolean | ip_addr:ip +1 | Alice | foo | alpha | 1000 | true | 192.168.1.1 +[1, 19, 20] | Sophia | zyx | sigma | 21000 | true | 192.168.1.21 +3 | Charlie | baz | delta | 4000 | true | 192.168.1.3 +4 | David | qux | zeta | 6000 | false | 192.168.1.4 +5 | Eve | quux | eta | 7000 | true | 192.168.1.5 +5 | Eve | quux | theta | 8000 | true | 192.168.1.5 +6 | null | corge | iota | 9000 | true | 192.168.1.6 +8 | Hank | garply | lambda | 11000 | true | 192.168.1.8 +12 | Liam | xyzzy | nu | 13000 | true | 192.168.1.12 +13 | Mia | thud | xi | 14000 | false | 192.168.1.13 +14 | Nina | foo2 | omicron | 15000 | true | 192.168.1.14 +[17, 18] | Olivia | xyz | rho | 17000 | true | 192.168.1.17 +null | Kate | plugh | mu | 12000 | false | 
192.168.1.11 +null | null | bar | null | null | false | 192.168.1.2 +null | null | bar2 | null | null | false | 192.168.1.15 +null | null | fred | null | null | true | 192.168.1.10 +null | null | grault | null | null | false | null +null | null | waldo | null | null | false | 192.168.1.9 +; + +lookupJoinOnTwoFieldsWithEval +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| eval id_int = id_int + 5 +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +; + +warning:Line 2:17: evaluation of [id_int + 5] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:17: java.lang.IllegalArgumentException: single-value function encountered multi-value +warning:Line 3:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 3:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +6 | null | foo | iota | 9000 +7 | Grace | bar | kappa | 10000 +8 | Hank | baz | lambda | 11000 +9 | null | qux | null | null +10 | null | quux | null | null +11 | null | corge | null | null +12 | null | grault | null | null +13 | null | garply | null | null +14 | null | waldo | null | null +15 | null | fred | null | null +17 | null | xyzzy | null | null +18 | null | thud | null | null +19 | null | foo2 | null | null +20 | null | bar2 | null | null +null | null | plugh | null | null +null | null | xyz | null | null +null | null | zyx | null | null +; + + + + +lookupJoinOnTwoFieldsAfterTop +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| SORT extra1 +| LIMIT 10 +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +; + +warning:Line 4:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 4:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +1 | Alice | foo | alpha | 1000 +1 | Alice | foo | beta | 2000 +2 | Bob | bar | gamma | 3000 +3 | Charlie | baz | delta | 4000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +10 | null | fred | null | null +14 | Nina | foo2 | omicron | 15000 +15 | null | bar2 | null | null +null | null | plugh | null | null +; + + +lookupJoinOnTwoFieldsMultipleTimes +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| SORT extra1 +| LIMIT 10 +| LOOKUP JOIN multi_column_joinable_lookup ON name_str, is_active_bool +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +1 | Alice | foo | alpha | 1000 +1 | Alice | foo | alpha | 1000 +1 | Alice | foo | beta | 2000 +1 | Alice | foo | beta | 2000 +2 | Bob | bar | gamma | 3000 +3 | Charlie | baz | delta | 4000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +14 | Nina | foo2 | omicron | 15000 +null | null | bar2 | null | null +null | null | corge | null | null +null | null | fred | null | null +; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable.json b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable.json new file mode 100644 index 0000000000000..f3f58f23e05d0 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable.json @@ -0,0 +1,22 @@ +{ + "properties": { + "id_int": { + "type": "integer" + }, + "name_str": { + "type": "keyword" + }, + "is_active_bool": { + "type": "boolean" + }, + "ip_addr": { + "type": "ip" + }, + "extra1": { + "type": "keyword" + }, + "extra2": { + "type": "integer" + } + } +} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable_lookup.json b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable_lookup.json new file mode 100644 index 0000000000000..d11fc46301cb1 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable_lookup.json @@ -0,0 +1,22 @@ +{ + "properties": { + "id_int": { + "type": "integer" + }, + "name_str": { + "type": "keyword" + }, + "is_active_bool": { + "type": "boolean" + }, + "ip_addr": { + "type": "ip" + }, + "other1": { + "type": "keyword" + }, + "other2": { + "type": "integer" + } + } +} diff --git 
a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index e25cb82f29851..dc5e815a85697 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -59,6 +59,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator; +import org.elasticsearch.xpack.esql.enrich.MatchConfig; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; @@ -82,18 +83,69 @@ public class LookupFromIndexIT extends AbstractEsqlIntegTestCase { public void testKeywordKey() throws IOException { - runLookup(DataType.KEYWORD, new UsingSingleLookupTable(new String[] { "aa", "bb", "cc", "dd" })); + runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" } })); + } + + public void testJoinOnTwoKeys() throws IOException { + runLookup( + List.of(DataType.KEYWORD, DataType.LONG), + new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" }, new Long[] { 12L, 33L, 1L, 42L } }) + ); + } + + public void testJoinOnThreeKeys() throws IOException { + runLookup( + List.of(DataType.KEYWORD, DataType.LONG, DataType.KEYWORD), + new UsingSingleLookupTable( + new Object[][] { + new String[] { "aa", "bb", "cc", "dd" }, + new Long[] { 12L, 33L, 1L, 42L }, + new String[] { "one", "two", "three", "four" }, } + ) + ); + } + + public void testJoinOnFourKeys() throws IOException { + runLookup( + List.of(DataType.KEYWORD, DataType.LONG, 
DataType.KEYWORD, DataType.INTEGER), + new UsingSingleLookupTable( + new Object[][] { + new String[] { "aa", "bb", "cc", "dd" }, + new Long[] { 12L, 33L, 1L, 42L }, + new String[] { "one", "two", "three", "four" }, + new Integer[] { 1, 2, 3, 4 }, } + ) + ); } public void testLongKey() throws IOException { - runLookup(DataType.LONG, new UsingSingleLookupTable(new Long[] { 12L, 33L, 1L })); + runLookup(List.of(DataType.LONG), new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } })); } /** * LOOKUP multiple results match. */ public void testLookupIndexMultiResults() throws IOException { - runLookup(DataType.KEYWORD, new UsingSingleLookupTable(new String[] { "aa", "bb", "bb", "dd" })); + runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } })); + } + + public void testJoinOnTwoKeysMultiResults() throws IOException { + runLookup( + List.of(DataType.KEYWORD, DataType.LONG), + new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" }, new Long[] { 12L, 1L, 1L, 42L } }) + ); + } + + public void testJoinOnThreeKeysMultiResults() throws IOException { + runLookup( + List.of(DataType.KEYWORD, DataType.LONG, DataType.KEYWORD), + new UsingSingleLookupTable( + new Object[][] { + new String[] { "aa", "bb", "bb", "dd" }, + new Long[] { 12L, 1L, 1L, 42L }, + new String[] { "one", "two", "two", "four" } } + ) + ); } interface PopulateIndices { @@ -101,52 +153,87 @@ interface PopulateIndices { } class UsingSingleLookupTable implements PopulateIndices { - private final Map> matches = new HashMap<>(); - private final Object[] lookupData; + private final Map, List> matches = new HashMap<>(); + private final Object[][] lookupData; - UsingSingleLookupTable(Object[] lookupData) { + // Accepts array of arrays, each sub-array is values for a key field + // All subarrays must have the same length + UsingSingleLookupTable(Object[][] lookupData) { this.lookupData = lookupData; - for 
(int i = 0; i < lookupData.length; i++) { - matches.computeIfAbsent(lookupData[i], k -> new ArrayList<>()).add(i); + int numRows = lookupData[0].length; + for (int i = 0; i < numRows; i++) { + List key = new ArrayList<>(); + for (Object[] col : lookupData) { + key.add(col[i]); + } + matches.computeIfAbsent(key, k -> new ArrayList<>()).add(i); } } @Override public void populate(int docCount, List expected) { List docs = new ArrayList<>(); + int numFields = lookupData.length; + int numRows = lookupData[0].length; for (int i = 0; i < docCount; i++) { - Object key = lookupData[i % lookupData.length]; - docs.add(client().prepareIndex("source").setSource(Map.of("key", key))); + List key = new ArrayList<>(); + Map sourceDoc = new HashMap<>(); + for (int f = 0; f < numFields; f++) { + Object val = lookupData[f][i % numRows]; + key.add(val); + sourceDoc.put("key" + f, val); + } + docs.add(client().prepareIndex("source").setSource(sourceDoc)); + String keyString; + if (key.size() == 1) { + keyString = String.valueOf(key.get(0)); + } else { + keyString = String.join(",", key.stream().map(String::valueOf).toArray(String[]::new)); + } for (Integer match : matches.get(key)) { - expected.add(key + ":" + match); + expected.add(keyString + ":" + match); } } - for (int i = 0; i < lookupData.length; i++) { - docs.add(client().prepareIndex("lookup").setSource(Map.of("key", lookupData[i], "l", i))); + for (int i = 0; i < numRows; i++) { + Map lookupDoc = new HashMap<>(); + for (int f = 0; f < numFields; f++) { + lookupDoc.put("key" + f, lookupData[f][i]); + } + lookupDoc.put("l", i); + docs.add(client().prepareIndex("lookup").setSource(lookupDoc)); } Collections.sort(expected); indexRandom(true, true, docs); } } - private void runLookup(DataType keyType, PopulateIndices populateIndices) throws IOException { + private void runLookup(List keyTypes, PopulateIndices populateIndices) throws IOException { + String[] fieldMappers = new String[keyTypes.size() * 2]; + for (int i = 0; i < 
keyTypes.size(); i++) { + fieldMappers[2 * i] = "key" + i; + fieldMappers[2 * i + 1] = "type=" + keyTypes.get(i).esType(); + } client().admin() .indices() .prepareCreate("source") .setSettings(Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)) - .setMapping("key", "type=" + keyType.esType()) - .get(); - client().admin() - .indices() - .prepareCreate("lookup") - .setSettings( - Settings.builder() - .put(IndexSettings.MODE.getKey(), "lookup") - // TODO lookup index mode doesn't seem to force a single shard. That'll break the lookup command. - .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) - ) - .setMapping("key", "type=" + keyType.esType(), "l", "type=long") + .setMapping(fieldMappers) .get(); + + Settings.Builder lookupSettings = Settings.builder() + .put(IndexSettings.MODE.getKey(), "lookup") + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1); + + String[] lookupMappers = new String[keyTypes.size() * 2 + 2]; + int lookupMappersCounter = 0; + for (; lookupMappersCounter < keyTypes.size(); lookupMappersCounter++) { + lookupMappers[2 * lookupMappersCounter] = "key" + lookupMappersCounter; + lookupMappers[2 * lookupMappersCounter + 1] = "type=" + keyTypes.get(lookupMappersCounter).esType(); + } + lookupMappers[2 * lookupMappersCounter] = "l"; + lookupMappers[2 * lookupMappersCounter + 1] = "type=long"; + client().admin().indices().prepareCreate("lookup").setSettings(lookupSettings).setMapping(lookupMappers).get(); + client().admin().cluster().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForGreenStatus().get(); int docCount = between(10, 1000); @@ -198,15 +285,20 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws DocIdSetIterator.NO_MORE_DOCS, false // no scoring ); - ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory( - PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY), - List.of( + List fieldInfos = new ArrayList<>(); + for 
(int i = 0; i < keyTypes.size(); i++) { + final int idx = i; + fieldInfos.add( new ValuesSourceReaderOperator.FieldInfo( - "key", - PlannerUtils.toElementType(keyType), - shard -> searchContext.getSearchExecutionContext().getFieldType("key").blockLoader(blContext()) + "key" + idx, + PlannerUtils.toElementType(keyTypes.get(idx)), + shard -> searchContext.getSearchExecutionContext().getFieldType("key" + idx).blockLoader(blContext()) ) - ), + ); + } + ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory( + PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY), + fieldInfos, List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> { throw new IllegalStateException("can't load source here"); }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))), @@ -224,16 +316,18 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws TEST_REQUEST_TIMEOUT ); final String finalNodeWithShard = nodeWithShard; + List matchFields = new ArrayList<>(); + for (int i = 0; i < keyTypes.size(); i++) { + matchFields.add(new MatchConfig(new FieldAttribute.FieldName("key" + i), i + 1, keyTypes.get(i))); + } LookupFromIndexOperator.Factory lookup = new LookupFromIndexOperator.Factory( + matchFields, "test", parentTask, QueryPragmas.ENRICH_MAX_WORKERS.get(Settings.EMPTY), - 1, ctx -> internalCluster().getInstance(TransportEsqlQueryAction.class, finalNodeWithShard).getLookupFromIndexService(), - keyType, "lookup", "lookup", - new FieldAttribute.FieldName("key"), List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))), Source.EMPTY ); @@ -245,16 +339,28 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws List.of(reader.get(driverContext), lookup.get(driverContext)), new PageConsumerOperator(page -> { try { - Block keyBlock = page.getBlock(1); - LongVector loadedBlock = 
page.getBlock(2).asVector(); + List keyBlocks = new ArrayList<>(); + for (int i = 0; i < keyTypes.size(); i++) { + keyBlocks.add(page.getBlock(i + 1)); + } + LongVector loadedBlock = page.getBlock(keyTypes.size() + 1).asVector(); for (int p = 0; p < page.getPositionCount(); p++) { - List key = BlockTestUtils.valuesAtPositions(keyBlock, p, p + 1).get(0); - assertThat(key, hasSize(1)); - Object keyValue = key.get(0); - if (keyValue instanceof BytesRef b) { - keyValue = b.utf8ToString(); + StringBuilder result = new StringBuilder(); + for (int j = 0; j < keyBlocks.size(); j++) { + List key = BlockTestUtils.valuesAtPositions(keyBlocks.get(j), p, p + 1).get(0); + assertThat(key, hasSize(1)); + Object keyValue = key.get(0); + if (keyValue instanceof BytesRef b) { + keyValue = b.utf8ToString(); + } + result.append(keyValue); + if (j < keyBlocks.size() - 1) { + result.append(","); + } + } - results.add(keyValue + ":" + loadedBlock.getLong(p)); + result.append(":" + loadedBlock.getLong(p)); + results.add(result.toString()); } } finally { page.releaseBlocks(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index a09a9177203c4..c7de35e94b464 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1317,6 +1317,10 @@ public enum Cap { */ FIXED_PROFILE_SERIALIZATION, + /** + * Support for lookup join on multiple fields. 
+ */ + LOOKUP_JOIN_ON_MULTIPLE_FIELDS, /** * Dot product vector similarity function */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index dd305f09c12dc..e33f0ad4b2904 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -43,6 +43,7 @@ import org.elasticsearch.compute.operator.ProjectOperator; import org.elasticsearch.compute.operator.Warnings; import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator; +import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.MergePositionsOperator; import org.elasticsearch.compute.operator.lookup.QueryList; import org.elasticsearch.core.AbstractRefCounted; @@ -191,12 +192,11 @@ public ThreadContext getThreadContext() { /** * Build a list of queries to perform inside the actual lookup. */ - protected abstract QueryList queryList( + protected abstract LookupEnrichQueryGenerator queryList( T request, SearchExecutionContext context, AliasFilter aliasFilter, Block inputBlock, - @Nullable DataType inputDataType, Warnings warnings ); @@ -271,13 +271,15 @@ protected void sendChildRequest( } private void doLookup(T request, CancellableTask task, ActionListener> listener) { - Block inputBlock = request.inputPage.getBlock(0); - if (inputBlock.areAllValuesNull()) { - List nullResponse = mergePages - ? List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields)) - : List.of(); - listener.onResponse(nullResponse); - return; + for (int j = 0; j < request.inputPage.getBlockCount(); j++) { + Block inputBlock = request.inputPage.getBlock(j); + if (inputBlock.areAllValuesNull()) { + List nullResponse = mergePages + ? 
List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields)) + : List.of(); + listener.onResponse(nullResponse); + return; + } } final List releasables = new ArrayList<>(6); boolean started = false; @@ -305,6 +307,7 @@ private void doLookup(T request, CancellableTask task, ActionListener final int[] mergingChannels = IntStream.range(0, request.extractFields.size()).map(i -> i + 2).toArray(); final Operator finishPages; final OrdinalBytesRefBlock ordinalsBytesRefBlock; + Block inputBlock = request.inputPage.getBlock(0); if (mergePages // TODO fix this optimization for Lookup. && inputBlock instanceof BytesRefBlock bytesRefBlock && (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) { @@ -334,14 +337,7 @@ private void doLookup(T request, CancellableTask task, ActionListener request.source.source().getColumnNumber(), request.source.text() ); - QueryList queryList = queryList( - request, - shardContext.executionContext, - aliasFilter, - inputBlock, - request.inputDataType, - warnings - ); + LookupEnrichQueryGenerator queryList = queryList(request, shardContext.executionContext, aliasFilter, inputBlock, warnings); var queryOperator = new EnrichQuerySourceOperator( driverContext.blockFactory(), EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE, @@ -536,11 +532,6 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen final String sessionId; final ShardId shardId; final String indexPattern; - /** - * For mixed clusters with nodes <8.14, this will be null. 
- */ - @Nullable - final DataType inputDataType; final Page inputPage; final List extractFields; final Source source; @@ -552,7 +543,6 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen String sessionId, ShardId shardId, String indexPattern, - DataType inputDataType, Page inputPage, Page toRelease, List extractFields, @@ -561,7 +551,6 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen this.sessionId = sessionId; this.shardId = shardId; this.indexPattern = indexPattern; - this.inputDataType = inputDataType; this.inputPage = inputPage; this.toRelease = toRelease; this.extractFields = extractFields; @@ -621,8 +610,6 @@ public final String toString() { + sessionId + " ,shard=" + shardId - + " ,input_type=" - + inputDataType + " ,extract_fields=" + extractFields + " ,positions=" diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java index 663698d77c6d4..894d5cc391e74 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java @@ -25,6 +25,7 @@ import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Warnings; +import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.QueryList; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasables; @@ -110,14 +111,14 @@ protected TransportRequest transportRequest(EnrichLookupService.Request request, } @Override - protected QueryList queryList( + protected LookupEnrichQueryGenerator queryList( TransportRequest request, SearchExecutionContext context, AliasFilter aliasFilter, Block inputBlock, - @Nullable 
DataType inputDataType, Warnings warnings ) { + DataType inputDataType = request.inputDataType; MappedFieldType fieldType = context.getFieldType(request.matchField); validateTypes(inputDataType, fieldType); return switch (request.matchType) { @@ -190,6 +191,11 @@ public static class Request extends AbstractLookupService.Request { protected static class TransportRequest extends AbstractLookupService.TransportRequest { private final String matchType; private final String matchField; + /** + * For mixed clusters with nodes <8.14, this will be null. + */ + @Nullable + final DataType inputDataType; TransportRequest( String sessionId, @@ -202,9 +208,10 @@ protected static class TransportRequest extends AbstractLookupService.TransportR List extractFields, Source source ) { - super(sessionId, shardId, shardId.getIndexName(), inputDataType, inputPage, toRelease, extractFields, source); + super(sessionId, shardId, shardId.getIndexName(), inputPage, toRelease, extractFields, source); this.matchType = matchType; this.matchField = matchField; + this.inputDataType = inputDataType; } static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException { @@ -261,7 +268,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override protected String extraDescription() { - return " ,match_type=" + matchType + " ,match_field=" + matchField; + return " ,input_type=" + inputDataType + " ,match_type=" + matchType + " ,match_field=" + matchField; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java index 2b41edcbeff0b..b1313a3713b3b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java @@ -23,12 +23,11 @@ import 
org.elasticsearch.core.Releasables; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.core.type.DataType; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -37,47 +36,45 @@ // TODO rename package public final class LookupFromIndexOperator extends AsyncOperator { + public record Factory( + List matchFields, String sessionId, CancellableTask parentTask, int maxOutstandingRequests, - int inputChannel, Function lookupService, - DataType inputDataType, String lookupIndexPattern, String lookupIndex, - FieldAttribute.FieldName matchField, List loadFields, Source source ) implements OperatorFactory { @Override public String describe() { - return "LookupOperator[index=" - + lookupIndex - + " input_type=" - + inputDataType - + " match_field=" - + matchField.string() - + " load_fields=" - + loadFields - + " inputChannel=" - + inputChannel - + "]"; + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("LookupOperator[index=").append(lookupIndex).append(" load_fields=").append(loadFields); + for (MatchConfig matchField : matchFields) { + stringBuilder.append(" input_type=") + .append(matchField.type()) + .append(" match_field=") + .append(matchField.fieldName().string()) + .append(" inputChannel=") + .append(matchField.channel()); + } + stringBuilder.append("]"); + return stringBuilder.toString(); } @Override public Operator get(DriverContext driverContext) { return new LookupFromIndexOperator( + matchFields, sessionId, driverContext, parentTask, maxOutstandingRequests, - inputChannel, lookupService.apply(driverContext), - inputDataType, lookupIndexPattern, lookupIndex, - matchField.string(), 
loadFields, source ); @@ -87,14 +84,12 @@ public Operator get(DriverContext driverContext) { private final LookupFromIndexService lookupService; private final String sessionId; private final CancellableTask parentTask; - private final int inputChannel; - private final DataType inputDataType; private final String lookupIndexPattern; private final String lookupIndex; - private final String matchField; private final List loadFields; private final Source source; - private long totalTerms = 0L; + private long totalRows = 0L; + private List matchFields; /** * Total number of pages emitted by this {@link Operator}. */ @@ -105,43 +100,51 @@ public Operator get(DriverContext driverContext) { private OngoingJoin ongoing = null; public LookupFromIndexOperator( + List matchFields, String sessionId, DriverContext driverContext, CancellableTask parentTask, int maxOutstandingRequests, - int inputChannel, LookupFromIndexService lookupService, - DataType inputDataType, String lookupIndexPattern, String lookupIndex, - String matchField, List loadFields, Source source ) { super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests); + this.matchFields = matchFields; this.sessionId = sessionId; this.parentTask = parentTask; - this.inputChannel = inputChannel; this.lookupService = lookupService; - this.inputDataType = inputDataType; this.lookupIndexPattern = lookupIndexPattern; this.lookupIndex = lookupIndex; - this.matchField = matchField; this.loadFields = loadFields; this.source = source; } @Override protected void performAsync(Page inputPage, ActionListener listener) { - final Block inputBlock = inputPage.getBlock(inputChannel); - totalTerms += inputBlock.getTotalValueCount(); + Block[] inputBlockArray = new Block[matchFields.size()]; + List newMatchFields = new ArrayList<>(); + for (int i = 0; i < matchFields.size(); i++) { + MatchConfig matchField = matchFields.get(i); + int inputChannel = matchField.channel(); + final Block inputBlock = 
inputPage.getBlock(inputChannel); + inputBlockArray[i] = inputBlock; + // the matchFields we have are indexed by the input channel on the left side of the join + // create a new MatchConfig that uses the field name and type from the matchField + // but the new channel index in the inputBlockArray + newMatchFields.add(new MatchConfig(matchField.fieldName(), i, matchField.type())); + } + // we only add to the totalRows once, so we can use the first block + totalRows += inputPage.getBlock(0).getTotalValueCount(); + LookupFromIndexService.Request request = new LookupFromIndexService.Request( sessionId, lookupIndex, lookupIndexPattern, - inputDataType, - matchField, - new Page(inputBlock), + newMatchFields, + new Page(inputBlockArray), loadFields, source ); @@ -190,17 +193,18 @@ protected void releaseFetchedOnAnyThread(OngoingJoin ongoingJoin) { @Override public String toString() { - return "LookupOperator[index=" - + lookupIndex - + " input_type=" - + inputDataType - + " match_field=" - + matchField - + " load_fields=" - + loadFields - + " inputChannel=" - + inputChannel - + "]"; + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("LookupOperator[index=").append(lookupIndex).append(" load_fields=").append(loadFields); + for (MatchConfig matchField : matchFields) { + stringBuilder.append(" input_type=") + .append(matchField.type()) + .append(" match_field=") + .append(matchField.fieldName().string()) + .append(" inputChannel=") + .append(matchField.channel()); + } + stringBuilder.append("]"); + return stringBuilder.toString(); } @Override @@ -225,7 +229,7 @@ protected void doClose() { @Override protected Operator.Status status(long receivedPages, long completedPages, long processNanos) { - return new LookupFromIndexOperator.Status(receivedPages, completedPages, processNanos, totalTerms, emittedPages); + return new LookupFromIndexOperator.Status(receivedPages, completedPages, processNanos, totalRows, emittedPages); } public static class Status 
extends AsyncOperator.Status { @@ -235,28 +239,28 @@ public static class Status extends AsyncOperator.Status { Status::new ); - private final long totalTerms; + private final long totalRows; /** * Total number of pages emitted by this {@link Operator}. */ private final long emittedPages; - Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms, long emittedPages) { + Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalRows, long emittedPages) { super(receivedPages, completedPages, totalTimeInMillis); - this.totalTerms = totalTerms; + this.totalRows = totalRows; this.emittedPages = emittedPages; } Status(StreamInput in) throws IOException { super(in); - this.totalTerms = in.readVLong(); + this.totalRows = in.readVLong(); this.emittedPages = in.readVLong(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeVLong(totalTerms); + out.writeVLong(totalRows); out.writeVLong(emittedPages); } @@ -269,8 +273,8 @@ public long emittedPages() { return emittedPages; } - public long totalTerms() { - return totalTerms; + public long totalRows() { + return totalRows; } @Override @@ -278,7 +282,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); super.innerToXContent(builder); builder.field("emitted_pages", emittedPages()); - builder.field("total_terms", totalTerms()); + builder.field("total_rows", totalRows()); return builder.endObject(); } @@ -291,12 +295,12 @@ public boolean equals(Object o) { return false; } Status status = (Status) o; - return totalTerms == status.totalTerms && emittedPages == status.emittedPages; + return totalRows == status.totalRows && emittedPages == status.emittedPages; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), totalTerms, emittedPages); + return Objects.hash(super.hashCode(), totalRows, emittedPages); } } diff --git 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index 972a952a7b1fd..3eab54a7e0efc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -20,8 +20,9 @@ import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Warnings; +import org.elasticsearch.compute.operator.lookup.ExpressionQueryList; +import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.QueryList; -import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.shard.ShardId; @@ -31,6 +32,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.action.EsqlQueryAction; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; @@ -38,8 +40,10 @@ import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; /** * {@link LookupFromIndexService} performs lookup against a Lookup index for @@ -80,28 +84,38 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque request.sessionId, shardId, request.indexPattern, - request.inputDataType, request.inputPage, null, request.extractFields, - 
request.matchField, + request.matchFields, request.source ); } @Override - protected QueryList queryList( + protected LookupEnrichQueryGenerator queryList( TransportRequest request, SearchExecutionContext context, AliasFilter aliasFilter, Block inputBlock, - @Nullable DataType inputDataType, Warnings warnings ) { - return termQueryList(context.getFieldType(request.matchField), context, aliasFilter, inputBlock, inputDataType).onlySingleValues( - warnings, - "LOOKUP JOIN encountered multi-value" - ); + List queryLists = new ArrayList<>(); + for (int i = 0; i < request.matchFields.size(); i++) { + MatchConfig matchField = request.matchFields.get(i); + QueryList q = termQueryList( + context.getFieldType(matchField.fieldName().string()), + context, + aliasFilter, + request.inputPage.getBlock(matchField.channel()), + matchField.type() + ).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value"); + queryLists.add(q); + } + if (queryLists.size() == 1) { + return queryLists.getFirst(); + } + return new ExpressionQueryList(queryLists); } @Override @@ -115,39 +129,39 @@ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in } public static class Request extends AbstractLookupService.Request { - private final String matchField; + private final List matchFields; Request( String sessionId, String index, String indexPattern, - DataType inputDataType, - String matchField, + List matchFields, Page inputPage, List extractFields, Source source ) { - super(sessionId, index, indexPattern, inputDataType, inputPage, extractFields, source); - this.matchField = matchField; + super(sessionId, index, indexPattern, matchFields.get(0).type(), inputPage, extractFields, source); + this.matchFields = matchFields; } } protected static class TransportRequest extends AbstractLookupService.TransportRequest { - private final String matchField; + private final List matchFields; + // Right now we assume that the page contains the same number of blocks as matchFields 
and that the blocks are in the same order + // The channel information inside the MatchConfig, should say the same thing TransportRequest( String sessionId, ShardId shardId, String indexPattern, - DataType inputDataType, Page inputPage, Page toRelease, List extractFields, - String matchField, + List matchFields, Source source ) { - super(sessionId, shardId, indexPattern, inputDataType, inputPage, toRelease, extractFields, source); - this.matchField = matchField; + super(sessionId, shardId, indexPattern, inputPage, toRelease, extractFields, source); + this.matchFields = matchFields; } static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException { @@ -163,14 +177,26 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro indexPattern = shardId.getIndexName(); } - DataType inputDataType = DataType.fromTypeName(in.readString()); + DataType inputDataType = null; + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS) == false) { + inputDataType = DataType.fromTypeName(in.readString()); + } + Page inputPage; try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) { inputPage = new Page(bsi); } PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null); List extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class); - String matchField = in.readString(); + List matchFields = null; + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) { + matchFields = planIn.readCollectionAsList(MatchConfig::new); + } else { + String matchField = in.readString(); + // For older versions, we only support a single match field. 
+ matchFields = new ArrayList<>(1); + matchFields.add(new MatchConfig(new FieldAttribute.FieldName(matchField), 0, inputDataType)); + } var source = Source.EMPTY; if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) { source = Source.readFrom(planIn); @@ -185,11 +211,10 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro sessionId, shardId, indexPattern, - inputDataType, inputPage, inputPage, extractFields, - matchField, + matchFields, source ); result.setParentTask(parentTaskId); @@ -208,12 +233,25 @@ public void writeTo(StreamOutput out) throws IOException { } else if (indexPattern.equals(shardId.getIndexName()) == false) { throw new EsqlIllegalArgumentException("Aliases and index patterns are not allowed for LOOKUP JOIN [{}]", indexPattern); } - - out.writeString(inputDataType.typeName()); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS) == false) { + // only write this for old versions + // older versions only support a single match field + if (matchFields.size() > 1) { + throw new EsqlIllegalArgumentException("LOOKUP JOIN on multiple fields is not supported on remote node"); + } + out.writeString(matchFields.get(0).type().typeName()); + } out.writeWriteable(inputPage); PlanStreamOutput planOut = new PlanStreamOutput(out, null); planOut.writeNamedWriteableCollection(extractFields); - out.writeString(matchField); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) { + // serialize all match fields for new versions + planOut.writeCollection(matchFields, (o, matchConfig) -> matchConfig.writeTo(o)); + } else { + // older versions only support a single match field, we already checked this above when writing the datatype + // send the field name of the first and only match field here + out.writeString(matchFields.get(0).fieldName().string()); + } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) { 
source.writeTo(planOut); } @@ -224,7 +262,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override protected String extraDescription() { - return " ,match_field=" + matchField; + return " ,match_fields=" + matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", ")); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MatchConfig.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MatchConfig.java new file mode 100644 index 0000000000000..616c6710eff48 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MatchConfig.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.enrich; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.planner.Layout; + +import java.io.IOException; +import java.util.Objects; + +public final class MatchConfig implements Writeable { + private final FieldAttribute.FieldName fieldName; + private final int channel; + private final DataType type; + + public MatchConfig(FieldAttribute.FieldName fieldName, int channel, DataType type) { + this.fieldName = fieldName; + this.channel = channel; + this.type = type; + } + + public MatchConfig(FieldAttribute match, Layout.ChannelAndType input) { + // TODO: Using exactAttribute was supposed to handle TEXT fields with KEYWORD subfields - but we don't allow these in lookup + // indices, so the call to exactAttribute looks redundant 
now. + this(match.exactAttribute().fieldName(), input.channel(), input.type()); + } + + public MatchConfig(StreamInput in) throws IOException { + this(new FieldAttribute.FieldName(in.readString()), in.readInt(), DataType.readFrom(in)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(fieldName.string()); + out.writeInt(channel); + type.writeTo(out); + } + + public FieldAttribute.FieldName fieldName() { + return fieldName; + } + + public int channel() { + return channel; + } + + public DataType type() { + return type; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (obj == null || obj.getClass() != this.getClass()) return false; + var that = (MatchConfig) obj; + return Objects.equals(this.fieldName, that.fieldName) && this.channel == that.channel && Objects.equals(this.type, that.type); + } + + @Override + public int hashCode() { + return Objects.hash(fieldName, channel, type); + } + + @Override + public String toString() { + return "MatchConfig[" + "fieldName=" + fieldName + ", " + "channel=" + channel + ", " + "type=" + type + ']'; + } + +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index cc3528622203f..0b9b4d557cec6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -638,7 +638,17 @@ public PlanFactory visitJoinCommand(EsqlBaseParser.JoinCommandContext ctx) { var matchFieldsCount = joinFields.size(); if (matchFieldsCount > 1) { - throw new ParsingException(source, "JOIN ON clause only supports one field at the moment, found [{}]", matchFieldsCount); + Set matchFieldNames = new LinkedHashSet<>(); + for (Attribute field : joinFields) { + if 
(matchFieldNames.add(field.name()) == false) { + throw new ParsingException( + field.source(), + "JOIN ON clause does not support multiple fields with the same name, found multiple instances of [{}]", + field.name() + ); + } + + } } return p -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index fb7b6ccab5d5e..da48c4f1dbca3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -83,6 +83,7 @@ import org.elasticsearch.xpack.esql.enrich.EnrichLookupService; import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator; import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService; +import org.elasticsearch.xpack.esql.enrich.MatchConfig; import org.elasticsearch.xpack.esql.evaluator.EvalMapper; import org.elasticsearch.xpack.esql.evaluator.command.GrokEvaluatorExtracter; import org.elasticsearch.xpack.esql.expression.Order; @@ -779,23 +780,16 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan } matchFields.add(new MatchConfig(right, input)); } - if (matchFields.size() != 1) { - throw new IllegalArgumentException("can't plan [" + join + "]: multiple join predicates are not supported"); - } - // TODO support multiple match fields, and support more than equality predicates - MatchConfig matchConfig = matchFields.getFirst(); return source.with( new LookupFromIndexOperator.Factory( + matchFields, sessionId, parentTask, context.queryPragmas().enrichMaxWorkers(), - matchConfig.channel(), ctx -> lookupFromIndexService, - matchConfig.type(), localSourceExec.indexPattern(), indexName, - matchConfig.fieldName(), join.addedFields().stream().map(f -> (NamedExpression) f).toList(), join.source() ), @@ -803,14 +797,6 @@ 
private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan ); } - private record MatchConfig(FieldAttribute.FieldName fieldName, int channel, DataType type) { - private MatchConfig(FieldAttribute match, Layout.ChannelAndType input) { - // TODO: Using exactAttribute was supposed to handle TEXT fields with KEYWORD subfields - but we don't allow these in lookup - // indices, so the call to exactAttribute looks redundant now. - this(match.exactAttribute().fieldName(), input.channel(), input.type()); - } - } - private PhysicalOperation planLocal(LocalSourceExec localSourceExec, LocalExecutionPlannerContext context) { Layout.Builder layout = new Layout.Builder(); layout.append(localSourceExec.output()); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java index 54071ac86d59f..01242b19e3074 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java @@ -135,18 +135,10 @@ public void testJoinOnConstant() { ); } - public void testJoinOnMultipleFields() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()); - assertEquals( - "1:35: JOIN ON clause only supports one field at the moment, found [2]", - error("row languages = 1, gender = \"f\" | lookup join test on gender, languages") - ); - } - public void testJoinTwiceOnTheSameField() { assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()); assertEquals( - "1:35: JOIN ON clause only supports one field at the moment, found [2]", + "1:66: JOIN ON clause does not support multiple fields with the same name, found multiple instances of [languages]", error("row languages = 1, gender = \"f\" | lookup join test on languages, languages") ); } 
@@ -154,7 +146,7 @@ public void testJoinTwiceOnTheSameField() { public void testJoinTwiceOnTheSameField_TwoLookups() { assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()); assertEquals( - "1:80: JOIN ON clause only supports one field at the moment, found [2]", + "1:108: JOIN ON clause does not support multiple fields with the same name, found multiple instances of [gender]", error("row languages = 1, gender = \"f\" | lookup join test on languages | eval x = 1 | lookup join test on gender, gender") ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java index 641eb72afb010..c65d240275089 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java @@ -39,7 +39,7 @@ protected LookupFromIndexOperator.Status mutateInstance(LookupFromIndexOperator. 
long receivedPages = in.receivedPages(); long completedPages = in.completedPages(); long procesNanos = in.procesNanos(); - long totalTerms = in.totalTerms(); + long totalTerms = in.totalRows(); long emittedPages = in.emittedPages(); switch (randomIntBetween(0, 4)) { case 0 -> receivedPages = randomValueOtherThan(receivedPages, ESTestCase::randomNonNegativeLong); @@ -62,7 +62,7 @@ public void testToXContent() { "received_pages" : 100, "completed_pages" : 50, "emitted_pages" : 88, - "total_terms" : 120 + "total_rows" : 120 }""")); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java index 75b1a4db120e1..c4a381eccff65 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java @@ -11,6 +11,7 @@ import org.apache.lucene.document.IntField; import org.apache.lucene.document.LongField; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.util.BytesRef; @@ -39,9 +40,11 @@ import org.elasticsearch.compute.test.NoOpReleasable; import org.elasticsearch.compute.test.OperatorTestCase; import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator; +import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperServiceTestCase; @@ -86,25 +89,37 @@ public 
class LookupFromIndexOperatorTests extends OperatorTestCase { private final ThreadPool threadPool = threadPool(); private final Directory lookupIndexDirectory = newDirectory(); private final List releasables = new ArrayList<>(); + private int numberOfJoinColumns; // we only allow 1 or 2 columns due to simpleInput() implementation @Before public void buildLookupIndex() throws IOException { + numberOfJoinColumns = 1 + randomInt(1); // 1 or 2 join columns try (RandomIndexWriter writer = new RandomIndexWriter(random(), lookupIndexDirectory)) { for (int i = 0; i < LOOKUP_SIZE; i++) { - writer.addDocument( - List.of( - new LongField("match", i, Field.Store.NO), - new KeywordFieldMapper.KeywordField("lkwd", new BytesRef("l" + i), KeywordFieldMapper.Defaults.FIELD_TYPE), - new IntField("lint", -i, Field.Store.NO) - ) - ); + List fields = new ArrayList<>(); + fields.add(new LongField("match0", i, Field.Store.NO)); + if (numberOfJoinColumns == 2) { + fields.add(new LongField("match1", i + 1, Field.Store.NO)); + } + fields.add(new KeywordFieldMapper.KeywordField("lkwd", new BytesRef("l" + i), KeywordFieldMapper.Defaults.FIELD_TYPE)); + fields.add(new IntField("lint", -i, Field.Store.NO)); + writer.addDocument(fields); } } } @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { - return new SequenceLongBlockSourceOperator(blockFactory, LongStream.range(0, size).map(l -> l % LOOKUP_SIZE)); + if (numberOfJoinColumns == 1) { + return new SequenceLongBlockSourceOperator(blockFactory, LongStream.range(0, size).map(l -> l % LOOKUP_SIZE)); + } else if (numberOfJoinColumns == 2) { + return new TupleLongLongBlockSourceOperator( + blockFactory, + LongStream.range(0, size).mapToObj(l -> Tuple.tuple(l % LOOKUP_SIZE, l % LOOKUP_SIZE + 1)) + ); + } else { + throw new IllegalStateException("numberOfJoinColumns must be 1 or 2, got: " + numberOfJoinColumns); + } } @Override @@ -117,10 +132,10 @@ protected void assertSimpleOutput(List input, List results) { int 
count = results.stream().mapToInt(Page::getPositionCount).sum(); assertThat(count, equalTo(input.stream().mapToInt(Page::getPositionCount).sum())); for (Page r : results) { - assertThat(r.getBlockCount(), equalTo(3)); + assertThat(r.getBlockCount(), equalTo(numberOfJoinColumns + 2)); LongVector match = r.getBlock(0).asVector(); - BytesRefVector lkwd = r.getBlock(1).asVector(); - IntVector lint = r.getBlock(2).asVector(); + BytesRefVector lkwd = r.getBlock(numberOfJoinColumns).asVector(); + IntVector lint = r.getBlock(numberOfJoinColumns + 1).asVector(); for (int p = 0; p < r.getPositionCount(); p++) { long m = match.getLong(p); assertThat(lkwd.getBytesRef(p, new BytesRef()).utf8ToString(), equalTo("l" + m)); @@ -134,24 +149,26 @@ protected Operator.OperatorFactory simple(SimpleOptions options) { String sessionId = "test"; CancellableTask parentTask = new CancellableTask(0, "test", "test", "test", TaskId.EMPTY_TASK_ID, Map.of()); int maxOutstandingRequests = 1; - int inputChannel = 0; DataType inputDataType = DataType.LONG; String lookupIndex = "idx"; - FieldAttribute.FieldName matchField = new FieldAttribute.FieldName("match"); List loadFields = List.of( new ReferenceAttribute(Source.EMPTY, "lkwd", DataType.KEYWORD), new ReferenceAttribute(Source.EMPTY, "lint", DataType.INTEGER) ); + + List matchFields = new ArrayList<>(); + for (int i = 0; i < numberOfJoinColumns; i++) { + FieldAttribute.FieldName matchField = new FieldAttribute.FieldName("match" + i); + matchFields.add(new MatchConfig(matchField, i, inputDataType)); + } return new LookupFromIndexOperator.Factory( + matchFields, sessionId, parentTask, maxOutstandingRequests, - inputChannel, this::lookupService, - inputDataType, lookupIndex, lookupIndex, - matchField, loadFields, Source.EMPTY ); @@ -164,9 +181,13 @@ protected Matcher expectedDescriptionOfSimple() { @Override protected Matcher expectedToStringOfSimple() { - return matchesPattern( - "LookupOperator\\[index=idx input_type=LONG match_field=match 
load_fields=\\[lkwd\\{r}#\\d+, lint\\{r}#\\d+] inputChannel=0]" - ); + StringBuilder sb = new StringBuilder(); + sb.append("LookupOperator\\[index=idx load_fields=\\[lkwd\\{r}#\\d+, lint\\{r}#\\d+]"); + for (int i = 0; i < numberOfJoinColumns; i++) { + sb.append(" input_type=LONG match_field=match").append(i).append(" inputChannel=").append(i); + } + sb.append("]"); + return matchesPattern(sb.toString()); } private LookupFromIndexService lookupService(DriverContext mainContext) { @@ -240,14 +261,25 @@ private AbstractLookupService.LookupShardContextFactory lookupShardContextFactor return shardId -> { MapperServiceTestCase mapperHelper = new MapperServiceTestCase() { }; - MapperService mapperService = mapperHelper.createMapperService(""" + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(""" { "doc": { "properties": { - "match": { "type": "long" }, + "match0": { "type": "long" }, + """); + if (numberOfJoinColumns == 2) { + stringBuilder.append(""" + "match1": { "type": "long" }, + """); + } + stringBuilder.append(""" "lkwd": { "type": "keyword" }, "lint": { "type": "integer" } }} - }"""); + } + """); + + MapperService mapperService = mapperHelper.createMapperService(stringBuilder.toString()); DirectoryReader reader = DirectoryReader.open(lookupIndexDirectory); SearchExecutionContext executionCtx = mapperHelper.createSearchExecutionContext(mapperService, newSearcher(reader)); var ctx = new EsPhysicalOperationProviders.DefaultShardContext(0, new NoOpReleasable(), executionCtx, AliasFilter.EMPTY); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/MatchConfigSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/MatchConfigSerializationTests.java new file mode 100644 index 0000000000000..2df9831b5bd0d --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/MatchConfigSerializationTests.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch 
B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.enrich; + +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput; +import org.elasticsearch.xpack.esql.session.Configuration; +import org.junit.Before; + +import java.io.IOException; + +import static org.elasticsearch.xpack.esql.ConfigurationTestUtils.randomConfiguration; + +public class MatchConfigSerializationTests extends AbstractWireSerializingTestCase { + + private Configuration config; + + @Before + public void initConfig() { + this.config = randomConfiguration(); + } + + @Override + protected Writeable.Reader instanceReader() { + return MatchConfig::new; + } + + @Override + protected MatchConfig createTestInstance() { + return randomMatchConfig(); + } + + private MatchConfig randomMatchConfig() { + // Implement logic to create a random MatchConfig instance + String name = randomAlphaOfLengthBetween(1, 100); + int channel = randomInt(); + DataType type = 
randomFrom(DataType.types()); + return new MatchConfig(new FieldAttribute.FieldName(name), channel, type); + } + + @Override + protected MatchConfig mutateInstance(MatchConfig instance) { + return mutateMatchConfig(instance); + } + + private MatchConfig mutateMatchConfig(MatchConfig instance) { + int i = randomIntBetween(1, 3); + return switch (i) { + case 1 -> { + String name = randomValueOtherThan(instance.fieldName().string(), () -> randomAlphaOfLengthBetween(1, 100)); + yield new MatchConfig(new FieldAttribute.FieldName(name), instance.channel(), instance.type()); + } + case 2 -> { + int channel = randomValueOtherThan(instance.channel(), () -> randomInt()); + yield new MatchConfig(instance.fieldName(), channel, instance.type()); + } + default -> { + DataType type = randomValueOtherThan(instance.type(), () -> randomFrom(DataType.types())); + yield new MatchConfig(instance.fieldName(), instance.channel(), type); + } + }; + } + + @Override + protected MatchConfig copyInstance(MatchConfig instance, TransportVersion version) throws IOException { + return copyInstance(instance, getNamedWriteableRegistry(), (out, v) -> v.writeTo(new PlanStreamOutput(out, config)), in -> { + PlanStreamInput pin = new PlanStreamInput(in, in.namedWriteableRegistry(), config); + return new MatchConfig(pin); + }, version); + } +} diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml index 65d3a750e8a41..7c1606b90258c 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml @@ -1,12 +1,12 @@ --- setup: - requires: - test_runner_features: [capabilities, contains, allowed_warnings] + test_runner_features: [ capabilities, contains, allowed_warnings ] capabilities: - method: POST path: /_query - parameters: [] - capabilities: 
[join_lookup_v12] + parameters: [ ] + capabilities: [ join_lookup_v12 ] reason: "uses LOOKUP JOIN" - do: indices.create: @@ -80,6 +80,31 @@ setup: type: long color: type: keyword + - do: + indices.create: + index: test-lookup-color-match + body: + settings: + index: + mode: lookup + mappings: + properties: + key: + type: long + color: + type: keyword + shade: + type: keyword + - do: + indices.create: + index: test2 + body: + mappings: + properties: + key: + type: long + color: + type: keyword - do: bulk: index: "test" @@ -108,7 +133,7 @@ setup: - { "index": { } } - { "key": 2, "color": "blue" } - { "index": { } } - - { "key": [0, 1, 2], "color": null } + - { "key": [ 0, 1, 2 ], "color": null } - do: bulk: index: "test-lookup-mv" @@ -119,7 +144,7 @@ setup: - { "index": { } } - { "key": 2, "color": "yellow" } - { "index": { } } - - { "key": [0, 1, 2], "color": "green" } + - { "key": [ 0, 1, 2 ], "color": "green" } - do: bulk: index: "test-lookup-no-key" @@ -129,7 +154,28 @@ setup: - { "no-key": 1, "color": "cyan" } - { "index": { } } - { "no-key": 2, "color": "yellow" } - + - do: + bulk: + index: "test-lookup-color-match" + refresh: true + body: + - { "index": { } } + - { "key": 10, "color": "red" , shade: "light" } + - { "index": { } } + - { "key": 10, "color": "red" , shade: "reddish" } + - { "index": { } } + - { "key": 20, "color": "blue" , shade: "dark" } + - { "index": { } } + - { "key": 30, "color": "pink" , shade: "hot" } + - do: + bulk: + index: "test2" + refresh: true + body: + - { "index": { } } + - { "key": 10, "color": "red" } + - { "index": { } } + - { "key": 20, "color": "yellow" } --- basic: - do: @@ -137,12 +183,12 @@ basic: body: query: 'FROM test | SORT key | LOOKUP JOIN test-lookup-1 ON key | LIMIT 3' - - match: {columns.0.name: "key"} - - match: {columns.0.type: "long"} - - match: {columns.1.name: "color"} - - match: {columns.1.type: "keyword"} - - match: {values.0: [1, "cyan"]} - - match: {values.1: [2, "yellow"]} + - match: { columns.0.name: 
"key" } + - match: { columns.0.type: "long" } + - match: { columns.1.name: "color" } + - match: { columns.1.type: "keyword" } + - match: { values.0: [ 1, "cyan" ] } + - match: { values.1: [ 2, "yellow" ] } --- fails with non-lookup index v2: @@ -150,8 +196,8 @@ fails with non-lookup index v2: capabilities: - method: POST path: /_query - parameters: [] - capabilities: [enable_lookup_join_on_remote] + parameters: [ ] + capabilities: [ enable_lookup_join_on_remote ] reason: "checks updated error messages" - do: esql.query: @@ -195,12 +241,12 @@ mv-on-lookup: - "Line 1:24: evaluation of [LOOKUP JOIN test-lookup-mv ON key] failed, treating result as null. Only first 20 failures recorded." - "Line 1:24: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value" - - match: {columns.0.name: "key"} - - match: {columns.0.type: "long"} - - match: {columns.1.name: "color"} - - match: {columns.1.type: "keyword"} - - match: {values.0: [1, "cyan"]} - - match: {values.1: [2, "yellow"]} + - match: { columns.0.name: "key" } + - match: { columns.0.type: "long" } + - match: { columns.1.name: "color" } + - match: { columns.1.type: "keyword" } + - match: { values.0: [ 1, "cyan" ] } + - match: { values.1: [ 2, "yellow" ] } --- mv-on-query: @@ -212,21 +258,48 @@ mv-on-query: - "Line 1:27: evaluation of [LOOKUP JOIN test-lookup-1 ON key] failed, treating result as null. Only first 20 failures recorded." 
- "Line 1:27: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value" - - match: {columns.0.name: "key"} - - match: {columns.0.type: "long"} - - match: {columns.1.name: "color"} - - match: {columns.1.type: "keyword"} - - match: {values.0: [[0, 1, 2], null]} - - match: {values.1: [1, "cyan"]} - - match: {values.2: [2, "yellow"]} + - match: { columns.0.name: "key" } + - match: { columns.0.type: "long" } + - match: { columns.1.name: "color" } + - match: { columns.1.type: "keyword" } + - match: { values.0: [ [ 0, 1, 2 ], null ] } + - match: { values.1: [ 1, "cyan" ] } + - match: { values.2: [ 2, "yellow" ] } --- lookup-no-key: - do: esql.query: - body: - query: 'FROM test | LOOKUP JOIN test-lookup-no-key ON key | KEEP key, color' + body: + query: 'FROM test | LOOKUP JOIN test-lookup-no-key ON key | KEEP key, color' catch: "bad_request" - match: { error.type: "verification_exception" } - contains: { error.reason: "Unknown column [key] in right side of join" } + +--- +basic join on two fields: + - requires: + capabilities: + - method: POST + path: /_query + parameters: [ ] + capabilities: [ lookup_join_on_multiple_fields ] + reason: "uses LOOKUP JOIN on two fields" + - do: + esql.query: + body: + query: 'FROM test2 | LOOKUP JOIN test-lookup-color-match ON key, color | KEEP key, color, shade | SORT key, color, shade | LIMIT 10' + - length: { values: 3 } + - match: { columns.0.name: "key" } + - match: { columns.0.type: "long" } + - match: { columns.1.name: "color" } + - match: { columns.1.type: "keyword" } + - match: { columns.2.name: "shade" } + - match: { columns.2.type: "keyword" } + #for 10 and red 2 rows match + - match: { values.0: [ 10, "red", "light" ] } + - match: { values.1: [ 10, "red", "reddish" ] } + #for 20 and yellow, no rows match, but we keep the row as it is a lookup join + - match: { values.2: [ 20, "yellow", null ] } +