diff --git a/docs/changelog/132889.yaml b/docs/changelog/132889.yaml new file mode 100644 index 0000000000000..4d59be3c6cbb5 --- /dev/null +++ b/docs/changelog/132889.yaml @@ -0,0 +1,6 @@ +pr: 132889 +summary: Improve Expanding Lookup Join performance by pushing a filter to the lookup + join +area: "ES|QL" +type: enhancement +issues: [ ] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index f5283510bd1c9..3379bc40843d5 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -365,6 +365,7 @@ static TransportVersion def(int id) { public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00); public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00); public static final TransportVersion SIMULATE_INGEST_EFFECTIVE_MAPPING = def(9_140_0_00); + public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_141_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java index c4b1d098bb30e..ef56d81dab47d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java @@ -159,7 +159,7 @@ Page buildPage(int positions, IntVector.Builder positionsBuilder, IntVector.Buil return page; } - private Query nextQuery() { + private Query nextQuery() throws IOException { ++queryPosition; while (isFinished() == false) { Query query = queryList.getQuery(queryPosition); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/ExpressionQueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/ExpressionQueryList.java deleted file mode 100644 index af5467de55936..0000000000000 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/ExpressionQueryList.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.compute.operator.lookup; - -import org.apache.lucene.search.BooleanClause; -import org.apache.lucene.search.BooleanQuery; -import org.apache.lucene.search.Query; - -import java.util.List; - -/** - * A {@link LookupEnrichQueryGenerator} that combines multiple {@link QueryList}s into a single query. - * Each query in the resulting query will be a conjunction of all queries from the input lists at the same position. - * In the future we can extend this to support more complex expressions, such as disjunctions or negations. - */ -public class ExpressionQueryList implements LookupEnrichQueryGenerator { - private final List queryLists; - - public ExpressionQueryList(List queryLists) { - if (queryLists.size() < 2) { - throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists"); - } - this.queryLists = queryLists; - } - - @Override - public Query getQuery(int position) { - BooleanQuery.Builder builder = new BooleanQuery.Builder(); - for (QueryList queryList : queryLists) { - Query q = queryList.getQuery(position); - if (q == null) { - // if any of the matchFields are null, it means there is no match for this position - // A AND NULL is always NULL, so we can skip this position - return null; - } - builder.add(q, BooleanClause.Occur.FILTER); - } - return builder.build(); - } - - @Override - public int getPositionCount() { - int positionCount = queryLists.get(0).getPositionCount(); - for (QueryList queryList : queryLists) { - if (queryList.getPositionCount() != positionCount) { - throw new IllegalStateException( - "All QueryLists must have the same position count, expected: " - + positionCount - + ", but got: " - + queryList.getPositionCount() - ); - } - } - return positionCount; - } -} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java index cf581d9e83b43..b71e1373df859 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java @@ -10,6 +10,8 @@ import org.apache.lucene.search.Query; import org.elasticsearch.core.Nullable; +import java.io.IOException; + /** * An interface to generates queries for the lookup and enrich operators. * This interface is used to retrieve queries based on a position index. @@ -20,7 +22,7 @@ public interface LookupEnrichQueryGenerator { * Returns the query at the given position. */ @Nullable - Query getQuery(int position); + Query getQuery(int position) throws IOException; /** * Returns the number of queries in this generator diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 8694505411291..74360c362e235 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -5149,3 +5149,200 @@ null | null | bar2 | null | null null | null | corge | null | null null | null | fred | null | null ; + + +lookupJoinWithPushableFilterOnLeft +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other2 > 5000 +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; + +lookupJoinWithTwoPushableFiltersOnLeft +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other2 > 5000 +| WHERE other1 like "*ta" +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +; + +lookupJoinWithMixLeftAndRightFilters +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other2 > 5000 AND (extra1 == "qux" OR extra1 == "foo2") AND other1 like ("*ta", "*ron") +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +14 | Nina | foo2 | omicron | 15000 +; + +lookupJoinWithMixLeftAndRightFiltersNotPushableToLucene +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE ABS(other2) > 5000 AND (extra1 == "qux" OR extra1 == "foo2") AND other1 like ("*ta", "*ron") +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +14 | Nina | foo2 | omicron | 15000 +; + + +lookupJoinWithMixJoinAndNonJoinColumnsNotPushable +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE ABS(other2) > id_int + 5000 +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value +warning:Line 3:23: evaluation of [id_int + 5000] failed, treating result as null. Only first 20 failures recorded. +warning:Line 3:23: java.lang.IllegalArgumentException: single-value function encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; + + +lookupJoinWithMixJoinAndNonJoinColumnsPushable + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE other2 > id_int + 5000 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + ; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value +warning:Line 3:18: evaluation of [id_int + 5000] failed, treating result as null. Only first 20 failures recorded. +warning:Line 3:18: java.lang.IllegalArgumentException: single-value function encountered multi-value + + id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer + 4 | David | qux | zeta | 6000 + 5 | Eve | quux | eta | 7000 + 5 | Eve | quux | theta | 8000 + 6 | null | corge | iota | 9000 + 7 | Grace | grault | kappa | 10000 + 8 | Hank | garply | lambda | 11000 + 12 | Liam | xyzzy | nu | 13000 + 13 | Mia | thud | xi | 14000 + 14 | Nina | foo2 | omicron | 15000 + ; + + lookupJoinWithJoinAttrFilter + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE id_int > 7 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + ; + +warning:Line 3:9: evaluation of [id_int > 7] failed, treating result as null. Only first 20 failures recorded. +warning:Line 3:9: java.lang.IllegalArgumentException: single-value function encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +8 | Hank | garply | lambda | 11000 +9 | null | waldo | null | null +10 | null | fred | null | null +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +15 | null | bar2 | null | null +; + + +lookupJoinWithExpressionOfOtherFields + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE ABS(other2) > LENGTH(other1)*1000 + 2000 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + ; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index dc5e815a85697..d95060604d7f7 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -329,7 +329,8 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices) "lookup", "lookup", List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))), - Source.EMPTY + Source.EMPTY, + null ); DriverContext driverContext = driverContext(); try ( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index e33f0ad4b2904..7892cd17c2947 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -38,6 +38,7 @@ import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.FilterOperator; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.OutputOperator; import org.elasticsearch.compute.operator.ProjectOperator; @@ -73,10 +74,15 @@ import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.evaluator.EvalMapper; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; +import org.elasticsearch.xpack.esql.planner.Layout; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; @@ -346,13 +352,36 @@ private void doLookup(T request, CancellableTask task, ActionListener warnings ); releasables.add(queryOperator); - + Layout.Builder builder = new Layout.Builder(); + // append the docsIds and positions to the layout + builder.append( + // this looks wrong, what is the datatype for the Docs? It says DocVector but it is not a DataType + new FieldAttribute(Source.EMPTY, "Docs", new EsField("Docs", DataType.DOC_DATA_TYPE, Collections.emptyMap(), false)) + ); + builder.append( + new FieldAttribute(Source.EMPTY, "Positions", new EsField("Positions", DataType.INTEGER, Collections.emptyMap(), false)) + ); List operators = new ArrayList<>(); if (request.extractFields.isEmpty() == false) { var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields); + builder.append(request.extractFields); releasables.add(extractFieldsOperator); operators.add(extractFieldsOperator); } + if (queryList instanceof PostJoinFilterable postJoinFilterable) { + FilterExec filterExec = postJoinFilterable.getPostJoinFilter(); + Operator inputOperator; + if (operators.isEmpty() == false) { + inputOperator = operators.getLast(); + } else { + inputOperator = queryOperator; + } + Operator postJoinFilter = filterExecOperator(filterExec, inputOperator, shardContext.context, driverContext, builder); + if (postJoinFilter != null) { + releasables.add(postJoinFilter); + operators.add(postJoinFilter); + } + } operators.add(finishPages); /* @@ -414,6 +443,27 @@ public void onFailure(Exception e) { } } + private Operator filterExecOperator( + FilterExec filterExec, + Operator inputOperator, // not needed? + EsPhysicalOperationProviders.ShardContext shardContext, + DriverContext driverContext, + Layout.Builder builder + ) { + if (filterExec == null) { + return null; + } + + var evaluatorFactory = EvalMapper.toEvaluator( + FoldContext.small()/*is this correct*/, + filterExec.condition(), + builder.build(), + List.of(shardContext) + ); + var filterOperatorFactory = new FilterOperator.FilterOperatorFactory(evaluatorFactory); + return filterOperatorFactory.get(driverContext); + } + private static Operator extractFieldsOperator( EsPhysicalOperationProviders.ShardContext shardContext, DriverContext driverContext, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java new file mode 100644 index 0000000000000..4dc0f0ea721f3 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.enrich; + +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.Query; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; +import org.elasticsearch.compute.operator.lookup.QueryList; +import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.xpack.esql.capabilities.TranslationAware; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; +import org.elasticsearch.xpack.esql.stats.SearchContextStats; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER; + +/** + * A {@link LookupEnrichQueryGenerator} that combines multiple {@link QueryList}s into a single query. + * Each query in the resulting query will be a conjunction of all queries from the input lists at the same position. + * In the future we can extend this to support more complex expressions, such as disjunctions or negations. + */ +public class ExpressionQueryList implements LookupEnrichQueryGenerator, PostJoinFilterable { + private final List queryLists; + private final List preJoinFilters = new ArrayList<>(); + private FilterExec postJoinFilter; + private final SearchExecutionContext context; + + public ExpressionQueryList( + List queryLists, + SearchExecutionContext context, + PhysicalPlan rightPreJoinPlan, + ClusterService clusterService + ) { + if (queryLists.size() < 2 && rightPreJoinPlan == null) { + throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists"); + } + this.queryLists = queryLists; + this.context = context; + buildPrePostJoinFilter(rightPreJoinPlan, clusterService); + } + + private void buildPrePostJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService clusterService) { + // we support a FilterExec as the pre-join filter + // if the filter Exec is not translatable to a QueryBuilder, we will apply it after the join + if (rightPreJoinPlan instanceof FilterExec filterExec) { + try { + LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from( + SearchContextStats.from(List.of(context)), + new EsqlFlags(clusterService.getClusterSettings()) + ); + // If the pre-join filter is a FilterExec, we can convert it to a QueryBuilder + // try to convert it to a QueryBuilder, if not possible apply it after the join + if (filterExec.condition() instanceof TranslationAware translationAware + && TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) { + preJoinFilters.add( + translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder().toQuery(context) + ); + } else { + // if the filter is not translatable, we will apply it after the join + postJoinFilter = filterExec; + } + } catch (IOException e) { + throw new IllegalArgumentException("Failed to translate pre-join filter: " + filterExec, e); + } + + } else if (rightPreJoinPlan instanceof EsQueryExec esQueryExec) { + try { + // check the EsQueryExec for a pre-join filter + if (esQueryExec.query() != null) { + preJoinFilters.add(esQueryExec.query().toQuery(context)); + } + } catch (IOException e) { + throw new IllegalArgumentException("Failed to translate pre-join filter: " + esQueryExec, e); + } + } else if (rightPreJoinPlan != null) { + throw new IllegalArgumentException("Unsupported pre-join filter type: " + rightPreJoinPlan.getClass().getName()); + } + } + + @Override + public Query getQuery(int position) throws IOException { + BooleanQuery.Builder builder = new BooleanQuery.Builder(); + for (QueryList queryList : queryLists) { + Query q = queryList.getQuery(position); + if (q == null) { + // if any of the matchFields are null, it means there is no match for this position + // A AND NULL is always NULL, so we can skip this position + return null; + } + builder.add(q, BooleanClause.Occur.FILTER); + } + // also attach the pre-join filter if it exists + for (Query preJoinFilter : preJoinFilters) { + builder.add(preJoinFilter, BooleanClause.Occur.FILTER); + } + return builder.build(); + } + + @Override + public int getPositionCount() { + int positionCount = queryLists.get(0).getPositionCount(); + for (QueryList queryList : queryLists) { + if (queryList.getPositionCount() != positionCount) { + throw new IllegalArgumentException( + "All QueryLists must have the same position count, expected: " + + positionCount + + ", but got: " + + queryList.getPositionCount() + ); + } + } + return positionCount; + } + + @Override + public FilterExec getPostJoinFilter() { + return postJoinFilter; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java index b1313a3713b3b..8ce05535f4268 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java @@ -25,6 +25,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import java.io.IOException; import java.util.ArrayList; @@ -46,7 +47,8 @@ public record Factory( String lookupIndexPattern, String lookupIndex, List loadFields, - Source source + Source source, + PhysicalPlan rightPreJoinPlan ) implements OperatorFactory { @Override public String describe() { @@ -60,6 +62,7 @@ public String describe() { .append(" inputChannel=") .append(matchField.channel()); } + stringBuilder.append(" right_pre_join_plan=").append(rightPreJoinPlan); stringBuilder.append("]"); return stringBuilder.toString(); } @@ -76,7 +79,8 @@ public Operator get(DriverContext driverContext) { lookupIndexPattern, lookupIndex, loadFields, - source + source, + rightPreJoinPlan ); } } @@ -89,7 +93,8 @@ public Operator get(DriverContext driverContext) { private final List loadFields; private final Source source; private long totalRows = 0L; - private List matchFields; + private final List matchFields; + private final PhysicalPlan rightPreJoinPlan; /** * Total number of pages emitted by this {@link Operator}. */ @@ -109,7 +114,8 @@ public LookupFromIndexOperator( String lookupIndexPattern, String lookupIndex, List loadFields, - Source source + Source source, + PhysicalPlan rightPreJoinPlan ) { super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests); this.matchFields = matchFields; @@ -120,6 +126,7 @@ public LookupFromIndexOperator( this.lookupIndex = lookupIndex; this.loadFields = loadFields; this.source = source; + this.rightPreJoinPlan = rightPreJoinPlan; } @Override @@ -146,7 +153,8 @@ protected void performAsync(Page inputPage, ActionListener listener newMatchFields, new Page(inputBlockArray), loadFields, - source + source, + rightPreJoinPlan ); lookupService.lookupAsync( request, @@ -203,6 +211,7 @@ public String toString() { .append(" inputChannel=") .append(matchField.channel()); } + stringBuilder.append(" right_pre_join_plan=").append(rightPreJoinPlan); stringBuilder.append("]"); return stringBuilder.toString(); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index 3eab54a7e0efc..8b79777d74987 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -20,7 +20,6 @@ import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Warnings; -import org.elasticsearch.compute.operator.lookup.ExpressionQueryList; import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.QueryList; import org.elasticsearch.core.Releasables; @@ -38,6 +37,8 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import java.io.IOException; import java.util.ArrayList; @@ -88,7 +89,8 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque null, request.extractFields, request.matchFields, - request.source + request.source, + request.rightPreJoinPlan ); } @@ -100,6 +102,7 @@ protected LookupEnrichQueryGenerator queryList( Block inputBlock, Warnings warnings ) { + List queryLists = new ArrayList<>(); for (int i = 0; i < request.matchFields.size(); i++) { MatchConfig matchField = request.matchFields.get(i); @@ -112,10 +115,13 @@ protected LookupEnrichQueryGenerator queryList( ).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value"); queryLists.add(q); } - if (queryLists.size() == 1) { + if (queryLists.size() == 1 + && (request.rightPreJoinPlan == null + || request.rightPreJoinPlan instanceof EsQueryExec esQueryExec && esQueryExec.query() == null)) { return queryLists.getFirst(); } - return new ExpressionQueryList(queryLists); + return new ExpressionQueryList(queryLists, context, request.rightPreJoinPlan, clusterService); + } @Override @@ -130,6 +136,7 @@ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in public static class Request extends AbstractLookupService.Request { private final List matchFields; + private final PhysicalPlan rightPreJoinPlan; Request( String sessionId, @@ -138,15 +145,18 @@ public static class Request extends AbstractLookupService.Request { List matchFields, Page inputPage, List extractFields, - Source source + Source source, + PhysicalPlan rightPreJoinPlan ) { super(sessionId, index, indexPattern, matchFields.get(0).type(), inputPage, extractFields, source); this.matchFields = matchFields; + this.rightPreJoinPlan = rightPreJoinPlan; } } protected static class TransportRequest extends AbstractLookupService.TransportRequest { private final List matchFields; + private final PhysicalPlan rightPreJoinPlan; // Right now we assume that the page contains the same number of blocks as matchFields and that the blocks are in the same order // The channel information inside the MatchConfig, should say the same thing @@ -158,10 +168,12 @@ protected static class TransportRequest extends AbstractLookupService.TransportR Page toRelease, List extractFields, List matchFields, - Source source + Source source, + PhysicalPlan rightPreJoinPlan ) { super(sessionId, shardId, indexPattern, inputPage, toRelease, extractFields, source); this.matchFields = matchFields; + this.rightPreJoinPlan = rightPreJoinPlan; } static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException { @@ -207,6 +219,10 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro String sourceText = in.readString(); source = new Source(source.source(), sourceText); } + PhysicalPlan rightPreJoinPlan = null; + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + rightPreJoinPlan = planIn.readOptionalNamedWriteable(PhysicalPlan.class); + } TransportRequest result = new TransportRequest( sessionId, shardId, @@ -215,7 +231,8 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro inputPage, extractFields, matchFields, - source + source, + rightPreJoinPlan ); result.setParentTask(parentTaskId); return result; @@ -258,11 +275,21 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_SOURCE_TEXT)) { out.writeString(source.text()); } + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + planOut.writeOptionalNamedWriteable(rightPreJoinPlan); + } + // JULIAN TODO: need a better way to indicate that the filter does not need to be applied here + /*else if (rightPreJoinPlan != null) { + throw new EsqlIllegalArgumentException("LOOKUP JOIN with pre-join filter is not supported on remote node"); + }*/ } @Override protected String extraDescription() { - return " ,match_fields=" + matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", ")); + return " ,match_fields=" + + matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", ")) + + ", rightPreJoinPlan=" + + rightPreJoinPlan; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/PostJoinFilterable.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/PostJoinFilterable.java new file mode 100644 index 0000000000000..8e9fbb088b8f6 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/PostJoinFilterable.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.enrich; + +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; + +/** + * An interface for a join operator that needs to have a filter applied after the join happens + * For now we use this for applying filters that are not translatable after a lookup join + */ +public interface PostJoinFilterable { + FilterExec getPostJoinFilter(); +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java index 692bab7d653b1..3d73cee586ec6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java @@ -13,7 +13,10 @@ import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; +import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.util.CollectionUtils; import org.elasticsearch.xpack.esql.expression.predicate.Predicates; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -140,14 +143,51 @@ private static LogicalPlan pushDownPastJoin(Filter filter, Join join) { // 2. filter scoped to the right // 3. filter that requires both sides to be evaluated ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), left, right); - // push the left scoped filter down to the left child, keep the rest intact + boolean optimizationApplied = false; + // push the left scoped filter down to the left child if (scoped.leftFilters.size() > 0) { // push the filter down to the left child left = new Filter(left.source(), left, Predicates.combineAnd(scoped.leftFilters)); // update the join with the new left child join = (Join) join.replaceLeft(left); + // we completely applied the left filters, so we can remove them from the scoped filters + scoped = new ScopedFilter(scoped.commonFilters(), List.of(), scoped.rightFilters); + optimizationApplied = true; + } + // push the right scoped filter down to the right child + if (scoped.rightFilters().isEmpty() == false && (join.right() instanceof Filter == false)) { + // push the filter down to the right child + List rightPushableFilters = buildRightPushableFilters(scoped.rightFilters()); + if (rightPushableFilters.isEmpty() == false) { + // right = new Filter(right.source(), right, Predicates.combineAnd(rightPushableFilters)); + // update the join with the new right child + // join = (Join) join.replaceRight(right); + Expression optionalRightHandSideFilters = Predicates.combineAnd(rightPushableFilters); + join = join.withOptionalRightHandFilters(optionalRightHandSideFilters); + optimizationApplied = true; + } + // We still want to reapply the filters that we just applied to the right child, + // so we do NOT update scoped, and we do NOT mark optimizationApplied as true. + // This is because by pushing them on the right side, we filter what rows we get from the right side + // But we do not limit the output rows of the join as the rows are kept as not matched on the left side + // So we end up applying the right filters twice, once on the right side and once on top of the join + // This will result in major performance optimization when the lookup join is expanding + // and applying the right filters reduces the expansion significantly. + // For example, consider a lookup join where the right side is a 1Bln rows index with the value join value of 1. + // We have 10 rows on the left side with the value join value of 1. + // and there is a filter on the right side that filters out all rows on another column + // If we push the filter down to the right side, we will have 10 rows after the join (there were no matches) + // If we do not push the filter down to the right side, we will have 10 * 1Bln rows after the join (all rows matched) + // as the join is expanding. + // They would be filtered out in the next operator, but it is too late, as we already expanded the join + // In other cases, we might not get any performance benefit of this optimization, + // especially when the selectivity of the filter pushed down is very high or the join is not expanding. - // keep the remaining filters in place, otherwise return the new join; + // In the future, once we have inner join support, it is usually possible to convert the lookup join into an inner join + // and then we don't need to reapply the filters on top of the join. + } + if (optimizationApplied) { + // if we pushed down some filters, we need to update the filters to reapply above the join Expression remainingFilter = Predicates.combineAnd(CollectionUtils.combine(scoped.commonFilters, scoped.rightFilters)); plan = remainingFilter != null ? filter.with(join, remainingFilter) : join; } @@ -156,6 +196,32 @@ private static LogicalPlan pushDownPastJoin(Filter filter, Join join) { return plan; } + /** + * Builds the right pushable filters for the given expressions. + */ + private static List buildRightPushableFilters(List expressions) { + return expressions.stream().filter(x -> isRightPushableFilter(x)).toList(); + } + + /** + * Determines if the given expression can be pushed down to the right side of a join. + * A filter is right pushable if the filter's predicate evaluates to false or null when all fields are set to null + */ + private static boolean isRightPushableFilter(Expression filter) { + // traverse the filter tree + // replace any reference to an attribute with a null literal + Expression nullifiedFilter = filter.transformUp(Attribute.class, r -> new Literal(r.source(), null, DataType.NULL)); + // try to fold the filter + // check if the folded filter evaluates to false or null, if yes return true + // pushable WHERE field > 1 (evaluates to null), WHERE field is NOT NULL (evaluates to false) + // not pushable WHERE field is NULL (evaluates to true), WHERE coalesce(field, 10) = 10 (evaluates to true) + if (nullifiedFilter.foldable()) { + Object folded = nullifiedFilter.fold(FoldContext.small()); + return folded == null || Boolean.FALSE.equals(folded); + } + return false; + } + private static Function NO_OP = expression -> expression; private static LogicalPlan maybePushDownPastUnary( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java index 653722ec131f8..f3a2160546222 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java @@ -7,12 +7,14 @@ package org.elasticsearch.xpack.esql.plan.logical.join; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; @@ -152,7 +154,7 @@ public InlineJoin( List leftFields, List rightFields ) { - super(source, left, right, type, matchFields, leftFields, rightFields); + super(source, left, right, type, matchFields, leftFields, rightFields, false, null); } private static InlineJoin readFrom(StreamInput in) throws IOException { @@ -161,6 +163,9 @@ private static InlineJoin readFrom(StreamInput in) throws IOException { LogicalPlan left = in.readNamedWriteable(LogicalPlan.class); LogicalPlan right = in.readNamedWriteable(LogicalPlan.class); JoinConfig config = new JoinConfig(in); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + Expression ignored = in.readOptionalNamedWriteable(Expression.class); + } return new InlineJoin(source, left, replaceStub(left, right), config); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java index 2f217df1468a6..fc997719a6457 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plan.logical.join; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -15,6 +16,7 @@ import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.AttributeSet; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; @@ -91,15 +93,24 @@ public class Join extends BinaryPlan implements PostAnalysisVerificationAware, S private List lazyOutput; // Does this join involve remote indices? This is relevant only on the coordinating node, thus transient. private transient boolean isRemote = false; + private Expression optionalRightHandFilters = null; public Join(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config) { - this(source, left, right, config, false); + this(source, left, right, config, false, null); } - public Join(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config, boolean isRemote) { + public Join( + Source source, + LogicalPlan left, + LogicalPlan right, + JoinConfig config, + boolean isRemote, + Expression optionalRightHandFilters + ) { super(source, left, right); this.config = config; this.isRemote = isRemote; + this.optionalRightHandFilters = optionalRightHandFilters; } public Join( @@ -109,15 +120,24 @@ public Join( JoinType type, List matchFields, List leftFields, - List rightFields + List rightFields, + boolean isRemote, + Expression optionalRightHandFilters ) { super(source, left, right); this.config = new JoinConfig(type, matchFields, leftFields, rightFields); + this.isRemote = isRemote; + this.optionalRightHandFilters = optionalRightHandFilters; } public Join(StreamInput in) throws IOException { super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(LogicalPlan.class), in.readNamedWriteable(LogicalPlan.class)); this.config = new JoinConfig(in); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + this.optionalRightHandFilters = in.readOptionalNamedWriteable(Expression.class); + } else { + this.optionalRightHandFilters = null; + } } @Override @@ -126,6 +146,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeNamedWriteable(left()); out.writeNamedWriteable(right()); config.writeTo(out); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + out.writeOptionalNamedWriteable(optionalRightHandFilters()); + } + // as the optionalRightHandFilters are optional it is OK to not write them if the node does not support it + // it will still work, but performance might be worse } @Override @@ -149,7 +174,9 @@ protected NodeInfo info() { config.type(), config.matchFields(), config.leftFields(), - config.rightFields() + config.rightFields(), + isRemote(), + optionalRightHandFilters() ); } @@ -247,17 +274,25 @@ public boolean resolved() { } public Join withConfig(JoinConfig config) { - return new Join(source(), left(), right(), config, isRemote); + return new Join(source(), left(), right(), config, isRemote, optionalRightHandFilters()); + } + + public Expression optionalRightHandFilters() { + return this.optionalRightHandFilters; + } + + public Join withOptionalRightHandFilters(Expression optionalRightHandFilters) { + return new Join(source(), left(), right(), config(), isRemote, optionalRightHandFilters); } @Override public Join replaceChildren(LogicalPlan left, LogicalPlan right) { - return new Join(source(), left, right, config, isRemote); + return new Join(source(), left, right, config, isRemote, optionalRightHandFilters()); } @Override public int hashCode() { - return Objects.hash(config, left(), right(), isRemote); + return Objects.hash(config, left(), right(), isRemote, optionalRightHandFilters); } @Override @@ -273,7 +308,8 @@ public boolean equals(Object obj) { return config.equals(other.config) && Objects.equals(left(), other.left()) && Objects.equals(right(), other.right()) - && isRemote == other.isRemote; + && isRemote == other.isRemote + && Objects.equals(optionalRightHandFilters, other.optionalRightHandFilters); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java index 20913e0e27ce7..7e7e31c3dbc0b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java @@ -55,7 +55,7 @@ public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig } public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig joinConfig, boolean isRemote) { - super(source, left, right, joinConfig, isRemote); + super(source, left, right, joinConfig, isRemote, null); } /** @@ -64,7 +64,7 @@ public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig @Override public LogicalPlan surrogate() { // TODO: decide whether to introduce USING or just basic ON semantics - keep the ordering out for now - return new Join(source(), left(), right(), config(), isRemote()); + return new Join(source(), left(), right(), config(), isRemote(), optionalRightHandFilters()); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java index 2aff38993aa98..e7e07835eaee8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java @@ -7,11 +7,13 @@ package org.elasticsearch.xpack.esql.plan.physical; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.AttributeSet; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; @@ -37,6 +39,7 @@ public class LookupJoinExec extends BinaryExec implements EstimatesRowSize { * the right hand side by a {@link EsQueryExec}, and thus lose the information of which fields we'll get from the lookup index. */ private final List addedFields; + private final Expression optionalRightHandFilters; private List lazyOutput; public LookupJoinExec( @@ -45,12 +48,14 @@ public LookupJoinExec( PhysicalPlan lookup, List leftFields, List rightFields, - List addedFields + List addedFields, + Expression optionalRightHandFilters ) { super(source, left, lookup); this.leftFields = leftFields; this.rightFields = rightFields; this.addedFields = addedFields; + this.optionalRightHandFilters = optionalRightHandFilters; } private LookupJoinExec(StreamInput in) throws IOException { @@ -58,6 +63,11 @@ private LookupJoinExec(StreamInput in) throws IOException { this.leftFields = in.readNamedWriteableCollectionAsList(Attribute.class); this.rightFields = in.readNamedWriteableCollectionAsList(Attribute.class); this.addedFields = in.readNamedWriteableCollectionAsList(Attribute.class); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + this.optionalRightHandFilters = in.readOptionalNamedWriteable(Expression.class); + } else { + this.optionalRightHandFilters = null; // For versions before the field was added, we default to null + } } @Override @@ -66,6 +76,15 @@ public void writeTo(StreamOutput out) throws IOException { out.writeNamedWriteableCollection(leftFields); out.writeNamedWriteableCollection(rightFields); out.writeNamedWriteableCollection(addedFields); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + out.writeOptionalNamedWriteable(getOptionalRightHandFilters()); + } + // as the optionalRightHandFilters are optional it is OK to not write them if the node does not support it + // it will still work, but performance might be worse + } + + public Expression getOptionalRightHandFilters() { + return optionalRightHandFilters; } @Override @@ -136,12 +155,12 @@ public AttributeSet rightReferences() { @Override public LookupJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) { - return new LookupJoinExec(source(), left, right, leftFields, rightFields, addedFields); + return new LookupJoinExec(source(), left, right, leftFields, rightFields, addedFields, optionalRightHandFilters); } @Override protected NodeInfo info() { - return NodeInfo.create(this, LookupJoinExec::new, left(), right(), leftFields, rightFields, addedFields); + return NodeInfo.create(this, LookupJoinExec::new, left(), right(), leftFields, rightFields, addedFields, optionalRightHandFilters); } @Override @@ -156,11 +175,14 @@ public boolean equals(Object o) { return false; } LookupJoinExec other = (LookupJoinExec) o; - return leftFields.equals(other.leftFields) && rightFields.equals(other.rightFields) && addedFields.equals(other.addedFields); + return leftFields.equals(other.leftFields) + && rightFields.equals(other.rightFields) + && addedFields.equals(other.addedFields) + && Objects.equals(optionalRightHandFilters, other.optionalRightHandFilters); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), leftFields, rightFields, addedFields); + return Objects.hash(super.hashCode(), leftFields, rightFields, addedFields, optionalRightHandFilters); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index ec1539fa3ae38..e7114e73d900f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -780,7 +780,11 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan } matchFields.add(new MatchConfig(right, input)); } - + PhysicalPlan rightPreJoinPlan = join.right(); + if (join.getOptionalRightHandFilters() != null) { + // If there are filters on the right side, we need to apply them before the join + rightPreJoinPlan = new FilterExec(Source.EMPTY, rightPreJoinPlan, join.getOptionalRightHandFilters()); + } return source.with( new LookupFromIndexOperator.Factory( matchFields, @@ -791,7 +795,8 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan localSourceExec.indexPattern(), indexName, join.addedFields().stream().map(f -> (NamedExpression) f).toList(), - join.source() + join.source(), + rightPreJoinPlan ), layout ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java index a1671bffc5c25..dd5a3b39e55f7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java @@ -23,6 +23,7 @@ import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec; import org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; @@ -111,11 +112,33 @@ private PhysicalPlan mapBinary(BinaryPlan binary) { join.rightOutputFields() ); } - if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) { - return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields()); + if (right instanceof FilterExec filterExec) { + LookupJoinExec lookupJoinExec = getLookupJoinExec(join, filterExec.child(), left, config); + if (lookupJoinExec != null) { + // build the right child as a FilterExec with the original lookupJoinExec.right() as the child + FilterExec newRightChild = filterExec.replaceChild(lookupJoinExec.right()); + return lookupJoinExec.replaceChildren(lookupJoinExec.left(), newRightChild); + } } + LookupJoinExec lookupJoinExec = getLookupJoinExec(join, right, left, config); + if (lookupJoinExec != null) return lookupJoinExec; } return MapperUtils.unsupported(binary); } + + private static LookupJoinExec getLookupJoinExec(Join join, PhysicalPlan right, PhysicalPlan left, JoinConfig config) { + if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) { + return new LookupJoinExec( + join.source(), + left, + right, + config.leftFields(), + config.rightFields(), + join.rightOutputFields(), + join.optionalRightHandFilters() + ); + } + return null; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index aac1d58e5f7f1..86ce9f0169aed 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Fork; import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; import org.elasticsearch.xpack.esql.plan.logical.Limit; @@ -226,10 +227,23 @@ private PhysicalPlan mapBinary(BinaryPlan bp) { join.rightOutputFields() ); } - if (right instanceof FragmentExec fragment - && fragment.fragment() instanceof EsRelation relation - && relation.indexMode() == IndexMode.LOOKUP) { - return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields()); + if (right instanceof FragmentExec fragment) { + boolean isIndexModeLookup = fragment.fragment() instanceof EsRelation relation && relation.indexMode() == IndexMode.LOOKUP; + isIndexModeLookup = isIndexModeLookup + || fragment.fragment() instanceof Filter filter + && filter.child() instanceof EsRelation relation + && relation.indexMode() == IndexMode.LOOKUP; + if (isIndexModeLookup) { + return new LookupJoinExec( + join.source(), + left, + right, + config.leftFields(), + config.rightFields(), + join.rightOutputFields(), + join.optionalRightHandFilters() + ); + } } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java index c4a381eccff65..2f5f0d7c7456d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java @@ -170,7 +170,8 @@ protected Operator.OperatorFactory simple(SimpleOptions options) { lookupIndex, lookupIndex, loadFields, - Source.EMPTY + Source.EMPTY, + null ); } @@ -186,7 +187,7 @@ protected Matcher expectedToStringOfSimple() { for (int i = 0; i < numberOfJoinColumns; i++) { sb.append(" input_type=LONG match_field=match").append(i).append(" inputChannel=").append(i); } - sb.append("]"); + sb.append(" right_pre_join_plan=null]"); return matchesPattern(sb.toString()); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index e3f13fc331cdc..f9fc3597ef2d3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -1894,6 +1894,104 @@ public void testCombineOrderByThroughFilter() { as(filter.child(), EsRelation.class); } + /** + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + * uages{f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER],false] + * \_Filter[ISNULL(language_name{f}#19)] + * \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18]] + * |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testDoNotPushDownIsNullFilterPastLookupJoin() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name IS NULL + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var join = as(filter.child(), Join.class); + assertThat(join.right(), instanceOf(EsRelation.class)); + } + + /** + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + * uages{f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER],false] + * \_Filter[language_name{f}#19 > a[KEYWORD]] + * \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18],false,language_name{f}#19 > a[KEYWORD]] + * |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testPushDownGreaterThanFilterPastLookupJoin() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name > "a" + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var join = as(filter.child(), Join.class); + var right = as(join.right(), EsRelation.class); + assertThat(join.optionalRightHandFilters().toString(), is("language_name > \"a\"")); + } + + /** + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + * uages{f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER],false] + * \_Filter[COALESCE(language_name{f}#19,a[KEYWORD]) == a[KEYWORD]] + * \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18]] + * |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testDoNotPushDownCoalesceFilterPastLookupJoin() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE COALESCE(language_name, "a") == "a" + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var join = as(filter.child(), Join.class); + assertThat(join.right(), instanceOf(EsRelation.class)); + } + + /** + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + *uages{f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER],false] + * \_Filter[ISNOTNULL(language_name{f}#19)] + * \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18],false,ISNOTNULL(language_name{f}#19)] + * |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testPushDownIsNotNullFilterPastLookupJoin() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name IS NOT NULL + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var join = as(filter.child(), Join.class); + var right = as(join.right(), EsRelation.class); + assertThat(join.optionalRightHandFilters().toString(), is("language_name IS NOT NULL")); + } + /** * Expected *
{@code
@@ -7050,11 +7148,11 @@ public void testLookupJoinPushDownFilterOnLeftSideField() {
      * Expects
      *
      * 
{@code
-     * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16,
-     *          languages{f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]]
+     * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang
+     * uages{f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]]
      * \_Limit[1000[INTEGER],false]
-     *   \_Filter[language_name{f}#19 == [45 6e 67 6c 69 73 68][KEYWORD]]
-     *     \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18]]
+     *   \_Filter[language_name{f}#19 == English[KEYWORD]]
+     *     \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18],false,language_name{f}#19 == English[KEYWORD]]
      *       |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..]
      *       \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19]
      * }
