diff --git a/docs/changelog/132635.yaml b/docs/changelog/132635.yaml new file mode 100644 index 0000000000000..1a4cb855624ea --- /dev/null +++ b/docs/changelog/132635.yaml @@ -0,0 +1,6 @@ +pr: 132635 +summary: Push filters on right hand-side of joins +area: ES|QL +type: enhancement +issues: + - 130024 diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index c5562f187b4ab..5e78bef516a86 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -359,6 +359,7 @@ static TransportVersion def(int id) { public static final TransportVersion TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION = def(9_135_0_00); public static final TransportVersion INDEX_TEMPLATE_TRACKING_INFO = def(9_136_0_00); public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00); + public static final TransportVersion ESQL_LOOKUP_JOIN_FILTER = def(9_138_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java index 56b857ad90a82..549fb40857690 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java @@ -35,6 +35,7 @@ import org.elasticsearch.index.mapper.GeoShapeQueryable; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.RangeFieldMapper; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.search.internal.AliasFilter; @@ -56,6 +57,8 @@ public abstract class QueryList { protected final Block block; @Nullable protected final OnlySingleValueParams onlySingleValueParams; + @Nullable + protected final QueryBuilder filterQueryBuilder; protected QueryList( MappedFieldType field, @@ -63,12 +66,24 @@ protected QueryList( AliasFilter aliasFilter, Block block, OnlySingleValueParams onlySingleValueParams + ) { + this(field, searchExecutionContext, aliasFilter, block, onlySingleValueParams, null); + } + + private QueryList( + MappedFieldType field, + SearchExecutionContext searchExecutionContext, + AliasFilter aliasFilter, + Block block, + OnlySingleValueParams onlySingleValueParams, + QueryBuilder filterQueryBuilder ) { this.searchExecutionContext = searchExecutionContext; this.aliasFilter = aliasFilter; this.field = field; this.block = block; this.onlySingleValueParams = onlySingleValueParams; + this.filterQueryBuilder = filterQueryBuilder; } /** @@ -87,6 +102,27 @@ int getPositionCount() { */ public abstract QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage); + public QueryList withFilterQueryBuilder(QueryBuilder filterQueryBuilder) { + return new QueryList( + this.field, + this.searchExecutionContext, + this.aliasFilter, + this.block, + this.onlySingleValueParams, + filterQueryBuilder + ) { + @Override + public QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage) { + return QueryList.this.onlySingleValues(warnings, multiValueWarningMessage); + } + + @Override + Query doGetQuery(int position, int firstValueIndex, int valueCount) { + return QueryList.this.doGetQuery(position, firstValueIndex, valueCount); + } + }; + } + final Query getQuery(int position) { final int valueCount = block.getValueCount(position); if (onlySingleValueParams != null && valueCount != 1) { @@ -100,20 +136,42 @@ final Query getQuery(int position) { final int firstValueIndex = block.getFirstValueIndex(position); Query query = doGetQuery(position, firstValueIndex, valueCount); + if (query == null) { + return null; + } + + BooleanQuery.Builder builder = new BooleanQuery.Builder(); + boolean builderHasClauses = false; if (aliasFilter != null && aliasFilter != AliasFilter.EMPTY) { - BooleanQuery.Builder builder = new BooleanQuery.Builder(); - builder.add(query, BooleanClause.Occur.FILTER); try { builder.add(aliasFilter.getQueryBuilder().toQuery(searchExecutionContext), BooleanClause.Occur.FILTER); - query = builder.build(); + builderHasClauses = true; } catch (IOException e) { throw new UncheckedIOException("Error while building query for alias filter", e); } } if (onlySingleValueParams != null) { - query = wrapSingleValueQuery(query); + Query singleValueQuery = wrapSingleValueQuery(); + if (singleValueQuery != null) { + builder.add(singleValueQuery, BooleanClause.Occur.FILTER); + builderHasClauses = true; + } + } + + if (filterQueryBuilder != null) { + try { + builder.add(filterQueryBuilder.toQuery(searchExecutionContext), BooleanClause.Occur.FILTER); + builderHasClauses = true; + } catch (IOException e) { + throw new UncheckedIOException("Error while building filter query", e); + } + } + + if (builderHasClauses) { + builder.add(query, BooleanClause.Occur.FILTER); + return builder.build(); } return query; @@ -125,7 +183,7 @@ final Query getQuery(int position) { @Nullable abstract Query doGetQuery(int position, int firstValueIndex, int valueCount); - private Query wrapSingleValueQuery(Query query) { + private Query wrapSingleValueQuery() { assert onlySingleValueParams != null : "Requested to wrap single value query without single value params"; SingleValueMatchQuery singleValueQuery = new SingleValueMatchQuery( @@ -138,18 +196,11 @@ private Query wrapSingleValueQuery(Query query) { Query rewrite; try { rewrite = singleValueQuery.rewrite(searchExecutionContext.searcher()); - if (rewrite instanceof MatchAllDocsQuery) { - // nothing to filter - return query; - } } catch (IOException e) { throw new UncheckedIOException("Error while rewriting SingleValueQuery", e); } - BooleanQuery.Builder builder = new BooleanQuery.Builder(); - builder.add(query, BooleanClause.Occur.FILTER); - builder.add(rewrite, BooleanClause.Occur.FILTER); - return builder.build(); + return rewrite instanceof MatchAllDocsQuery ? /* nothing to filter */ null : rewrite; } /** diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index e25cb82f29851..3a5fb438db35d 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -235,6 +235,7 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws "lookup", new FieldAttribute.FieldName("key"), List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))), + null, Source.EMPTY ); DriverContext driverContext = driverContext(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index dd305f09c12dc..257a48f5f8653 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -52,6 +52,7 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -197,6 +198,7 @@ protected abstract QueryList queryList( AliasFilter aliasFilter, Block inputBlock, @Nullable DataType inputDataType, + // @Nullable QueryBuilder filterQueryBuilder, Warnings warnings ); @@ -342,6 +344,9 @@ private void doLookup(T request, CancellableTask task, ActionListener request.inputDataType, warnings ); + if (request.filterQueryBuilder != null) { + queryList = queryList.withFilterQueryBuilder(request.filterQueryBuilder); + } var queryOperator = new EnrichQuerySourceOperator( driverContext.blockFactory(), EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE, @@ -511,6 +516,7 @@ abstract static class Request { final DataType inputDataType; final Page inputPage; final List extractFields; + final QueryBuilder filterQueryBuilder; final Source source; Request( @@ -520,6 +526,7 @@ abstract static class Request { DataType inputDataType, Page inputPage, List extractFields, + QueryBuilder filterQueryBuilder, Source source ) { this.sessionId = sessionId; @@ -528,8 +535,21 @@ abstract static class Request { this.inputDataType = inputDataType; this.inputPage = inputPage; this.extractFields = extractFields; + this.filterQueryBuilder = filterQueryBuilder; this.source = source; } + + Request( + String sessionId, + String index, + String indexPattern, + DataType inputDataType, + Page inputPage, + List extractFields, + Source source + ) { + this(sessionId, index, indexPattern, inputDataType, inputPage, extractFields, null, source); + } } abstract static class TransportRequest extends AbstractTransportRequest implements IndicesRequest { @@ -543,6 +563,8 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen final DataType inputDataType; final Page inputPage; final List extractFields; + @Nullable // may be missing: either no filter to apply, or remote node is older than ESQL_LOOKUP_JOIN_FILTER + final QueryBuilder filterQueryBuilder; final Source source; // TODO: Remove this workaround once we have Block RefCount final Page toRelease; @@ -556,6 +578,7 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen Page inputPage, Page toRelease, List extractFields, + QueryBuilder filterQueryBuilder, Source source ) { this.sessionId = sessionId; @@ -565,9 +588,23 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen this.inputPage = inputPage; this.toRelease = toRelease; this.extractFields = extractFields; + this.filterQueryBuilder = filterQueryBuilder; this.source = source; } + TransportRequest( + String sessionId, + ShardId shardId, + String indexPattern, + DataType inputDataType, + Page inputPage, + Page toRelease, + List extractFields, + Source source + ) { + this(sessionId, shardId, indexPattern, inputDataType, inputPage, toRelease, extractFields, null, source); + } + @Override public final String[] indices() { return new String[] { indexPattern }; @@ -625,6 +662,8 @@ public final String toString() { + inputDataType + " ,extract_fields=" + extractFields + + ", filterQueryBuilder=" + + filterQueryBuilder + " ,positions=" + inputPage.getPositionCount() + extraDescription() diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java index 2b41edcbeff0b..13d72c91f0895 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java @@ -21,6 +21,7 @@ import org.elasticsearch.compute.operator.lookup.RightChunkedLeftJoin; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; @@ -48,6 +49,7 @@ public record Factory( String lookupIndex, FieldAttribute.FieldName matchField, List loadFields, + QueryBuilder filterQueryBuilder, Source source ) implements OperatorFactory { @Override @@ -62,6 +64,8 @@ public String describe() { + loadFields + " inputChannel=" + inputChannel + + " filterQueryBuilder=" + + filterQueryBuilder + "]"; } @@ -79,6 +83,7 @@ public Operator get(DriverContext driverContext) { lookupIndex, matchField.string(), loadFields, + filterQueryBuilder, source ); } @@ -93,6 +98,7 @@ public Operator get(DriverContext driverContext) { private final String lookupIndex; private final String matchField; private final List loadFields; + private final QueryBuilder filterQueryBuilder; private final Source source; private long totalTerms = 0L; /** @@ -116,6 +122,7 @@ public LookupFromIndexOperator( String lookupIndex, String matchField, List loadFields, + QueryBuilder filterQueryBuilder, Source source ) { super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests); @@ -128,6 +135,7 @@ public LookupFromIndexOperator( this.lookupIndex = lookupIndex; this.matchField = matchField; this.loadFields = loadFields; + this.filterQueryBuilder = filterQueryBuilder; this.source = source; } @@ -143,6 +151,7 @@ protected void performAsync(Page inputPage, ActionListener listener matchField, new Page(inputBlock), loadFields, + filterQueryBuilder, source ); lookupService.lookupAsync( @@ -200,6 +209,8 @@ public String toString() { + loadFields + " inputChannel=" + inputChannel + + " filterQueryBuilder=" + + filterQueryBuilder + "]"; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index 972a952a7b1fd..225db2ee1de5c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -23,6 +23,7 @@ import org.elasticsearch.compute.operator.lookup.QueryList; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -44,7 +45,7 @@ /** * {@link LookupFromIndexService} performs lookup against a Lookup index for * a given input page. See {@link AbstractLookupService} for how it works - * where it refers to this process as a {@code LEFT JOIN}. Which is mostly is. + * where it refers to this process as a {@code LEFT JOIN}. Which it mostly is. */ public class LookupFromIndexService extends AbstractLookupService { public static final String LOOKUP_ACTION_NAME = EsqlQueryAction.NAME + "/lookup_from_index"; @@ -85,6 +86,7 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque null, request.extractFields, request.matchField, + request.filterQueryBuilder, request.source ); } @@ -125,9 +127,10 @@ public static class Request extends AbstractLookupService.Request { String matchField, Page inputPage, List extractFields, + QueryBuilder filterQueryBuilder, Source source ) { - super(sessionId, index, indexPattern, inputDataType, inputPage, extractFields, source); + super(sessionId, index, indexPattern, inputDataType, inputPage, extractFields, filterQueryBuilder, source); this.matchField = matchField; } } @@ -144,9 +147,10 @@ protected static class TransportRequest extends AbstractLookupService.TransportR Page toRelease, List extractFields, String matchField, + QueryBuilder filterQueryBuilder, Source source ) { - super(sessionId, shardId, indexPattern, inputDataType, inputPage, toRelease, extractFields, source); + super(sessionId, shardId, indexPattern, inputDataType, inputPage, toRelease, extractFields, filterQueryBuilder, source); this.matchField = matchField; } @@ -171,6 +175,9 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null); List extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class); String matchField = in.readString(); + QueryBuilder filterQueryBuilder = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_FILTER) + ? in.readOptionalNamedWriteable(QueryBuilder.class) + : null; var source = Source.EMPTY; if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) { source = Source.readFrom(planIn); @@ -190,6 +197,7 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro inputPage, extractFields, matchField, + filterQueryBuilder, source ); result.setParentTask(parentTaskId); @@ -214,6 +222,9 @@ public void writeTo(StreamOutput out) throws IOException { PlanStreamOutput planOut = new PlanStreamOutput(out, null); planOut.writeNamedWriteableCollection(extractFields); out.writeString(matchField); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_FILTER)) { + out.writeOptionalNamedWriteable(filterQueryBuilder); + } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) { source.writeTo(planOut); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java index b7cb322b06416..1a04e7ad363e6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java @@ -15,6 +15,7 @@ import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferIsNotNull; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferNonNullAggConstraint; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.LocalPropagateEmptyRelation; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.LocalPushDownFiltersRightPastJoin; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceDateTruncBucketWithRoundTo; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceFieldWithConstantOrNull; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceTopNWithLimitAndSort; @@ -82,6 +83,7 @@ private static Batch localOperators() { // add rule that should only apply locally newRules.add(new ReplaceStringCasingWithInsensitiveRegexMatch()); + newRules.add(new LocalPushDownFiltersRightPastJoin()); return operators.with(newRules.toArray(Rule[]::new)); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java index 692bab7d653b1..eec836b888583 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java @@ -82,15 +82,15 @@ protected LogicalPlan rule(Filter filter) { } else if (child instanceof RegexExtract re) { // Push down filters that do not rely on attributes created by RegexExtract var attributes = AttributeSet.of(Expressions.asAttributes(re.extractedFields())); - plan = maybePushDownPastUnary(filter, re, attributes::contains, NO_OP); + plan = maybePushDownPastUnary(filter, re, attributes::contains, Function.identity()); } else if (child instanceof InferencePlan inferencePlan) { // Push down filters that do not rely on attributes created by Completion var attributes = AttributeSet.of(inferencePlan.generatedAttributes()); - plan = maybePushDownPastUnary(filter, inferencePlan, attributes::contains, NO_OP); + plan = maybePushDownPastUnary(filter, inferencePlan, attributes::contains, Function.identity()); } else if (child instanceof Enrich enrich) { // Push down filters that do not rely on attributes created by Enrich var attributes = AttributeSet.of(Expressions.asAttributes(enrich.enrichFields())); - plan = maybePushDownPastUnary(filter, enrich, attributes::contains, NO_OP); + plan = maybePushDownPastUnary(filter, enrich, attributes::contains, Function.identity()); } else if (child instanceof Project) { return PushDownUtils.pushDownPastProject(filter); } else if (child instanceof OrderBy orderBy) { @@ -106,13 +106,13 @@ protected LogicalPlan rule(Filter filter) { return plan; } - private record ScopedFilter(List commonFilters, List leftFilters, List rightFilters) {} + public record ScopedFilter(List commonFilters, List leftFilters, List rightFilters) {} // split the filter condition in 3 parts: // 1. filter scoped to the left // 2. filter scoped to the right // 3. filter that requires both sides to be evaluated - private static ScopedFilter scopeFilter(List filters, LogicalPlan left, LogicalPlan right) { + public static ScopedFilter scopeFilter(List filters, LogicalPlan left, LogicalPlan right) { List rest = new ArrayList<>(filters); List leftFilters = new ArrayList<>(); List rightFilters = new ArrayList<>(); @@ -120,7 +120,7 @@ private static ScopedFilter scopeFilter(List filters, LogicalPlan le AttributeSet leftOutput = left.outputSet(); AttributeSet rightOutput = right.outputSet(); - // first remove things that are left scoped only + // first remove things that are left scoped only. note: this will actually collect the common fields too rest.removeIf(f -> f.references().subsetOf(leftOutput) && leftFilters.add(f)); // followed by right scoped only rest.removeIf(f -> f.references().subsetOf(rightOutput) && rightFilters.add(f)); @@ -156,8 +156,6 @@ private static LogicalPlan pushDownPastJoin(Filter filter, Join join) { return plan; } - private static Function NO_OP = expression -> expression; - private static LogicalPlan maybePushDownPastUnary( Filter filter, UnaryPlan unary, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/LocalPushDownFiltersRightPastJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/LocalPushDownFiltersRightPastJoin.java new file mode 100644 index 0000000000000..81fc6226e0de5 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/LocalPushDownFiltersRightPastJoin.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.logical.local; + +import org.elasticsearch.xpack.esql.capabilities.TranslationAware; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.util.CollectionUtils; +import org.elasticsearch.xpack.esql.expression.predicate.Predicates; +import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineFilters; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates; +import org.elasticsearch.xpack.esql.plan.logical.Filter; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; +import org.elasticsearch.xpack.esql.plan.logical.join.Join; +import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; + +import java.util.List; + +import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineFilters.scopeFilter; + +public final class LocalPushDownFiltersRightPastJoin extends OptimizerRules.ParameterizedOptimizerRule< + Filter, + LocalLogicalOptimizerContext> { + + public LocalPushDownFiltersRightPastJoin() { + super(OptimizerRules.TransformDirection.DOWN); + } + + @Override + protected LogicalPlan rule(Filter filter, LocalLogicalOptimizerContext context) { + LogicalPlan child = filter.child(); + // TODO: see note in PushDownAndCombineFilters about InlineJoin + return child instanceof Join join && child instanceof InlineJoin == false ? pushDownRightPastJoin(filter, join, context) : filter; + } + + private static LogicalPlan pushDownRightPastJoin(Filter filter, Join join, LocalLogicalOptimizerContext context) { + LogicalPlan plan = filter; + // pushdown only through LEFT joins + // TODO: generalize this for other join types + if (join.config().type() == JoinTypes.LEFT) { + LogicalPlan left = join.left(); + LogicalPlan right = join.right(); + + PushDownAndCombineFilters.ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), left, right); + // Only push down right if the filter can be merged; it would otherwise generate extra work, as the filter's kept before the + // join too. This is needed since the join produces `null`s for the fields corresponding to the rows that the pushed condition + // filters out. + if (areRightFiltersPushable(scoped.rightFilters(), context)) { + var combinedRightFilters = Predicates.combineAnd(scoped.rightFilters()); + // avoid re-injecting the same filter if the rule applied already before. + if (right instanceof Filter == false || ((Filter) right).condition().semanticEquals(combinedRightFilters) == false) { + // push the right scoped filter down to the right child + right = new Filter(right.source(), right, combinedRightFilters); + // update the join with the new right child + join = (Join) join.replaceRight(right); + } + } + // keep the remaining filters in place, otherwise return the new join; + Expression remainingFilter = Predicates.combineAnd(CollectionUtils.combine(scoped.commonFilters(), scoped.rightFilters())); + plan = remainingFilter != null ? filter.with(join, remainingFilter) : join; + } + // ignore the rest of the join + return plan; + } + + private static boolean areRightFiltersPushable(List filters, LocalLogicalOptimizerContext ctx) { + if (filters.isEmpty()) { + return false; + } + // TODO: the flag isn't relevant for the pushdown decision? + LucenePushdownPredicates pushdownPredicates = LucenePushdownPredicates.from(ctx.searchStats(), new EsqlFlags(true)); + for (Expression filter : filters) { + // the rigth filters will remain on top of the join, so any not-NO value is acceptable for "is it pushable?" + if (TranslationAware.translatable(filter, pushdownPredicates) == TranslationAware.Translatable.NO) { + return false; + } + } + return true; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index fb7b6ccab5d5e..e3c997b0aae04 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -731,10 +731,11 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan } Layout layout = layoutBuilder.build(); - EsQueryExec localSourceExec = (EsQueryExec) join.lookup(); - if (localSourceExec.indexMode() != IndexMode.LOOKUP) { + var lookup = join.lookup(); + if (lookup instanceof EsQueryExec == false || ((EsQueryExec) lookup).indexMode() != IndexMode.LOOKUP) { throw new IllegalArgumentException("can't plan [" + join + "]"); } + EsQueryExec localSourceExec = (EsQueryExec) lookup; // After enabling remote joins, we can have one of the two situations here: // 1. We've just got one entry - this should be the one relevant to the join, and it should be for this cluster @@ -797,6 +798,7 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan indexName, matchConfig.fieldName(), join.addedFields().stream().map(f -> (NamedExpression) f).toList(), + localSourceExec.query(), join.source() ), layout diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java index a1671bffc5c25..17b1b45dcac47 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java @@ -111,7 +111,8 @@ private PhysicalPlan mapBinary(BinaryPlan binary) { join.rightOutputFields() ); } - if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) { + var leaves = right.collectLeaves(); + if (leaves.size() == 1 && leaves.get(0) instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) { return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index aac1d58e5f7f1..e6da551a6e715 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -226,10 +226,18 @@ private PhysicalPlan mapBinary(BinaryPlan bp) { join.rightOutputFields() ); } - if (right instanceof FragmentExec fragment - && fragment.fragment() instanceof EsRelation relation - && relation.indexMode() == IndexMode.LOOKUP) { - return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields()); + if (right instanceof FragmentExec fragment) { + var leaves = fragment.fragment().collectLeaves(); + if (leaves.size() == 1 && leaves.get(0) instanceof EsRelation relation && relation.indexMode() == IndexMode.LOOKUP) { + return new LookupJoinExec( + join.source(), + left, + right, + config.leftFields(), + config.rightFields(), + join.rightOutputFields() + ); + } } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java index 75b1a4db120e1..d0869da37767e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java @@ -153,6 +153,7 @@ protected Operator.OperatorFactory simple(SimpleOptions options) { lookupIndex, matchField, loadFields, + null, Source.EMPTY ); } @@ -165,7 +166,8 @@ protected Matcher expectedDescriptionOfSimple() { @Override protected Matcher expectedToStringOfSimple() { return matchesPattern( - "LookupOperator\\[index=idx input_type=LONG match_field=match load_fields=\\[lkwd\\{r}#\\d+, lint\\{r}#\\d+] inputChannel=0]" + "LookupOperator\\[index=idx input_type=LONG match_field=match load_fields=\\[lkwd\\{r}#\\d+, lint\\{r}#\\d+] " + + "inputChannel=0 filterQueryBuilder=null]" ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java index cd6371e4d4d5e..ac9fa3a25c74b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java @@ -1076,6 +1076,60 @@ public void testMissingFieldsNotPurgingTheJoinLocally() { var right = as(join.right(), EsQueryExec.class); } + /* + * ProjectExec[[emp_no{f}#9, languages{f}#12 AS language_code#6, language_name{f}#21]] + * \_LimitExec[1000[INTEGER],58] + * \_ExchangeExec[[emp_no{f}#9, languages{f}#12, language_name{f}#21],false] + * \_ProjectExec[[emp_no{f}#9, languages{f}#12, language_name{f}#21]] + * \_FieldExtractExec[emp_no{f}#9]<[],[]> + * \_LimitExec[1000[INTEGER],74] + * \_FilterExec[language_name{f}#21 == foo[KEYWORD]] + * \_LookupJoinExec[[languages{f}#12],[language_code{f}#20],[language_name{f}#21]] + * |_FieldExtractExec[languages{f}#12]<[],[]> + * | \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#22], limit[], sort[] estimatedRowSize[62] + * \_EsQueryExec[languages_lookup], indexMode[lookup], query[{"esql_single_value":{"field":"language_name", + * "next":{"term":{"language_name":{"value":"foo","boost":0.0}}}, + * "source":"language_name == \"foo\"@5:9"}}][_doc{f}#23], limit[], sort[] estimatedRowSize[4] + */ + public void testJoinWithFilterOnTheRight() { + var plan = plannerOptimizer.plan(""" + from test + | keep emp_no, languages + | rename languages AS language_code + | lookup join languages_lookup ON language_code + | where language_name == "foo" + """); + + var project = as(plan, ProjectExec.class); + assertThat(Expressions.names(project.projections()), contains("emp_no", "language_code", "language_name")); + var limit = as(project.child(), LimitExec.class); + var exchange = as(limit.child(), ExchangeExec.class); + project = as(exchange.child(), ProjectExec.class); + assertThat(Expressions.names(project.projections()), contains("emp_no", "languages", "language_name")); + var extract = as(project.child(), FieldExtractExec.class); + assertThat(Expressions.names(extract.attributesToExtract()), contains("emp_no")); + limit = as(extract.child(), LimitExec.class); + var filter = as(limit.child(), FilterExec.class); + assertThat(filter.condition().toString(), is("language_name == \"foo\"")); + var join = as(filter.child(), LookupJoinExec.class); + // left + extract = as(join.left(), FieldExtractExec.class); + assertThat(Expressions.names(extract.attributesToExtract()), contains("languages")); + var source = as(extract.child(), EsQueryExec.class); + assertThat(source.indexMode(), is(IndexMode.STANDARD)); + // right + source = as(join.right(), EsQueryExec.class); + assertThat(source.indexPattern(), is("languages_lookup")); + assertThat(source.indexMode(), is(IndexMode.LOOKUP)); + assertThat( + source.query().toString().replaceAll("\\s+", ""), + is( + "{\"esql_single_value\":{\"field\":\"language_name\",\"next\":{\"term\":{\"language_name\":" + + "{\"value\":\"foo\",\"boost\":0.0}}},\"source\":\"language_name==\\\"foo\\\"@5:9\"}}" + ) + ); + } + /* * LimitExec[1000[INTEGER]] * \_LookupJoinExec[[language_code{r}#6],[language_code{f}#23],[language_name{f}#24]]