diff --git a/docs/changelog/131559.yaml b/docs/changelog/131559.yaml new file mode 100644 index 0000000000000..837cc55a034e2 --- /dev/null +++ b/docs/changelog/131559.yaml @@ -0,0 +1,5 @@ +pr: 131559 +summary: Lookup Join on Multiple Columns POC WIP +area: ES|QL +type: enhancement +issues: [] diff --git a/docs/reference/query-languages/esql/esql-lookup-join.md b/docs/reference/query-languages/esql/esql-lookup-join.md index 826de488e5897..ce8d5404b8fba 100644 --- a/docs/reference/query-languages/esql/esql-lookup-join.md +++ b/docs/reference/query-languages/esql/esql-lookup-join.md @@ -202,5 +202,6 @@ The following are the current limitations with `LOOKUP JOIN`: * Currently, only matching on equality is supported. * `LOOKUP JOIN` can only use a single match field and a single index. Wildcards are not supported. * Aliases, datemath, and datastreams are supported, as long as the index pattern matches a single concrete index {applies_to}`stack: ga 9.1.0`. + * Limitation on matching on a single field is removed. You can use a comma separated list of fields in the `ON` clause {applies_to}`stack: ga 9.2.0`. * The name of the match field in `LOOKUP JOIN lu_idx ON match_field` must match an existing field in the query. This may require `RENAME`s or `EVAL`s to achieve. -* The query will circuit break if there are too many matching documents in the lookup index, or if the documents are too large. More precisely, `LOOKUP JOIN` works in batches of, normally, about 10,000 rows; a large amount of heap space is needed if the matching documents from the lookup index for a batch are multiple megabytes or larger. This is roughly the same as for `ENRICH`. +* The query will circuit break if there are too many matching documents in the lookup index, or if the documents are too large. More precisely, `LOOKUP JOIN` works in batches of, normally, about 10,000 rows; a large amount of heap space is needed if the matching documents from the lookup index for a batch are multiple megabytes or larger. This is roughly the same as for `ENRICH`. \ No newline at end of file diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index dad300ae72744..6d666a098500d 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -352,6 +352,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_TOPN_TIMINGS = def(9_128_0_00); public static final TransportVersion NODE_WEIGHTS_ADDED_TO_NODE_BALANCE_STATS = def(9_129_0_00); public static final TransportVersion RERANK_SNIPPETS = def(9_130_0_00); + public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_131_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java index 214e7197b2c84..ef56d81dab47d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java @@ -37,7 +37,7 @@ */ public final class EnrichQuerySourceOperator extends SourceOperator { private final BlockFactory blockFactory; - private final QueryList queryList; + private final LookupEnrichQueryGenerator queryList; private int queryPosition = -1; private final ShardContext shardContext; private final IndexReader indexReader; @@ -51,7 +51,7 @@ public final class EnrichQuerySourceOperator extends SourceOperator { public EnrichQuerySourceOperator( BlockFactory blockFactory, int maxPageSize, - QueryList queryList, + LookupEnrichQueryGenerator queryList, ShardContext shardContext, Warnings warnings ) { @@ -159,7 +159,7 @@ Page buildPage(int positions, IntVector.Builder positionsBuilder, IntVector.Buil return page; } - private Query nextQuery() { + private Query nextQuery() throws IOException { ++queryPosition; while (isFinished() == false) { Query query = queryList.getQuery(queryPosition); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java new file mode 100644 index 0000000000000..b71e1373df859 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.lookup; + +import org.apache.lucene.search.Query; +import org.elasticsearch.core.Nullable; + +import java.io.IOException; + +/** + * An interface to generates queries for the lookup and enrich operators. + * This interface is used to retrieve queries based on a position index. + */ +public interface LookupEnrichQueryGenerator { + + /** + * Returns the query at the given position. + */ + @Nullable + Query getQuery(int position) throws IOException; + + /** + * Returns the number of queries in this generator + */ + int getPositionCount(); + +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java index 56b857ad90a82..4d93ab63aaa5d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java @@ -49,7 +49,7 @@ /** * Generates a list of Lucene queries based on the input block. */ -public abstract class QueryList { +public abstract class QueryList implements LookupEnrichQueryGenerator { protected final SearchExecutionContext searchExecutionContext; protected final AliasFilter aliasFilter; protected final MappedFieldType field; @@ -74,7 +74,8 @@ protected QueryList( /** * Returns the number of positions in this query list */ - int getPositionCount() { + @Override + public int getPositionCount() { return block.getPositionCount(); } @@ -87,7 +88,8 @@ int getPositionCount() { */ public abstract QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage); - final Query getQuery(int position) { + @Override + public final Query getQuery(int position) { final int valueCount = block.getValueCount(position); if (onlySingleValueParams != null && valueCount != 1) { if (valueCount > 1) { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java index 53a223c1453e1..b9adde96795bd 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java @@ -63,6 +63,16 @@ public class CsvTestsDataLoader { private static final TestDataset HOSTS = new TestDataset("hosts"); private static final TestDataset APPS = new TestDataset("apps"); private static final TestDataset APPS_SHORT = APPS.withIndex("apps_short").withTypeMapping(Map.of("id", "short")); + private static final TestDataset MULTI_COLUMN_JOINABLE = new TestDataset( + "multi_column_joinable", + "mapping-multi_column_joinable.json", + "multi_column_joinable.csv" + ); + private static final TestDataset MULTI_COLUMN_JOINABLE_LOOKUP = new TestDataset( + "multi_column_joinable_lookup", + "mapping-multi_column_joinable2.json", + "multi_column_joinable2.csv" + ).withSetting("lookup-settings.json"); private static final TestDataset LANGUAGES = new TestDataset("languages"); private static final TestDataset LANGUAGES_LOOKUP = LANGUAGES.withIndex("languages_lookup").withSetting("lookup-settings.json"); private static final TestDataset LANGUAGES_LOOKUP_NON_UNIQUE_KEY = LANGUAGES_LOOKUP.withIndex("languages_lookup_non_unique_key") @@ -213,7 +223,9 @@ public class CsvTestsDataLoader { Map.entry(LOGS.indexName, LOGS), Map.entry(MV_TEXT.indexName, MV_TEXT), Map.entry(DENSE_VECTOR.indexName, DENSE_VECTOR), - Map.entry(COLORS.indexName, COLORS) + Map.entry(COLORS.indexName, COLORS), + Map.entry(MULTI_COLUMN_JOINABLE.indexName, MULTI_COLUMN_JOINABLE), + Map.entry(MULTI_COLUMN_JOINABLE_LOOKUP.indexName, MULTI_COLUMN_JOINABLE_LOOKUP) ); private static final EnrichConfig LANGUAGES_ENRICH = new EnrichConfig("languages_policy", "enrich-policy-languages.json"); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable.csv b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable.csv new file mode 100644 index 0000000000000..f5cf5341ab6f2 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable.csv @@ -0,0 +1,16 @@ +id_int,name_str,is_active_bool,ip_addr,extra1,extra2 +1,Alice,true,192.168.1.1,foo,100 +2,Bob,false,192.168.1.2,bar,200 +3,Charlie,true,192.168.1.3,baz,300 +4,David,false,192.168.1.4,qux,400 +5,Eve,true,192.168.1.5,quux,500 +6,,true,192.168.1.6,corge,600 +7,Grace,false,,grault,700 +8,Hank,true,192.168.1.8,garply,800 +9,Ivy,false,192.168.1.9,waldo,900 +10,John,true,192.168.1.10,fred,1000 +,Kate,false,192.168.1.11,plugh,1100 +12,Liam,true,192.168.1.12,xyzzy,1200 +13,Mia,false,192.168.1.13,thud,1300 +14,Nina,true,192.168.1.14,foo2,1400 +15,Oscar,false,192.168.1.15,bar2,1500 diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable2.csv b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable2.csv new file mode 100644 index 0000000000000..198e9c2024e91 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable2.csv @@ -0,0 +1,17 @@ +id_int,name_str,is_active_bool,ip_addr,other1,other2 +1,Alice,true,192.168.1.1,alpha,1000 +1,Alice,true,192.168.1.2,beta,2000 +2,Bob,false,192.168.1.3,gamma,3000 +3,Charlie,true,192.168.1.3,delta,4000 +3,Charlie,false,192.168.1.3,epsilon,5000 +4,David,false,192.168.1.4,zeta,6000 +5,Eve,true,192.168.1.5,eta,7000 +5,Eve,true,192.168.1.5,theta,8000 +6,,true,192.168.1.6,iota,9000 +7,Grace,false,,kappa,10000 +8,Hank,true,192.168.1.8,lambda,11000 +,Kate,false,192.168.1.11,mu,12000 +12,Liam,true,192.168.1.12,nu,13000 +13,Mia,false,192.168.1.13,xi,14000 +14,Nina,true,192.168.1.14,omicron,15000 +16,Paul,true,192.168.1.16,pi,16000 diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index c71bf34cafd1a..6be2e0a744154 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -4871,3 +4871,360 @@ Connected to 10.1.0.1 | English | English | U Connected to 10.1.0.1 | English | English | null Connected to 10.1.0.1 | English | null | United Kingdom ; + +lookupJoinOnTwoFields +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +1 | Alice | foo | alpha | 1000 +1 | Alice | foo | beta | 2000 +2 | Bob | bar | gamma | 3000 +3 | Charlie | baz | delta | 4000 +3 | Charlie | baz | epsilon | 5000 +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | null | null +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +9 | Ivy | waldo | null | null +10 | John | fred | null | null +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +15 | Oscar | bar2 | null | null +null | Kate | plugh | null | null +; + +lookupJoinOnThreeFields +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str, is_active_bool +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +1 | Alice | foo | alpha | 1000 +1 | Alice | foo | beta | 2000 +2 | Bob | bar | gamma | 3000 +3 | Charlie | baz | delta | 4000 +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | null | null +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +9 | Ivy | waldo | null | null +10 | John | fred | null | null +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +15 | Oscar | bar2 | null | null +null | Kate | plugh | null | null +; + + +lookupJoinOnFourFields +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str, is_active_bool, ip_addr +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +1 | Alice | foo | alpha | 1000 +2 | Bob | bar | null | null +3 | Charlie | baz | delta | 4000 +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | null | null +7 | Grace | grault | null | null +8 | Hank | garply | lambda | 11000 +9 | Ivy | waldo | null | null +10 | John | fred | null | null +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +15 | Oscar | bar2 | null | null +null | Kate | plugh | null | null +; + + +lookupJoinOnTwoOtherFields +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON is_active_bool, ip_addr +| KEEP id_int, name_str, extra1, other1, other2, is_active_bool, ip_addr +| SORT id_int, name_str, extra1, other1, other2, is_active_bool, ip_addr +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer | is_active_bool:boolean | ip_addr:ip +1 | Alice | foo | alpha | 1000 | true | 192.168.1.1 +3 | Charlie | baz | delta | 4000 | true | 192.168.1.3 +4 | David | qux | zeta | 6000 | false | 192.168.1.4 +5 | Eve | quux | eta | 7000 | true | 192.168.1.5 +5 | Eve | quux | theta | 8000 | true | 192.168.1.5 +6 | null | corge | iota | 9000 | true | 192.168.1.6 +8 | Hank | garply | lambda | 11000 | true | 192.168.1.8 +12 | Liam | xyzzy | nu | 13000 | true | 192.168.1.12 +13 | Mia | thud | xi | 14000 | false | 192.168.1.13 +14 | Nina | foo2 | omicron | 15000 | true | 192.168.1.14 +null | Kate | plugh | mu | 12000 | false | 192.168.1.11 +null | null | bar | null | null | false | 192.168.1.2 +null | null | bar2 | null | null | false | 192.168.1.15 +null | null | fred | null | null | true | 192.168.1.10 +null | null | grault | null | null | false | null +null | null | waldo | null | null | false | 192.168.1.9 +; + +lookupJoinOnTwoFieldsWithEval +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| eval id_int = id_int + 5 +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +6 | null | foo | iota | 9000 +7 | Grace | bar | kappa | 10000 +8 | Hank | baz | lambda | 11000 +9 | null | qux | null | null +10 | null | quux | null | null +11 | null | corge | null | null +12 | null | grault | null | null +13 | null | garply | null | null +14 | null | waldo | null | null +15 | null | fred | null | null +17 | null | xyzzy | null | null +18 | null | thud | null | null +19 | null | foo2 | null | null +20 | null | bar2 | null | null +null | null | plugh | null | null +; + + + + +lookupJoinOnTwoFieldsAfterTop +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| SORT extra1 +| LIMIT 10 +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +1 | Alice | foo | alpha | 1000 +1 | Alice | foo | beta | 2000 +2 | Bob | bar | gamma | 3000 +3 | Charlie | baz | delta | 4000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +10 | null | fred | null | null +14 | Nina | foo2 | omicron | 15000 +15 | null | bar2 | null | null +null | null | plugh | null | null +; + + +lookupJoinWithPushableFilterOnLeft +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other2 > 5000 +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; + +lookupJoinWithTwoPushableFiltersOnLeft +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other2 > 5000 +| WHERE other1 like "*ta" +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +; + +lookupJoinWithMixLeftAndRightFilters +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other2 > 5000 AND (extra1 == "qux" OR extra1 == "foo2") AND other1 like ("*ta", "*ron") +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +14 | Nina | foo2 | omicron | 15000 +; + +lookupJoinWithMixLeftAndRightFiltersNotPushableToLucene +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE ABS(other2) > 5000 AND (extra1 == "qux" OR extra1 == "foo2") AND other1 like ("*ta", "*ron") +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +14 | Nina | foo2 | omicron | 15000 +; + + +lookupJoinWithMixJoinAndNonJoinColumnsNotPushable +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE ABS(other2) > id_int + 5000 +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; + + +lookupJoinWithMixJoinAndNonJoinColumnsPushable + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE other2 > id_int + 5000 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + ; + + id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer + 4 | David | qux | zeta | 6000 + 5 | Eve | quux | eta | 7000 + 5 | Eve | quux | theta | 8000 + 6 | null | corge | iota | 9000 + 7 | Grace | grault | kappa | 10000 + 8 | Hank | garply | lambda | 11000 + 12 | Liam | xyzzy | nu | 13000 + 13 | Mia | thud | xi | 14000 + 14 | Nina | foo2 | omicron | 15000 + ; + + lookupJoinWithJoinAttrFilter + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE id_int > 7 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + ; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +8 | Hank | garply | lambda | 11000 +9 | null | waldo | null | null +10 | null | fred | null | null +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +15 | null | bar2 | null | null +; + + +lookupJoinWithExpressionOfOtherFields + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE ABS(other2) > LENGTH(other1)*1000 + 2000 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + /* + NEED TO DEBUG WHY THE FILTER IS NOT PUSHED TO THE RIGHT SIDE OF THE LOOKUP JOIN + */ + ; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable.json b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable.json new file mode 100644 index 0000000000000..f3f58f23e05d0 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable.json @@ -0,0 +1,22 @@ +{ + "properties": { + "id_int": { + "type": "integer" + }, + "name_str": { + "type": "keyword" + }, + "is_active_bool": { + "type": "boolean" + }, + "ip_addr": { + "type": "ip" + }, + "extra1": { + "type": "keyword" + }, + "extra2": { + "type": "integer" + } + } +} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable2.json b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable2.json new file mode 100644 index 0000000000000..d11fc46301cb1 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable2.json @@ -0,0 +1,22 @@ +{ + "properties": { + "id_int": { + "type": "integer" + }, + "name_str": { + "type": "keyword" + }, + "is_active_bool": { + "type": "boolean" + }, + "ip_addr": { + "type": "ip" + }, + "other1": { + "type": "keyword" + }, + "other2": { + "type": "integer" + } + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index e25cb82f29851..f50723185e8b6 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -224,18 +224,19 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws TEST_REQUEST_TIMEOUT ); final String finalNodeWithShard = nodeWithShard; + List matchFields = new ArrayList<>(); + matchFields.add(new LookupFromIndexOperator.MatchConfig(new FieldAttribute.FieldName("key"), 1, keyType)); LookupFromIndexOperator.Factory lookup = new LookupFromIndexOperator.Factory( + matchFields, "test", parentTask, QueryPragmas.ENRICH_MAX_WORKERS.get(Settings.EMPTY), - 1, ctx -> internalCluster().getInstance(TransportEsqlQueryAction.class, finalNodeWithShard).getLookupFromIndexService(), - keyType, "lookup", "lookup", - new FieldAttribute.FieldName("key"), List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))), - Source.EMPTY + Source.EMPTY, + null ); DriverContext driverContext = driverContext(); try ( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 5491ba58887f7..de7340bafb6e6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1291,6 +1291,10 @@ public enum Cap { */ FIXED_PROFILE_SERIALIZATION, + /** + * Support for lookup join on multiple fields. + */ + LOOKUP_JOIN_ON_MULTIPLE_FIELDS, /** * Dot product vector similarity function */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index dd305f09c12dc..2582db9300df1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -38,11 +38,13 @@ import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.FilterOperator; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.OutputOperator; import org.elasticsearch.compute.operator.ProjectOperator; import org.elasticsearch.compute.operator.Warnings; import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator; +import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.MergePositionsOperator; import org.elasticsearch.compute.operator.lookup.QueryList; import org.elasticsearch.core.AbstractRefCounted; @@ -72,10 +74,15 @@ import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.evaluator.EvalMapper; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; +import org.elasticsearch.xpack.esql.planner.Layout; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; @@ -191,7 +198,7 @@ public ThreadContext getThreadContext() { /** * Build a list of queries to perform inside the actual lookup. */ - protected abstract QueryList queryList( + protected abstract LookupEnrichQueryGenerator queryList( T request, SearchExecutionContext context, AliasFilter aliasFilter, @@ -271,17 +278,20 @@ protected void sendChildRequest( } private void doLookup(T request, CancellableTask task, ActionListener> listener) { - Block inputBlock = request.inputPage.getBlock(0); - if (inputBlock.areAllValuesNull()) { - List nullResponse = mergePages - ? List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields)) - : List.of(); - listener.onResponse(nullResponse); - return; - } - final List releasables = new ArrayList<>(6); boolean started = false; + final List releasables = new ArrayList<>(6); try { + for (int j = 0; j < request.inputPage.getBlockCount(); j++) { + Block inputBlock = request.inputPage.getBlock(j); + if (inputBlock.areAllValuesNull()) { + List nullResponse = mergePages + ? List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields)) + : List.of(); + listener.onResponse(nullResponse); + return; + } + } + var projectState = projectResolver.getProjectState(clusterService.state()); AliasFilter aliasFilter = indicesService.buildAliasFilter( projectState, @@ -305,6 +315,7 @@ private void doLookup(T request, CancellableTask task, ActionListener final int[] mergingChannels = IntStream.range(0, request.extractFields.size()).map(i -> i + 2).toArray(); final Operator finishPages; final OrdinalBytesRefBlock ordinalsBytesRefBlock; + Block inputBlock = request.inputPage.getBlock(0); if (mergePages // TODO fix this optimization for Lookup. && inputBlock instanceof BytesRefBlock bytesRefBlock && (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) { @@ -334,7 +345,7 @@ private void doLookup(T request, CancellableTask task, ActionListener request.source.source().getColumnNumber(), request.source.text() ); - QueryList queryList = queryList( + LookupEnrichQueryGenerator queryList = queryList( request, shardContext.executionContext, aliasFilter, @@ -350,13 +361,36 @@ private void doLookup(T request, CancellableTask task, ActionListener warnings ); releasables.add(queryOperator); - + Layout.Builder builder = new Layout.Builder(); + // append the docsIds and positions to the layout + builder.append( + // this looks wrong, what is the datatype for the Docs? It says DocVector but it is not a DataType + new FieldAttribute(Source.EMPTY, "Docs", new EsField("Docs", DataType.DOC_DATA_TYPE, Collections.emptyMap(), false)) + ); + builder.append( + new FieldAttribute(Source.EMPTY, "Positions", new EsField("Positions", DataType.INTEGER, Collections.emptyMap(), false)) + ); List operators = new ArrayList<>(); if (request.extractFields.isEmpty() == false) { var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields); + builder.append(request.extractFields); releasables.add(extractFieldsOperator); operators.add(extractFieldsOperator); } + if (queryList instanceof PostJoinFilterable postJoinFilterable) { + FilterExec filterExec = postJoinFilterable.getPostJoinFilter(); + Operator inputOperator; + if (operators.isEmpty() == false) { + inputOperator = operators.getLast(); + } else { + inputOperator = queryOperator; + } + Operator postJoinFilter = filterExecOperator(filterExec, inputOperator, shardContext.context, driverContext, builder); + if (postJoinFilter != null) { + releasables.add(postJoinFilter); + operators.add(postJoinFilter); + } + } operators.add(finishPages); /* @@ -418,6 +452,27 @@ public void onFailure(Exception e) { } } + private Operator filterExecOperator( + FilterExec filterExec, + Operator inputOperator, // not needed? + EsPhysicalOperationProviders.ShardContext shardContext, + DriverContext driverContext, + Layout.Builder builder + ) { + if (filterExec == null) { + return null; + } + + var evaluatorFactory = EvalMapper.toEvaluator( + FoldContext.small()/*is this correct*/, + filterExec.condition(), + builder.build(), + List.of(shardContext) + ); + var filterOperatorFactory = new FilterOperator.FilterOperatorFactory(evaluatorFactory); + return filterOperatorFactory.get(driverContext); + } + private static Operator extractFieldsOperator( EsPhysicalOperationProviders.ShardContext shardContext, DriverContext driverContext, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java index 663698d77c6d4..da45f1e165bcd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java @@ -25,6 +25,7 @@ import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Warnings; +import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.QueryList; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasables; @@ -110,7 +111,7 @@ protected TransportRequest transportRequest(EnrichLookupService.Request request, } @Override - protected QueryList queryList( + protected LookupEnrichQueryGenerator queryList( TransportRequest request, SearchExecutionContext context, AliasFilter aliasFilter, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java new file mode 100644 index 0000000000000..e7bc9455a889d --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java @@ -0,0 +1,132 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.enrich; + +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.Query; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; +import org.elasticsearch.compute.operator.lookup.QueryList; +import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.xpack.esql.capabilities.TranslationAware; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; +import org.elasticsearch.xpack.esql.stats.SearchContextStats; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER; + +/** + * A {@link LookupEnrichQueryGenerator} that combines multiple {@link QueryList}s into a single query. + * Each query in the resulting query will be a conjunction of all queries from the input lists at the same position. + * In the future we can extend this to support more complex expressions, such as disjunctions or negations. + */ +public class ExpressionQueryList implements LookupEnrichQueryGenerator, PostJoinFilterable { + private final List queryLists; + private final List preJoinFilters = new ArrayList<>(); + private FilterExec postJoinFilter; + private final SearchExecutionContext context; + + public ExpressionQueryList( + List queryLists, + SearchExecutionContext context, + PhysicalPlan rightPreJoinPlan, + ClusterService clusterService + ) { + if (queryLists.size() < 2 && rightPreJoinPlan == null) { + throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists"); + } + this.queryLists = queryLists; + this.context = context; + buildPrePostJoinFilter(rightPreJoinPlan, clusterService); + } + + private void buildPrePostJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService clusterService) { + // we support a FilterExec as the pre-join filter + // if we filter Exec is not translatable to a QueryBuilder, we will apply it after the join + if (rightPreJoinPlan instanceof FilterExec filterExec) { + try { + LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from( + SearchContextStats.from(List.of(context)), + new EsqlFlags(clusterService.getClusterSettings()) + ); + // If the pre-join filter is a FilterExec, we can convert it to a QueryBuilder + // try to convert it to a QueryBuilder, if not possible apply it after the join + if (filterExec instanceof TranslationAware translationAware) { + preJoinFilters.add( + translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder().toQuery(context) + ); + } else { + // if the filter is not translatable, we will apply it after the join + postJoinFilter = filterExec; + } + } catch (IOException e) { + throw new IllegalArgumentException("Failed to translate pre-join filter: " + filterExec, e); + } + + } else if (rightPreJoinPlan instanceof EsQueryExec esQueryExec) { + try { + // check the EsQueryExec for a pre-join filter + if (esQueryExec.query() != null) { + preJoinFilters.add(esQueryExec.query().toQuery(context)); + } + } catch (IOException e) { + throw new IllegalArgumentException("Failed to translate pre-join filter: " + esQueryExec, e); + } + } else { + throw new IllegalArgumentException("Unsupported pre-join filter type: " + preJoinFilters.getClass().getName()); + } + } + + @Override + public Query getQuery(int position) throws IOException { + BooleanQuery.Builder builder = new BooleanQuery.Builder(); + for (QueryList queryList : queryLists) { + Query q = queryList.getQuery(position); + if (q == null) { + // if any of the matchFields are null, it means there is no match for this position + // A AND NULL is always NULL, so we can skip this position + return null; + } + builder.add(q, BooleanClause.Occur.FILTER); + } + // also attach the pre-join filter if it exists + for (Query preJoinFilter : preJoinFilters) { + builder.add(preJoinFilter, BooleanClause.Occur.FILTER); + } + return builder.build(); + } + + @Override + public int getPositionCount() { + int positionCount = queryLists.get(0).getPositionCount(); + for (QueryList queryList : queryLists) { + if (queryList.getPositionCount() != positionCount) { + throw new IllegalArgumentException( + "All QueryLists must have the same position count, expected: " + + positionCount + + ", but got: " + + queryList.getPositionCount() + ); + } + } + return positionCount; + } + + @Override + public FilterExec getPostJoinFilter() { + return postJoinFilter; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java index 2b41edcbeff0b..a484d904ecd75 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.AsyncOperator; @@ -27,6 +28,8 @@ import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.planner.Layout; import java.io.IOException; import java.util.Iterator; @@ -37,49 +40,69 @@ // TODO rename package public final class LookupFromIndexOperator extends AsyncOperator { + public record MatchConfig(FieldAttribute.FieldName fieldName, int channel, DataType type) implements Writeable { + public MatchConfig(FieldAttribute match, Layout.ChannelAndType input) { + // TODO: Using exactAttribute was supposed to handle TEXT fields with KEYWORD subfields - but we don't allow these in lookup + // indices, so the call to exactAttribute looks redundant now. + this(match.exactAttribute().fieldName(), input.channel(), input.type()); + } + + public MatchConfig(StreamInput in) throws IOException { + this(new FieldAttribute.FieldName(in.readString()), in.readInt(), DataType.readFrom(in)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(fieldName.string()); + out.writeInt(channel); + type.writeTo(out); + } + + } + public record Factory( + List matchFields, String sessionId, CancellableTask parentTask, int maxOutstandingRequests, - int inputChannel, Function lookupService, - DataType inputDataType, String lookupIndexPattern, String lookupIndex, - FieldAttribute.FieldName matchField, List loadFields, - Source source + Source source, + PhysicalPlan rightPreJoinPlan ) implements OperatorFactory { @Override public String describe() { - return "LookupOperator[index=" - + lookupIndex - + " input_type=" - + inputDataType - + " match_field=" - + matchField.string() - + " load_fields=" - + loadFields - + " inputChannel=" - + inputChannel - + "]"; + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("LookupOperator[index=").append(lookupIndex).append(" load_fields=").append(loadFields); + for (MatchConfig matchField : matchFields) { + stringBuilder.append(" input_type=") + .append(matchField.type) + .append(" match_field=") + .append(matchField.fieldName.string()) + .append(" inputChannel=") + .append(matchField.channel); + } + stringBuilder.append(" rightPreJoinPlan=").append(rightPreJoinPlan); + stringBuilder.append("]"); + return stringBuilder.toString(); } @Override public Operator get(DriverContext driverContext) { return new LookupFromIndexOperator( + matchFields, sessionId, driverContext, parentTask, maxOutstandingRequests, - inputChannel, lookupService.apply(driverContext), - inputDataType, lookupIndexPattern, lookupIndex, - matchField.string(), loadFields, - source + source, + rightPreJoinPlan ); } } @@ -87,14 +110,13 @@ public Operator get(DriverContext driverContext) { private final LookupFromIndexService lookupService; private final String sessionId; private final CancellableTask parentTask; - private final int inputChannel; - private final DataType inputDataType; private final String lookupIndexPattern; private final String lookupIndex; - private final String matchField; private final List loadFields; private final Source source; - private long totalTerms = 0L; + private long totalRows = 0L; + private final List matchFields; + private final PhysicalPlan rightPreJoinPlan; /** * Total number of pages emitted by this {@link Operator}. */ @@ -105,45 +127,52 @@ public Operator get(DriverContext driverContext) { private OngoingJoin ongoing = null; public LookupFromIndexOperator( + List matchFields, String sessionId, DriverContext driverContext, CancellableTask parentTask, int maxOutstandingRequests, - int inputChannel, LookupFromIndexService lookupService, - DataType inputDataType, String lookupIndexPattern, String lookupIndex, - String matchField, List loadFields, - Source source + Source source, + PhysicalPlan rightPreJoinPlan ) { super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests); + this.matchFields = matchFields; this.sessionId = sessionId; this.parentTask = parentTask; - this.inputChannel = inputChannel; this.lookupService = lookupService; - this.inputDataType = inputDataType; this.lookupIndexPattern = lookupIndexPattern; this.lookupIndex = lookupIndex; - this.matchField = matchField; this.loadFields = loadFields; this.source = source; + this.rightPreJoinPlan = rightPreJoinPlan; } @Override protected void performAsync(Page inputPage, ActionListener listener) { - final Block inputBlock = inputPage.getBlock(inputChannel); - totalTerms += inputBlock.getTotalValueCount(); + Block[] inputBlockArray = new Block[matchFields.size()]; + for (int i = 0; i < matchFields.size(); i++) { + MatchConfig matchField = matchFields.get(i); + int inputChannel = matchField.channel; + final Block inputBlock = inputPage.getBlock(inputChannel); + if (i == 0) { + // we only add to the totalRows once, so we can use the first block + totalRows += inputBlock.getTotalValueCount(); + } + inputBlockArray[i] = inputBlock; + } LookupFromIndexService.Request request = new LookupFromIndexService.Request( sessionId, lookupIndex, lookupIndexPattern, - inputDataType, - matchField, - new Page(inputBlock), + matchFields, + new Page(inputBlockArray), loadFields, - source + source, + rightPreJoinPlan ); lookupService.lookupAsync( request, @@ -190,17 +219,19 @@ protected void releaseFetchedOnAnyThread(OngoingJoin ongoingJoin) { @Override public String toString() { - return "LookupOperator[index=" - + lookupIndex - + " input_type=" - + inputDataType - + " match_field=" - + matchField - + " load_fields=" - + loadFields - + " inputChannel=" - + inputChannel - + "]"; + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("LookupOperator[index=").append(lookupIndex).append(" load_fields=").append(loadFields); + for (MatchConfig matchField : matchFields) { + stringBuilder.append(" input_type=") + .append(matchField.type) + .append(" match_field=") + .append(matchField.fieldName.string()) + .append(" inputChannel=") + .append(matchField.channel); + } + stringBuilder.append(" right_pre_join_plan=").append(rightPreJoinPlan); + stringBuilder.append("]"); + return stringBuilder.toString(); } @Override @@ -225,7 +256,7 @@ protected void doClose() { @Override protected Operator.Status status(long receivedPages, long completedPages, long processNanos) { - return new LookupFromIndexOperator.Status(receivedPages, completedPages, processNanos, totalTerms, emittedPages); + return new LookupFromIndexOperator.Status(receivedPages, completedPages, processNanos, totalRows, emittedPages); } public static class Status extends AsyncOperator.Status { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index 972a952a7b1fd..b3ef3e7fe0bcd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -20,6 +20,7 @@ import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Warnings; +import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.QueryList; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasables; @@ -31,15 +32,20 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.action.EsqlQueryAction; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; /** * {@link LookupFromIndexService} performs lookup against a Lookup index for @@ -84,13 +90,14 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque request.inputPage, null, request.extractFields, - request.matchField, - request.source + request.matchFields, + request.source, + request.rightPreJoinPlan ); } @Override - protected QueryList queryList( + protected LookupEnrichQueryGenerator queryList( TransportRequest request, SearchExecutionContext context, AliasFilter aliasFilter, @@ -98,10 +105,32 @@ protected QueryList queryList( @Nullable DataType inputDataType, Warnings warnings ) { - return termQueryList(context.getFieldType(request.matchField), context, aliasFilter, inputBlock, inputDataType).onlySingleValues( - warnings, - "LOOKUP JOIN encountered multi-value" - ); + if (request.matchFields.size() == 1 + && (request.rightPreJoinPlan == null + || request.rightPreJoinPlan instanceof EsQueryExec esQueryExec && esQueryExec.query() == null)) { + // legacy case, we only have one match field and don't need ExpressionQueryList + return termQueryList( + context.getFieldType(request.matchFields.get(0).fieldName().string()), + context, + aliasFilter, + inputBlock, + inputDataType + ).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value"); + } + // new case, we have multiple match fields or preJoinFilter and we need to create an ExpressionQueryList + List queryLists = new ArrayList<>(); + for (int i = 0; i < request.matchFields.size(); i++) { + LookupFromIndexOperator.MatchConfig matchField = request.matchFields.get(i); + QueryList q = termQueryList( + context.getFieldType(matchField.fieldName().string()), + context, + aliasFilter, + request.inputPage.getBlock(i), + matchField.type() + ).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value"); + queryLists.add(q); + } + return new ExpressionQueryList(queryLists, context, request.rightPreJoinPlan, clusterService); } @Override @@ -115,25 +144,28 @@ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in } public static class Request extends AbstractLookupService.Request { - private final String matchField; + private final List matchFields; + private final PhysicalPlan rightPreJoinPlan; Request( String sessionId, String index, String indexPattern, - DataType inputDataType, - String matchField, + List matchFields, Page inputPage, List extractFields, - Source source + Source source, + PhysicalPlan rightPreJoinPlan ) { - super(sessionId, index, indexPattern, inputDataType, inputPage, extractFields, source); - this.matchField = matchField; + super(sessionId, index, indexPattern, matchFields.get(0).type(), inputPage, extractFields, source); + this.matchFields = matchFields; + this.rightPreJoinPlan = rightPreJoinPlan; } } protected static class TransportRequest extends AbstractLookupService.TransportRequest { - private final String matchField; + private final List matchFields; + private final PhysicalPlan rightPreJoinPlan; TransportRequest( String sessionId, @@ -143,11 +175,13 @@ protected static class TransportRequest extends AbstractLookupService.TransportR Page inputPage, Page toRelease, List extractFields, - String matchField, - Source source + List matchFields, + Source source, + PhysicalPlan rightPreJoinPlan ) { super(sessionId, shardId, indexPattern, inputDataType, inputPage, toRelease, extractFields, source); - this.matchField = matchField; + this.matchFields = matchFields; + this.rightPreJoinPlan = rightPreJoinPlan; } static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException { @@ -181,6 +215,18 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro String sourceText = in.readString(); source = new Source(source.source(), sourceText); } + List matchFields = null; + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) { + matchFields = planIn.readCollectionAsList(LookupFromIndexOperator.MatchConfig::new); + } else { + // For older versions, we only support a single match field. + matchFields = new ArrayList<>(1); + matchFields.add(new LookupFromIndexOperator.MatchConfig(new FieldAttribute.FieldName(matchField), 0, inputDataType)); + } + PhysicalPlan rightPreJoinPlan = null; + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) { + rightPreJoinPlan = planIn.readOptionalNamedWriteable(PhysicalPlan.class); + } TransportRequest result = new TransportRequest( sessionId, shardId, @@ -189,8 +235,9 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro inputPage, inputPage, extractFields, - matchField, - source + matchFields, + source, + rightPreJoinPlan ); result.setParentTask(parentTaskId); return result; @@ -208,23 +255,35 @@ public void writeTo(StreamOutput out) throws IOException { } else if (indexPattern.equals(shardId.getIndexName()) == false) { throw new EsqlIllegalArgumentException("Aliases and index patterns are not allowed for LOOKUP JOIN [{}]", indexPattern); } - out.writeString(inputDataType.typeName()); out.writeWriteable(inputPage); PlanStreamOutput planOut = new PlanStreamOutput(out, null); planOut.writeNamedWriteableCollection(extractFields); - out.writeString(matchField); + out.writeString(matchFields.get(0).fieldName().string()); if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) { source.writeTo(planOut); } if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_SOURCE_TEXT)) { out.writeString(source.text()); } + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) { + planOut.writeCollection(matchFields, (o, matchConfig) -> matchConfig.writeTo(o)); + } else if (matchFields.size() > 1) { + throw new EsqlIllegalArgumentException("LOOKUP JOIN on multiple fields is not supported on remote node"); + } + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) { + planOut.writeOptionalNamedWriteable(rightPreJoinPlan); + } else if (rightPreJoinPlan != null) { + throw new EsqlIllegalArgumentException("LOOKUP JOIN with pre-join filter is not supported on remote node"); + } } @Override protected String extraDescription() { - return " ,match_field=" + matchField; + return " ,match_fields=" + + matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", ")) + + ", rightPreJoinPlan=" + + rightPreJoinPlan; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/PostJoinFilterable.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/PostJoinFilterable.java new file mode 100644 index 0000000000000..8e9fbb088b8f6 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/PostJoinFilterable.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.enrich; + +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; + +/** + * An interface for a join operator that needs to have a filter applied after the join happens + * For now we use this for applying filters that are not translatable after a lookup join + */ +public interface PostJoinFilterable { + FilterExec getPostJoinFilter(); +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java index 6b4411fafbbfe..44618bb33a15c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java @@ -129,14 +129,45 @@ private static LogicalPlan pushDownPastJoin(Filter filter, Join join) { // 2. filter scoped to the right // 3. filter that requires both sides to be evaluated ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), left, right); - // push the left scoped filter down to the left child, keep the rest intact + boolean optimizationApplied = false; + // push the left scoped filter down to the left child if (scoped.leftFilters.size() > 0) { // push the filter down to the left child left = new Filter(left.source(), left, Predicates.combineAnd(scoped.leftFilters)); // update the join with the new left child join = (Join) join.replaceLeft(left); - - // keep the remaining filters in place, otherwise return the new join; + // we completely applied the left filters, so we can remove them from the scoped filters + scoped = new ScopedFilter(scoped.commonFilters(), List.of(), scoped.rightFilters); + optimizationApplied = true; + } + // push the right scoped filter down to the right child + if (scoped.rightFilters().isEmpty() == false) { + // push the filter down to the right child + right = new Filter(right.source(), right, Predicates.combineAnd(scoped.rightFilters())); + // update the join with the new right child + join = (Join) join.replaceRight(right); + // We still want to reapply the filters that we just applied to the right child, + // so we do NOT update scoped, and we do NOT mark optimizationApplied as true. + // This is because by pushing them on the right side, we filter what rows we get from the right side + // But we do not limit the output rows of the join as the rows are kept as not matched on the left side + // So we end up applying the right filters twice, once on the right side and once on top of the join + // This will result in major performance optimization when the lookup join is expanding + // and applying the right filters reduces the expansion significantly. + // For example, consider a lookup join where the right side is a 1Bln rows index with the value join value of 1. + // We have 10 rows on the left side with the value join value of 1. + // and there is a filter on the right side that filters out all rows on another column + // If we push the filter down to the right side, we will have 10 rows after the join (there were no matches) + // If we do not push the filter down to the right side, we will have 10 * 1Bln rows after the join (all rows matched) + // as the join is expanding. + // They would be filtered out in the next operator, but it is too late, as we already expanded the join + // In other cases, we might not get any performance benefit of this optimization, + // especially when the selectivity of the filter pushed down is very high or the join is not expanding. + + // In the future, once we have inner join support, it is usually possible to convert the lookup join into an inner join + // and then we don't need to reapply the filters on top of the join. + } + if (optimizationApplied) { + // if we pushed down some filters, we need to update the filters to reapply above the join Expression remainingFilter = Predicates.combineAnd(CollectionUtils.combine(scoped.commonFilters, scoped.rightFilters)); plan = remainingFilter != null ? filter.with(join, remainingFilter) : join; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index 9e232bd8f02f9..afb6a17f21e0c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -636,7 +636,17 @@ public PlanFactory visitJoinCommand(EsqlBaseParser.JoinCommandContext ctx) { var matchFieldsCount = joinFields.size(); if (matchFieldsCount > 1) { - throw new ParsingException(source, "JOIN ON clause only supports one field at the moment, found [{}]", matchFieldsCount); + Set matchFieldNames = new LinkedHashSet<>(); + for (Attribute field : joinFields) { + if (matchFieldNames.add(field.name()) == false) { + throw new ParsingException( + field.source(), + "JOIN ON clause does not support multiple fields with the same name, found multiple instances of [{}]", + field.name() + ); + } + + } } return p -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 28204e2572842..6a006e845ddf8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -118,6 +118,7 @@ import org.elasticsearch.xpack.esql.plan.physical.ShowExec; import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; +import org.elasticsearch.xpack.esql.plan.physical.UnaryExec; import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec; import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders.ShardContext; @@ -731,8 +732,8 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan } Layout layout = layoutBuilder.build(); - EsQueryExec localSourceExec = (EsQueryExec) join.lookup(); - if (localSourceExec.indexMode() != IndexMode.LOOKUP) { + EsQueryExec localSourceExec = fildEsQueryExec(join.lookup()); + if (localSourceExec == null || localSourceExec.indexMode() != IndexMode.LOOKUP) { throw new IllegalArgumentException("can't plan [" + join + "]"); } @@ -769,7 +770,7 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan if (join.leftFields().size() != join.rightFields().size()) { throw new IllegalArgumentException("can't plan [" + join + "]: mismatching left and right field count"); } - List matchFields = new ArrayList<>(join.leftFields().size()); + List matchFields = new ArrayList<>(join.leftFields().size()); for (int i = 0; i < join.leftFields().size(); i++) { TypedAttribute left = (TypedAttribute) join.leftFields().get(i); FieldAttribute right = (FieldAttribute) join.rightFields().get(i); @@ -777,38 +778,33 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan if (input == null) { throw new IllegalArgumentException("can't plan [" + join + "][" + left + "]"); } - matchFields.add(new MatchConfig(right, input)); + matchFields.add(new LookupFromIndexOperator.MatchConfig(right, input)); } - if (matchFields.size() != 1) { - throw new IllegalArgumentException("can't plan [" + join + "]: multiple join predicates are not supported"); - } - // TODO support multiple match fields, and support more than equality predicates - MatchConfig matchConfig = matchFields.getFirst(); return source.with( new LookupFromIndexOperator.Factory( + matchFields, sessionId, parentTask, context.queryPragmas().enrichMaxWorkers(), - matchConfig.channel(), ctx -> lookupFromIndexService, - matchConfig.type(), localSourceExec.indexPattern(), indexName, - matchConfig.fieldName(), join.addedFields().stream().map(f -> (NamedExpression) f).toList(), - join.source() + join.source(), + join.right() ), layout ); } - private record MatchConfig(FieldAttribute.FieldName fieldName, int channel, DataType type) { - private MatchConfig(FieldAttribute match, Layout.ChannelAndType input) { - // TODO: Using exactAttribute was supposed to handle TEXT fields with KEYWORD subfields - but we don't allow these in lookup - // indices, so the call to exactAttribute looks redundant now. - this(match.exactAttribute().fieldName(), input.channel(), input.type()); + private EsQueryExec fildEsQueryExec(PhysicalPlan lookup) { + if (lookup instanceof EsQueryExec esQueryExec) { + return esQueryExec; + } else if (lookup instanceof UnaryExec unaryExec) { + return fildEsQueryExec(unaryExec.child()); } + return null; } private PhysicalOperation planLocal(LocalSourceExec localSourceExec, LocalExecutionPlannerContext context) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java index 29f2db102ea7e..8371b3d4d4dad 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec; import org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; @@ -119,11 +120,25 @@ private PhysicalPlan mapBinary(BinaryPlan binary) { join.rightOutputFields() ); } - if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) { - return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields()); + if (right instanceof FilterExec filterExec) { + LookupJoinExec lookupJoinExec = getLookupJoinExec(join, filterExec.child(), left, config); + if (lookupJoinExec != null) { + // build the right child as a FilterExec with the original lookupJoinExec.right() as the child + FilterExec newRightChild = filterExec.replaceChild(lookupJoinExec.right()); + return lookupJoinExec.replaceChildren(lookupJoinExec.left(), newRightChild); + } } + LookupJoinExec lookupJoinExec = getLookupJoinExec(join, right, left, config); + if (lookupJoinExec != null) return lookupJoinExec; } return MapperUtils.unsupported(binary); } + + private static LookupJoinExec getLookupJoinExec(Join join, PhysicalPlan right, PhysicalPlan left, JoinConfig config) { + if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) { + return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields()); + } + return null; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index bf6f0b89efbec..4cf722b025fa4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Fork; import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; import org.elasticsearch.xpack.esql.plan.logical.Limit; @@ -234,10 +235,22 @@ private PhysicalPlan mapBinary(BinaryPlan bp) { join.rightOutputFields() ); } - if (right instanceof FragmentExec fragment - && fragment.fragment() instanceof EsRelation relation - && relation.indexMode() == IndexMode.LOOKUP) { - return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields()); + if (right instanceof FragmentExec fragment) { + boolean isIndexModeLookup = fragment.fragment() instanceof EsRelation relation && relation.indexMode() == IndexMode.LOOKUP; + isIndexModeLookup = isIndexModeLookup + || fragment.fragment() instanceof Filter filter + && filter.child() instanceof EsRelation relation + && relation.indexMode() == IndexMode.LOOKUP; + if (isIndexModeLookup) { + return new LookupJoinExec( + join.source(), + left, + right, + config.leftFields(), + config.rightFields(), + join.rightOutputFields() + ); + } } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java index 54071ac86d59f..01242b19e3074 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java @@ -135,18 +135,10 @@ public void testJoinOnConstant() { ); } - public void testJoinOnMultipleFields() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()); - assertEquals( - "1:35: JOIN ON clause only supports one field at the moment, found [2]", - error("row languages = 1, gender = \"f\" | lookup join test on gender, languages") - ); - } - public void testJoinTwiceOnTheSameField() { assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()); assertEquals( - "1:35: JOIN ON clause only supports one field at the moment, found [2]", + "1:66: JOIN ON clause does not support multiple fields with the same name, found multiple instances of [languages]", error("row languages = 1, gender = \"f\" | lookup join test on languages, languages") ); } @@ -154,7 +146,7 @@ public void testJoinTwiceOnTheSameField() { public void testJoinTwiceOnTheSameField_TwoLookups() { assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()); assertEquals( - "1:80: JOIN ON clause only supports one field at the moment, found [2]", + "1:108: JOIN ON clause does not support multiple fields with the same name, found multiple instances of [gender]", error("row languages = 1, gender = \"f\" | lookup join test on languages | eval x = 1 | lookup join test on gender, gender") ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java index 75b1a4db120e1..51c6e5b076043 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java @@ -142,18 +142,20 @@ protected Operator.OperatorFactory simple(SimpleOptions options) { new ReferenceAttribute(Source.EMPTY, "lkwd", DataType.KEYWORD), new ReferenceAttribute(Source.EMPTY, "lint", DataType.INTEGER) ); + List matchFields = List.of( + new LookupFromIndexOperator.MatchConfig(matchField, inputChannel, inputDataType) + ); return new LookupFromIndexOperator.Factory( + matchFields, sessionId, parentTask, maxOutstandingRequests, - inputChannel, this::lookupService, - inputDataType, lookupIndex, lookupIndex, - matchField, loadFields, - Source.EMPTY + Source.EMPTY, + null ); } @@ -165,7 +167,7 @@ protected Matcher expectedDescriptionOfSimple() { @Override protected Matcher expectedToStringOfSimple() { return matchesPattern( - "LookupOperator\\[index=idx input_type=LONG match_field=match load_fields=\\[lkwd\\{r}#\\d+, lint\\{r}#\\d+] inputChannel=0]" + "LookupOperator\\[index=idx load_fields=\\[lkwd\\{r}#\\d+, lint\\{r}#\\d+] input_type=LONG match_field=match inputChannel=0]" ); } diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml index 65d3a750e8a41..efefc0f8833a9 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml @@ -1,12 +1,12 @@ --- setup: - requires: - test_runner_features: [capabilities, contains, allowed_warnings] + test_runner_features: [ capabilities, contains, allowed_warnings ] capabilities: - method: POST path: /_query - parameters: [] - capabilities: [join_lookup_v12] + parameters: [ ] + capabilities: [ join_lookup_v12 ] reason: "uses LOOKUP JOIN" - do: indices.create: @@ -80,6 +80,31 @@ setup: type: long color: type: keyword + - do: + indices.create: + index: test-lookup-color-match + body: + settings: + index: + mode: lookup + mappings: + properties: + key: + type: long + color: + type: keyword + shade: + type: keyword + - do: + indices.create: + index: test2 + body: + mappings: + properties: + key: + type: long + color: + type: keyword - do: bulk: index: "test" @@ -88,7 +113,7 @@ setup: - { "index": { } } - { "key": 1, "color": "red" } - { "index": { } } - - { "key": 2, "color": "blue" } + - { "key": 2, "color": "yellow" } - do: bulk: index: "test-lookup-1" @@ -108,7 +133,7 @@ setup: - { "index": { } } - { "key": 2, "color": "blue" } - { "index": { } } - - { "key": [0, 1, 2], "color": null } + - { "key": [ 0, 1, 2 ], "color": null } - do: bulk: index: "test-lookup-mv" @@ -119,7 +144,7 @@ setup: - { "index": { } } - { "key": 2, "color": "yellow" } - { "index": { } } - - { "key": [0, 1, 2], "color": "green" } + - { "key": [ 0, 1, 2 ], "color": "green" } - do: bulk: index: "test-lookup-no-key" @@ -129,7 +154,28 @@ setup: - { "no-key": 1, "color": "cyan" } - { "index": { } } - { "no-key": 2, "color": "yellow" } - + - do: + bulk: + index: "test-lookup-color-match" + refresh: true + body: + - { "index": { } } + - { "key": 10, "color": "red" , shade: "light" } + - { "index": { } } + - { "key": 10, "color": "red" , shade: "reddish" } + - { "index": { } } + - { "key": 20, "color": "blue" , shade: "dark" } + - { "index": { } } + - { "key": 30, "color": "pink" , shade: "hot" } + - do: + bulk: + index: "test2" + refresh: true + body: + - { "index": { } } + - { "key": 10, "color": "red" } + - { "index": { } } + - { "key": 20, "color": "yellow" } --- basic: - do: @@ -137,12 +183,12 @@ basic: body: query: 'FROM test | SORT key | LOOKUP JOIN test-lookup-1 ON key | LIMIT 3' - - match: {columns.0.name: "key"} - - match: {columns.0.type: "long"} - - match: {columns.1.name: "color"} - - match: {columns.1.type: "keyword"} - - match: {values.0: [1, "cyan"]} - - match: {values.1: [2, "yellow"]} + - match: { columns.0.name: "key" } + - match: { columns.0.type: "long" } + - match: { columns.1.name: "color" } + - match: { columns.1.type: "keyword" } + - match: { values.0: [ 1, "cyan" ] } + - match: { values.1: [ 2, "yellow" ] } --- fails with non-lookup index v2: @@ -150,8 +196,8 @@ fails with non-lookup index v2: capabilities: - method: POST path: /_query - parameters: [] - capabilities: [enable_lookup_join_on_remote] + parameters: [ ] + capabilities: [ enable_lookup_join_on_remote ] reason: "checks updated error messages" - do: esql.query: @@ -195,12 +241,12 @@ mv-on-lookup: - "Line 1:24: evaluation of [LOOKUP JOIN test-lookup-mv ON key] failed, treating result as null. Only first 20 failures recorded." - "Line 1:24: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value" - - match: {columns.0.name: "key"} - - match: {columns.0.type: "long"} - - match: {columns.1.name: "color"} - - match: {columns.1.type: "keyword"} - - match: {values.0: [1, "cyan"]} - - match: {values.1: [2, "yellow"]} + - match: { columns.0.name: "key" } + - match: { columns.0.type: "long" } + - match: { columns.1.name: "color" } + - match: { columns.1.type: "keyword" } + - match: { values.0: [ 1, "cyan" ] } + - match: { values.1: [ 2, "yellow" ] } --- mv-on-query: @@ -212,21 +258,48 @@ mv-on-query: - "Line 1:27: evaluation of [LOOKUP JOIN test-lookup-1 ON key] failed, treating result as null. Only first 20 failures recorded." - "Line 1:27: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value" - - match: {columns.0.name: "key"} - - match: {columns.0.type: "long"} - - match: {columns.1.name: "color"} - - match: {columns.1.type: "keyword"} - - match: {values.0: [[0, 1, 2], null]} - - match: {values.1: [1, "cyan"]} - - match: {values.2: [2, "yellow"]} + - match: { columns.0.name: "key" } + - match: { columns.0.type: "long" } + - match: { columns.1.name: "color" } + - match: { columns.1.type: "keyword" } + - match: { values.0: [ [ 0, 1, 2 ], null ] } + - match: { values.1: [ 1, "cyan" ] } + - match: { values.2: [ 2, "yellow" ] } --- lookup-no-key: - do: esql.query: - body: - query: 'FROM test | LOOKUP JOIN test-lookup-no-key ON key | KEEP key, color' + body: + query: 'FROM test | LOOKUP JOIN test-lookup-no-key ON key | KEEP key, color' catch: "bad_request" - match: { error.type: "verification_exception" } - contains: { error.reason: "Unknown column [key] in right side of join" } + +--- +basic join on two fields: + - requires: + capabilities: + - method: POST + path: /_query + parameters: [ ] + capabilities: [ lookup_join_on_multiple_fields ] + reason: "uses LOOKUP JOIN on two fields" + - do: + esql.query: + body: + query: 'FROM test2 | LOOKUP JOIN test-lookup-color-match ON key, color | KEEP key, color, shade | SORT key, color, shade | LIMIT 10' + - length: { values: 3 } + - match: { columns.0.name: "key" } + - match: { columns.0.type: "long" } + - match: { columns.1.name: "color" } + - match: { columns.1.type: "keyword" } + - match: { columns.2.name: "shade" } + - match: { columns.2.type: "keyword" } + #for 10 and red 2 rows match + - match: { values.0: [ 10, "red", "light" ] } + - match: { values.1: [ 10, "red", "reddish" ] } + #for 20 and yellow, no rows match, but we keep the row as it is a lookup join + - match: { values.2: [ 20, "yellow", null ] } +