@@ -7085,7 +7183,9 @@ public void testLookupJoinPushDownDisabledForLookupField() { assertThat(join.config().type(), equalTo(JoinTypes.LEFT)); var leftRel = as(join.left(), EsRelation.class); - var rightRel = as(join.right(), EsRelation.class); + assertEquals("language_name == \"English\"", join.optionalRightHandFilters().toString()); + var joinRightEsRelation = as(join.right(), EsRelation.class); + } /** @@ -7094,11 +7194,11 @@ public void testLookupJoinPushDownDisabledForLookupField() { * Expects * *
{@code
-     * Project[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17,
-     *          languages{f}#11 AS language_code#4, last_name{f}#12, long_noidx{f}#18, salary{f}#13, language_name{f}#20]]
+     * Project[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17, lan
+     * guages{f}#11 AS language_code#4, last_name{f}#12, long_noidx{f}#18, salary{f}#13, language_name{f}#20]]
      * \_Limit[1000[INTEGER],false]
-     *   \_Filter[language_name{f}#20 == [45 6e 67 6c 69 73 68][KEYWORD]]
-     *     \_Join[LEFT,[languages{f}#11],[languages{f}#11],[language_code{f}#19]]
+     *   \_Filter[language_name{f}#20 == English[KEYWORD]]
+     *     \_Join[LEFT,[languages{f}#11],[languages{f}#11],[language_code{f}#19],false,language_name{f}#20 == English[KEYWORD]]
      *       |_Filter[emp_no{f}#8 > 1[INTEGER]]
      *       | \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
      *       \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20]
@@ -7138,6 +7238,7 @@ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightFiel
         assertThat(literal.value(), equalTo(1));
 
         var leftRel = as(filter.child(), EsRelation.class);
+        assertEquals("language_name == \"English\"", join.optionalRightHandFilters().toString());
         var rightRel = as(join.right(), EsRelation.class);
     }