diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java index 301273d624614..91fd8827ff765 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java @@ -402,6 +402,61 @@ public void testTopNThenEnrichRemote() { assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); assertCCSExecutionInfoDetails(executionInfo); } + + // No renames, no KEEP - this is required to verify that ENRICH does not break sort with fields it overrides + query = """ + FROM *:events,events + | eval ip= TO_STR(host) + | SORT timestamp, user, ip + | LIMIT 5 + | ENRICH _remote:hosts + """; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + assertThat( + getValuesList(resp), + equalTo( + List.of( + List.of("192.168.1.2", 1L, "andres", "192.168.1.2", "Windows"), + List.of("192.168.1.3", 1L, "matthew", "192.168.1.3", "MacOS"), + Arrays.asList("192.168.1.25", 1L, "park", (String) null, (String) null), + List.of("192.168.1.5", 2L, "akio", "192.168.1.5", "Android"), + List.of("192.168.1.6", 2L, "sergio", "192.168.1.6", "iOS") + ) + ) + ); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); + assertCCSExecutionInfoDetails(executionInfo); + } + } + + public void testLimitWithCardinalityChange() { + String query = String.format(Locale.ROOT, """ + FROM *:events,events + | eval ip= TO_STR(host) + | LIMIT 10 + | WHERE user != "andres" + | %s + """, enrichHosts(Enrich.Mode.REMOTE)); + // This is currently not supported, because WHERE is not cardinality preserving + var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close()); + assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after [LIMIT 10]@3:3")); + } + + public void testTopNTwiceThenEnrichRemote() { + String query = String.format(Locale.ROOT, """ + FROM *:events,events + | eval ip= TO_STR(host) + | SORT timestamp + | LIMIT 9 + | SORT ip, user + | LIMIT 5 + | ENRICH _remote:hosts + """, enrichHosts(Enrich.Mode.REMOTE)); + // This is currently not supported, because we can not handle double topN with remote enrich + var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close()); + assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after [SORT timestamp]")); } public void testLimitThenEnrichRemote() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/PostOptimizationVerificationAware.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/PostOptimizationVerificationAware.java index 6be3a3d482855..c6d84e79b7c2d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/PostOptimizationVerificationAware.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/PostOptimizationVerificationAware.java @@ -15,6 +15,11 @@ */ public interface PostOptimizationVerificationAware { + /** + * Marker interface for verifiers that should only be run on Coordinator + */ + interface CoordinatorOnly extends PostOptimizationVerificationAware {} + /** * Validates the implementing expression - discovered failures are reported to the given * {@link Failures} class. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java index 088b1ad15557f..4c72d079098c8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java @@ -9,8 +9,8 @@ import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.common.Failures; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEmptyRelation; -import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStatsFilteredAggWithEval; import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStringCasingWithInsensitiveRegexMatch; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.IgnoreNullMetrics; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferIsNotNull; @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.esql.rule.Rule; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList; @@ -39,7 +40,7 @@ */ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor { - private final LogicalVerifier verifier = LogicalVerifier.INSTANCE; + private final LogicalVerifier verifier = LogicalVerifier.LOCAL_INSTANCE; private static final List> RULES = arrayAsArrayList( new Batch<>( @@ -53,7 +54,7 @@ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor localOperators() { for (var r : rules) { switch (r) { case PropagateEmptyRelation ignoredPropagate -> newRules.add(new LocalPropagateEmptyRelation()); - // skip it: once a fragment contains an Agg, this can no longer be pruned, which the rule can do - case ReplaceStatsFilteredAggWithEval ignoredReplace -> { + case OptimizerRules.CoordinatorOnly ignored -> { } default -> newRules.add(r); } @@ -88,9 +88,18 @@ private static Batch localOperators() { return operators.with(newRules.toArray(Rule[]::new)); } + @SuppressWarnings("unchecked") + private static Batch localCleanup() { + var cleanup = cleanup(); + return cleanup.with( + // Remove CoordinatorOnly rules + Arrays.stream(cleanup.rules()).filter(r -> r instanceof OptimizerRules.CoordinatorOnly == false).toArray(Rule[]::new) + ); + } + public LogicalPlan localOptimize(LogicalPlan plan) { LogicalPlan optimized = execute(plan); - Failures failures = verifier.verify(optimized, true, plan.output()); + Failures failures = verifier.verify(optimized, plan.output()); if (failures.hasFailures()) { throw new VerificationException(failures); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java index deb79221b3ea0..71e73e5e4c80d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java @@ -36,7 +36,7 @@ public class LocalPhysicalPlanOptimizer extends ParameterizedRuleExecutor> RULES = rules(true); - private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE; + private final PhysicalVerifier verifier = PhysicalVerifier.LOCAL_INSTANCE; public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) { super(context); @@ -47,7 +47,7 @@ public PhysicalPlan localOptimize(PhysicalPlan plan) { } PhysicalPlan verify(PhysicalPlan optimizedPlan, List expectedOutputAttributes) { - Failures failures = verifier.verify(optimizedPlan, true, expectedOutputAttributes); + Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes); if (failures.hasFailures()) { throw new VerificationException(failures); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index ae346e4d52449..5e5a38641c298 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -16,10 +16,13 @@ import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineBinaryComparisons; import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineDisjunctions; import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineEvals; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineLimitTopN; import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineProjections; import org.elasticsearch.xpack.esql.optimizer.rules.logical.ConstantFolding; import org.elasticsearch.xpack.esql.optimizer.rules.logical.ExtractAggregateCommonFilter; import org.elasticsearch.xpack.esql.optimizer.rules.logical.FoldNull; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.HoistRemoteEnrichLimit; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.HoistRemoteEnrichTopN; import org.elasticsearch.xpack.esql.optimizer.rules.logical.LiteralsOnTheRight; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PartiallyFoldCase; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEmptyRelation; @@ -93,8 +96,8 @@ *
  • {@link LogicalPlanOptimizer#cleanup()} Which can replace sorts+limit with a TopN
  • * * - *

    Note that the {@link LogicalPlanOptimizer#operators(boolean)} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the - * {@link LocalLogicalPlanOptimizer} layer.

    + *

    Note that the {@link LogicalPlanOptimizer#operators(boolean)} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied + * at the {@link LocalLogicalPlanOptimizer} layer.

    */ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor { @@ -115,7 +118,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) { public LogicalPlan optimize(LogicalPlan verified) { var optimized = execute(verified); - Failures failures = verifier.verify(optimized, false, verified.output()); + Failures failures = verifier.verify(optimized, verified.output()); if (failures.hasFailures()) { throw new VerificationException(failures); } @@ -166,6 +169,7 @@ protected static Batch substitutions() { protected static Batch operators(boolean local) { return new Batch<>( "Operator Optimization", + new HoistRemoteEnrichLimit(), new CombineProjections(local), new CombineEvals(), new PropagateEmptyRelation(), @@ -212,6 +216,13 @@ protected static Batch operators(boolean local) { } protected static Batch cleanup() { - return new Batch<>("Clean Up", new ReplaceLimitAndSortAsTopN(), new ReplaceRowAsLocalRelation(), new PropgateUnmappedFields()); + return new Batch<>( + "Clean Up", + new ReplaceLimitAndSortAsTopN(), + new HoistRemoteEnrichTopN(), + new ReplaceRowAsLocalRelation(), + new PropgateUnmappedFields(), + new CombineLimitTopN() + ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java index 4aa70be5713e8..8610de88ae41f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java @@ -11,7 +11,6 @@ import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware; import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker; -import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import java.util.ArrayList; @@ -19,21 +18,11 @@ import java.util.function.BiConsumer; public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier { + public static final LogicalVerifier LOCAL_INSTANCE = new LogicalVerifier(true); + public static final LogicalVerifier INSTANCE = new LogicalVerifier(false); - public static final LogicalVerifier INSTANCE = new LogicalVerifier(); - - private LogicalVerifier() {} - - @Override - boolean skipVerification(LogicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) { - if (skipRemoteEnrichVerification) { - // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 - var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance); - if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) { - return true; - } - } - return false; + private LogicalVerifier(boolean isLocal) { + super(isLocal); } @Override @@ -44,14 +33,16 @@ void checkPlanConsistency(LogicalPlan optimizedPlan, Failures failures, Failures PlanConsistencyChecker.checkPlan(p, depFailures); if (failures.hasFailures() == false) { - if (p instanceof PostOptimizationVerificationAware pova) { + if (p instanceof PostOptimizationVerificationAware pova + && (pova instanceof PostOptimizationVerificationAware.CoordinatorOnly && isLocal) == false) { pova.postOptimizationVerification(failures); } if (p instanceof PostOptimizationPlanVerificationAware popva) { checkers.add(popva.postOptimizationPlanVerification()); } p.forEachExpression(ex -> { - if (ex instanceof PostOptimizationVerificationAware va) { + if (ex instanceof PostOptimizationVerificationAware va + && (va instanceof PostOptimizationVerificationAware.CoordinatorOnly && isLocal) == false) { va.postOptimizationVerification(failures); } }); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java index 6d60c547f47d6..f25ed78ae1c04 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java @@ -39,7 +39,7 @@ public PhysicalPlan optimize(PhysicalPlan plan) { } PhysicalPlan verify(PhysicalPlan optimizedPlan, List expectedOutputAttributes) { - Failures failures = verifier.verify(optimizedPlan, false, expectedOutputAttributes); + Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes); if (failures.hasFailures()) { throw new VerificationException(failures); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java index 781a8f5263c1f..1b644e063633a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java @@ -12,8 +12,7 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker; -import org.elasticsearch.xpack.esql.plan.logical.Enrich; -import org.elasticsearch.xpack.esql.plan.physical.EnrichExec; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; @@ -21,21 +20,11 @@ /** Physical plan verifier. */ public final class PhysicalVerifier extends PostOptimizationPhasePlanVerifier { + public static final PhysicalVerifier LOCAL_INSTANCE = new PhysicalVerifier(true); + public static final PhysicalVerifier INSTANCE = new PhysicalVerifier(false); - public static final PhysicalVerifier INSTANCE = new PhysicalVerifier(); - - private PhysicalVerifier() {} - - @Override - boolean skipVerification(PhysicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) { - if (skipRemoteEnrichVerification) { - // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 - var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance); - if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) { - return true; - } - } - return false; + private PhysicalVerifier(boolean isLocal) { + super(isLocal); } @Override @@ -54,6 +43,19 @@ void checkPlanConsistency(PhysicalPlan optimizedPlan, Failures failures, Failure ); } } + + // This check applies only for coordinator physical plans (isLocal == false) + if (isLocal == false && p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.REMOTE) { + failures.add( + fail( + p, + "Physical plan contains remote executing operation [{}] in local part. " + + "This usually means this command is incompatible with some of the preceding commands.", + p.nodeName() + ) + ); + } + PlanConsistencyChecker.checkPlan(p, depFailures); if (failures.hasFailures() == false) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java index e187264b2432f..168b41ec9807e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java @@ -28,13 +28,17 @@ */ public abstract class PostOptimizationPhasePlanVerifier

    > { + // Are we verifying the global plan (coordinator) or a local plan (data node)? + protected final boolean isLocal; + + protected PostOptimizationPhasePlanVerifier(boolean isLocal) { + this.isLocal = isLocal; + } + /** Verifies the optimized plan */ - public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, List expectedOutputAttributes) { + public Failures verify(P optimizedPlan, List expectedOutputAttributes) { Failures failures = new Failures(); Failures depFailures = new Failures(); - if (skipVerification(optimizedPlan, skipRemoteEnrichVerification)) { - return failures; - } checkPlanConsistency(optimizedPlan, failures, depFailures); @@ -47,8 +51,6 @@ public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, Li return failures; } - abstract boolean skipVerification(P optimizedPlan, boolean skipRemoteEnrichVerification); - abstract void checkPlanConsistency(P optimizedPlan, Failures failures, Failures depFailures); private static void verifyOutputNotChanged(QueryPlan optimizedPlan, List expectedOutputAttributes, Failures failures) { @@ -67,7 +69,7 @@ private static void verifyOutputNotChanged(QueryPlan optimizedPlan, List x.name().equals(ProjectAwayColumns.ALL_FIELDS_PROJECTED)); // LookupJoinExec represents the lookup index with EsSourceExec and this is turned into EsQueryExec by - // ReplaceSourceAttributes. Because InsertFieldExtractions doesn't apply to lookup indices, the + // ReplaceSourceAttributes. Because InsertFieldExtraction doesn't apply to lookup indices, the // right hand side will only have the EsQueryExec providing the _doc attribute and nothing else. // We perform an optimizer run on every fragment. LookupJoinExec also contains such a fragment, // and currently it only contains an EsQueryExec after optimization. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineLimitTopN.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineLimitTopN.java new file mode 100644 index 0000000000000..0235d64603fa7 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineLimitTopN.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.logical; + +import org.elasticsearch.xpack.esql.expression.Foldables; +import org.elasticsearch.xpack.esql.plan.logical.Limit; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.Project; +import org.elasticsearch.xpack.esql.plan.logical.TopN; + +/** + * Combines a Limit immediately followed by a TopN into a single TopN. + * This is needed because {@link HoistRemoteEnrichTopN} can create new TopN nodes that are not covered by the previous rules. + */ +public final class CombineLimitTopN extends OptimizerRules.OptimizerRule { + + public CombineLimitTopN() { + super(OptimizerRules.TransformDirection.DOWN); + } + + @Override + public LogicalPlan rule(Limit limit) { + if (limit.child() instanceof TopN topn) { + int thisLimitValue = Foldables.limitValue(limit.limit(), limit.sourceText()); + int topNValue = Foldables.limitValue(topn.limit(), topn.sourceText()); + if (topNValue <= thisLimitValue) { + return topn; + } else { + return new TopN(topn.source(), topn.child(), topn.order(), limit.limit(), topn.local()); + } + } + if (limit.child() instanceof Project proj) { + // It is possible that Project is sitting on top on TopN. Swap limit and project then. + return proj.replaceChild(limit.replaceChild(proj.child())); + } + return limit; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimit.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimit.java new file mode 100644 index 0000000000000..2b68b55c7d6c7 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimit.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.logical; + +import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; +import org.elasticsearch.xpack.esql.plan.logical.CardinalityPreserving; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; +import org.elasticsearch.xpack.esql.plan.logical.Limit; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; + +import java.util.Collections; +import java.util.Comparator; +import java.util.IdentityHashMap; +import java.util.Set; + +/** + * Locate any LIMIT that is "visible" under remote ENRICH, and make a copy of it above the ENRICH. The original limit is marked as local. + * This allows the correct semantics of the remote application of limit. Enrich itself does not change the cardinality, + * but the limit needs to be taken twice, locally on the node and again on the coordinator. + * This runs only on LogicalPlanOptimizer not on local one. + */ +public final class HoistRemoteEnrichLimit extends OptimizerRules.ParameterizedOptimizerRule + implements + OptimizerRules.CoordinatorOnly { + + public HoistRemoteEnrichLimit() { + super(OptimizerRules.TransformDirection.UP); + } + + @Override + protected LogicalPlan rule(Enrich en, LogicalOptimizerContext ctx) { + if (en.mode() == Enrich.Mode.REMOTE) { + // Since limits are combinable, we will just assemble the set of candidates and create one combined lowest limit above the + // Enrich. + Set seenLimits = Collections.newSetFromMap(new IdentityHashMap<>()); + en.child().forEachDownMayReturnEarly((p, stop) -> { + if (p instanceof Limit l && l.local() == false) { + seenLimits.add(l); + return; + } + if ((p instanceof CardinalityPreserving) == false // can change the number of rows, so we can't just pull a limit from + // under it + // this will fail the verifier anyway, so no need to continue + || (p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.COORDINATOR) + // This is essentially another remote enrich - let it take care of its own limits + || (p instanceof Enrich e && e.mode() == Enrich.Mode.REMOTE) + // If it's a pipeline breaker, this part will be on the coordinator anyway, which likely will fail the verifier + // In any case, duplicating anything below there is pointless. + || p instanceof PipelineBreaker) { + stop.set(true); + } + }); + + if (seenLimits.isEmpty()) { + return en; + } + // Mark original limits as local + LogicalPlan transformLimits = en.transformDown(Limit.class, l -> seenLimits.contains(l) ? l.withLocal(true) : l); + // Shouldn't actually throw because we checked seenLimits is not empty + Limit lowestLimit = seenLimits.stream().min(Comparator.comparing(l -> (int) l.limit().fold(ctx.foldCtx()))).orElseThrow(); + // Insert new lowest limit on top of the Enrich + return new Limit(lowestLimit.source(), lowestLimit.limit(), transformLimits, true, false); + } + return en; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java new file mode 100644 index 0000000000000..dc57b0d53ae1e --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.logical; + +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.plan.logical.CardinalityPreserving; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.OrderBy; +import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; +import org.elasticsearch.xpack.esql.plan.logical.Project; +import org.elasticsearch.xpack.esql.plan.logical.TopN; +import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; + +import java.util.LinkedList; +import java.util.List; + +/** + * Locate any TopN that is "visible" under remote ENRICH, and make a copy of it above the ENRICH, + * while making a copy of the original fields. Mark the original TopN as local. + * This is the same idea as {@link HoistRemoteEnrichLimit} but for TopN instead of Limit. + * This must happen after {@link ReplaceLimitAndSortAsTopN}. + * This also has handling for the case where Enrich overrides the field(s) TopN is using. In this case, + * we need to create aliases for the fields used by TopN, and then use those aliases in the copy TopN. + * Then we need to add Project to remove the alias fields. Fortunately, PushDownUtils has the logic to do that. + */ +public final class HoistRemoteEnrichTopN extends OptimizerRules.OptimizerRule implements OptimizerRules.CoordinatorOnly { + public HoistRemoteEnrichTopN() { + super(OptimizerRules.TransformDirection.UP); + } + + @Override + protected LogicalPlan rule(Enrich en) { + if (en.mode() == Enrich.Mode.REMOTE) { + LogicalPlan plan = en.child(); + // This loop only takes care of one TopN, repeated application will stack them in correct order. + while (true) { + if (plan instanceof TopN top && top.local() == false) { + // Create a fake OrderBy and "push" Enrich through it to generate aliases + Enrich topWithEnrich = (Enrich) en.replaceChild(new OrderBy(top.source(), en.child(), top.order())); + LogicalPlan pushPlan = PushDownUtils.pushGeneratingPlanPastProjectAndOrderBy(topWithEnrich); + // If we needed to alias any names, the result would look like this: + // Project[[host{f}#14, timestamp{f}#16, user{f}#15, ip{r}#19, os{r}#20]] + // \_OrderBy[[Order[timestamp{f}#16,ASC,LAST], Order[user{f}#15,ASC,LAST], + // Order[$$ip$temp_name$21{r$}#22,ASC,LAST]]] + // \_Enrich[REMOTE,hosts[KEYWORD],ip{r}#3,{"match":{"indices":[],"match_field":"ip", + // "enrich_fields":["ip","os"]}},{},[ip{r}#19,os{r}#20]] + // \_Eval[[ip{r}#3 AS $$ip$temp_name$21#22]] + if (pushPlan instanceof Project proj) { + // We needed renaming - deconstruct the plan from above and extract the relevant parts + if ((proj.child() instanceof OrderBy o && o.child() instanceof Enrich e && e.child() instanceof Eval) == false) { + throw new IllegalStateException("Unexpected pushed plan structure: " + pushPlan); + } + OrderBy order = (OrderBy) proj.child(); + Enrich enrich = (Enrich) order.child(); + Eval eval = (Eval) enrich.child(); + // We insert the evals above the original TopN, so that the copy TopN works on the renamed fields + LogicalPlan replacementTop = eval.replaceChild(top.withLocal(true)); + LogicalPlan transformedEnrich = en.transformDown(p -> switch (p) { + case TopN t when t == top -> replacementTop; + // We only need to take care of Project because Drop can't drop our newly created fields + case Project pr -> { + List allFields = new LinkedList<>(pr.projections()); + allFields.addAll(eval.fields()); + yield pr.withProjections(allFields); + } + default -> p; + }); + + // Create the copied topN on top of the Enrich + var copyTop = new TopN(top.source(), transformedEnrich, order.order(), top.limit(), false); + // And use the project to remove the fields that we don't need anymore + return proj.replaceChild(copyTop); + } else { + // No need for aliasing - then it's simple, just copy the TopN on top and mark the original as local + LogicalPlan transformedEnrich = en.transformDown(TopN.class, t -> t == top ? top.withLocal(true) : t); + return new TopN(top.source(), transformedEnrich, top.order(), top.limit(), false); + } + } + if ((plan instanceof CardinalityPreserving) == false // can change the number of rows, so we can't just pull a TopN from + // under it + // this will fail the verifier anyway, so no need to continue + || (plan instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.COORDINATOR) + // This is essentially another remote Enrich, it can handle its own limits + || (plan instanceof Enrich e && e.mode() == Enrich.Mode.REMOTE) + || plan instanceof PipelineBreaker) { + break; + } + if (plan instanceof UnaryPlan u) { + plan = u.child(); + } else { + // The only non-unary plans right now are Join and Fork, and they are not cardinality preserving, + // so really there's nothing to do here. But if we had binary plan that is cardinality preserving, + // we would need to add it here. + break; + } + } + } + return en; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/OptimizerRules.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/OptimizerRules.java index a32bf3a720088..697da095c9460 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/OptimizerRules.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/OptimizerRules.java @@ -109,4 +109,9 @@ public final LogicalPlan apply(LogicalPlan plan, P context) { protected abstract LogicalPlan rule(SubPlan plan, P context); } + + /** + * Marker interface: This rule should only be applied on the coordinator plan, not for a local plan. + */ + public interface CoordinatorOnly {} } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimits.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimits.java index 834f360f7ff05..496bdf8473f87 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimits.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimits.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.optimizer.rules.logical; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -34,23 +35,23 @@ public PushDownAndCombineLimits() { @Override public LogicalPlan rule(Limit limit, LogicalOptimizerContext ctx) { if (limit.child() instanceof Limit childLimit) { - var limitSource = limit.limit(); - var parentLimitValue = (int) limitSource.fold(ctx.foldCtx()); - var childLimitValue = (int) childLimit.limit().fold(ctx.foldCtx()); - // We want to preserve the duplicated() value of the smaller limit, so we'll use replaceChild. - return parentLimitValue < childLimitValue ? limit.replaceChild(childLimit.child()) : childLimit; + return combineLimits(limit, childLimit, ctx.foldCtx()); } else if (limit.child() instanceof UnaryPlan unary) { - if (unary instanceof Eval - || unary instanceof Project - || unary instanceof RegexExtract - || unary instanceof Enrich - || unary instanceof InferencePlan) { + if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof InferencePlan) { + // Push the limit under unary return unary.replaceChild(limit.replaceChild(unary.child())); } else if (unary instanceof MvExpand) { // MV_EXPAND can increase the number of rows, so we cannot just push the limit down // (we also have to preserve the LIMIT afterwards) // To avoid repeating this infinitely, we have to set duplicated = true. - return duplicateLimitAsFirstGrandchild(limit); + return duplicateLimitAsFirstGrandchild(limit, false); + } else if (unary instanceof Enrich enrich) { + if (enrich.mode() == Enrich.Mode.REMOTE) { + return duplicateLimitAsFirstGrandchild(limit, true); + } else { + // We can push past local enrich because it does not increase the number of rows + return enrich.replaceChild(limit.replaceChild(enrich.child())); + } } // check if there's a 'visible' descendant limit lower than the current one // and if so, align the current limit since it adds no value @@ -71,11 +72,26 @@ public LogicalPlan rule(Limit limit, LogicalOptimizerContext ctx) { // The InlineJoin is currently excluded, as its right-hand side uses as data source a StubRelation that points to the entire // left-hand side, so adding a limit in there would lead to the right-hand side work on incomplete data. // To avoid repeating this infinitely, we have to set duplicated = true. - return duplicateLimitAsFirstGrandchild(limit); + // We use withLocal = false because if we have a remote join it will be forced into the fragment by the mapper anyway, + // And the verifier checks that there are no non-synthetic limits before the join. + // TODO: However, this means that the non-remote join will be always forced on the coordinator. We may want to revisit this. + return duplicateLimitAsFirstGrandchild(limit, false); } return limit; } + private static Limit combineLimits(Limit upper, Limit lower, FoldContext ctx) { + // Keep the smallest limit + var upperLimitValue = (int) upper.limit().fold(ctx); + var lowerLimitValue = (int) lower.limit().fold(ctx); + // We want to preserve the duplicated() value of the smaller limit. + if (lowerLimitValue <= upperLimitValue) { + return lower.withLocal(lower.local()); + } else { + return new Limit(upper.source(), upper.limit(), lower.child(), upper.duplicated(), upper.local()); + } + } + /** * Checks the existence of another 'visible' Limit, that exists behind an operation that doesn't produce output more data than * its input (that is not a relation/source nor aggregation). @@ -104,14 +120,15 @@ private static Limit descendantLimit(UnaryPlan unary) { * Duplicate the limit past its child if it wasn't duplicated yet. The duplicate is placed on top of its leftmost grandchild. * Idempotent. (Sets {@link Limit#duplicated()} to {@code true} on the limit that remains at the top.) */ - private static Limit duplicateLimitAsFirstGrandchild(Limit limit) { + private static Limit duplicateLimitAsFirstGrandchild(Limit limit, boolean withLocal) { if (limit.duplicated()) { return limit; } List grandChildren = limit.child().children(); LogicalPlan firstGrandChild = grandChildren.getFirst(); - LogicalPlan newFirstGrandChild = limit.replaceChild(firstGrandChild); + // Use the local limit under the original node, so it won't break the pipeline + LogicalPlan newFirstGrandChild = (withLocal ? limit.withLocal(withLocal) : limit).replaceChild(firstGrandChild); List newGrandChildren = new ArrayList<>(); newGrandChildren.add(newFirstGrandChild); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLimitAndSortAsTopN.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLimitAndSortAsTopN.java index dfc1a26ae980c..e06acb34b4bf0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLimitAndSortAsTopN.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLimitAndSortAsTopN.java @@ -18,7 +18,7 @@ public final class ReplaceLimitAndSortAsTopN extends OptimizerRules.OptimizerRul protected LogicalPlan rule(Limit plan) { LogicalPlan p = plan; if (plan.child() instanceof OrderBy o) { - p = new TopN(o.source(), o.child(), o.order(), plan.limit()); + p = new TopN(o.source(), o.child(), o.order(), plan.limit(), false); } return p; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceStatsFilteredAggWithEval.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceStatsFilteredAggWithEval.java index 9a6176fa5628d..5f26dd13de7e7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceStatsFilteredAggWithEval.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceStatsFilteredAggWithEval.java @@ -42,8 +42,9 @@ * This rule is applied to both STATS' {@link Aggregate} and {@link InlineJoin} right-hand side {@link Aggregate} plans. * The logic is common for both, but the handling of the {@link InlineJoin} is slightly different when it comes to pruning * its right-hand side {@link Aggregate}. + * Skipped in local optimizer: once a fragment contains an Agg, this can no longer be pruned, which the rule can do */ -public class ReplaceStatsFilteredAggWithEval extends OptimizerRules.OptimizerRule { +public class ReplaceStatsFilteredAggWithEval extends OptimizerRules.OptimizerRule implements OptimizerRules.CoordinatorOnly { @Override protected LogicalPlan rule(LogicalPlan plan) { Aggregate aggregate; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/CardinalityPreserving.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/CardinalityPreserving.java new file mode 100644 index 0000000000000..1067f3623a1ab --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/CardinalityPreserving.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical; + +/** + * This interface marks a command which does not add or remove rows. + * This means these are equivalent: + * ``` + * ... | LIMIT X | MY_COMMAND + * ``` + * and + * ``` + * ... | MY_COMMAND | LIMIT X + * ``` + * It is not true, for example, for WHERE: + * ``` + * ... | LIMIT X | WHERE side="dark" + * ``` + * If the first X rows do not contain any "dark" rows, the result is empty, however if we switch: + * ``` + * ... | WHERE side="dark" | LIMIT X + * ``` + * And the dataset contains "dark" rows, we will get some results. + */ +public interface CardinalityPreserving {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java index c8668f58ab5c0..6412aa1890994 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java @@ -17,7 +17,7 @@ import java.util.List; import java.util.Objects; -public class Drop extends UnaryPlan implements TelemetryAware, SortAgnostic { +public class Drop extends UnaryPlan implements TelemetryAware, CardinalityPreserving, SortAgnostic { private final List removals; public Drop(Source source, LogicalPlan child, List removals) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java index d2d764f337d14..89d7c5ba81234 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java @@ -48,9 +48,10 @@ public class Enrich extends UnaryPlan implements GeneratingPlan, - PostOptimizationVerificationAware, + PostOptimizationVerificationAware.CoordinatorOnly, PostAnalysisVerificationAware, TelemetryAware, + CardinalityPreserving, SortAgnostic, ExecutesOn { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( @@ -71,12 +72,11 @@ public class Enrich extends UnaryPlan @Override public ExecuteLocation executesOn() { - if (mode == Mode.REMOTE) { - return ExecuteLocation.REMOTE; - } else if (mode == Mode.COORDINATOR) { - return ExecuteLocation.COORDINATOR; - } - return ExecuteLocation.ANY; + return switch (mode) { + case REMOTE -> ExecuteLocation.REMOTE; + case COORDINATOR -> ExecuteLocation.COORDINATOR; + default -> ExecuteLocation.ANY; + }; } public enum Mode { @@ -277,13 +277,21 @@ public int hashCode() { private void checkForPlansForbiddenBeforeRemoteEnrich(Failures failures) { Set fails = new HashSet<>(); - this.forEachUp(LogicalPlan.class, u -> { + this.forEachDown(LogicalPlan.class, u -> { if (u instanceof ExecutesOn ex && ex.executesOn() == ExecuteLocation.COORDINATOR) { - fails.add(u.source()); + failures.add( + fail(this, "ENRICH with remote policy can't be executed after [" + u.source().text() + "]" + u.source().source()) + ); } }); + } + + @Override + public void postAnalysisVerification(Failures failures) { + if (this.mode == Mode.REMOTE) { + checkMvExpandAfterLimit(failures); + } - fails.forEach(f -> failures.add(fail(this, "ENRICH with remote policy can't be executed after [" + f.text() + "]" + f.source()))); } /** @@ -308,14 +316,6 @@ private void checkMvExpandAfterLimit(Failures failures) { } - @Override - public void postAnalysisVerification(Failures failures) { - if (this.mode == Mode.REMOTE) { - checkMvExpandAfterLimit(failures); - } - - } - @Override public void postOptimizationVerification(Failures failures) { if (this.mode == Mode.REMOTE) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Eval.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Eval.java index 5c0aa35f13880..d9ac0e720dd1f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Eval.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Eval.java @@ -36,7 +36,13 @@ import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes; import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public class Eval extends UnaryPlan implements GeneratingPlan, PostAnalysisVerificationAware, TelemetryAware, SortAgnostic { +public class Eval extends UnaryPlan + implements + GeneratingPlan, + PostAnalysisVerificationAware, + TelemetryAware, + CardinalityPreserving, + SortAgnostic { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Eval", Eval::new); private final List fields; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Insist.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Insist.java index 78d342ca7e3ad..8601ef456bef9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Insist.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Insist.java @@ -18,7 +18,7 @@ import java.util.List; import java.util.Objects; -public class Insist extends UnaryPlan implements SurrogateLogicalPlan { +public class Insist extends UnaryPlan implements SurrogateLogicalPlan, CardinalityPreserving { private final List insistedAttributes; private @Nullable List lazyOutput = null; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Keep.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Keep.java index 268c6bbe17242..4428e5a4ebe34 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Keep.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Keep.java @@ -15,7 +15,7 @@ import java.util.List; import java.util.Objects; -public class Keep extends Project implements TelemetryAware, SortAgnostic { +public class Keep extends Project implements TelemetryAware, CardinalityPreserving, SortAgnostic { public Keep(Source source, LogicalPlan child, List projections) { super(source, child, projections); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java index cbc4ee7da5be9..6d57f3010df73 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java @@ -18,7 +18,7 @@ import java.io.IOException; import java.util.Objects; -public class Limit extends UnaryPlan implements TelemetryAware, PipelineBreaker { +public class Limit extends UnaryPlan implements TelemetryAware, PipelineBreaker, ExecutesOn { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Limit", Limit::new); private final Expression limit; @@ -29,19 +29,25 @@ public class Limit extends UnaryPlan implements TelemetryAware, PipelineBreaker * infinite loops from adding a duplicate of the limit past the child over and over again. */ private final transient boolean duplicated; + /** + * Local limit is not a pipeline breaker, and is applied only to the local node's data. + * It should always end up inside a fragment. + */ + private final transient boolean local; /** - * Default way to create a new instance. Do not use this to copy an existing instance, as this sets {@link Limit#duplicated} to - * {@code false}. + * Default way to create a new instance. Do not use this to copy an existing instance, as this sets {@link Limit#duplicated} + * and {@link Limit#local} to {@code false}. */ public Limit(Source source, Expression limit, LogicalPlan child) { - this(source, limit, child, false); + this(source, limit, child, false, false); } - public Limit(Source source, Expression limit, LogicalPlan child, boolean duplicated) { + public Limit(Source source, Expression limit, LogicalPlan child, boolean duplicated, boolean local) { super(source, child); this.limit = limit; this.duplicated = duplicated; + this.local = local; } /** @@ -52,6 +58,7 @@ private Limit(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(LogicalPlan.class), + false, false ); } @@ -75,12 +82,12 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Limit::new, limit, child(), duplicated); + return NodeInfo.create(this, Limit::new, limit, child(), duplicated, local); } @Override public Limit replaceChild(LogicalPlan newChild) { - return new Limit(source(), limit, newChild, duplicated); + return new Limit(source(), limit, newChild, duplicated, local); } public Expression limit() { @@ -88,15 +95,23 @@ public Expression limit() { } public Limit withLimit(Expression limit) { - return new Limit(source(), limit, child(), duplicated); + return new Limit(source(), limit, child(), duplicated, local); } public boolean duplicated() { return duplicated; } + public boolean local() { + return local; + } + public Limit withDuplicated(boolean duplicated) { - return new Limit(source(), limit, child(), duplicated); + return new Limit(source(), limit, child(), duplicated, local); + } + + public Limit withLocal(boolean newLocal) { + return new Limit(source(), limit, child(), duplicated, newLocal); } @Override @@ -106,7 +121,7 @@ public boolean expressionsResolved() { @Override public int hashCode() { - return Objects.hash(limit, child(), duplicated); + return Objects.hash(limit, child(), duplicated, local); } @Override @@ -120,6 +135,15 @@ public boolean equals(Object obj) { Limit other = (Limit) obj; - return Objects.equals(limit, other.limit) && Objects.equals(child(), other.child()) && (duplicated == other.duplicated); + return Objects.equals(limit, other.limit) + && Objects.equals(child(), other.child()) + && (duplicated == other.duplicated) + && (local == other.local); + } + + @Override + public ExecuteLocation executesOn() { + // Global limit always needs to be on the coordinator + return local ? ExecuteLocation.ANY : ExecuteLocation.COORDINATOR; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Project.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Project.java index 1b59dd9d4d6da..74a99a0212349 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Project.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Project.java @@ -27,7 +27,7 @@ /** * A {@code Project} is a {@code Plan} with one child. In {@code FROM idx | KEEP x, y}, the {@code KEEP} statement is a Project. */ -public class Project extends UnaryPlan implements SortAgnostic { +public class Project extends UnaryPlan implements CardinalityPreserving, SortAgnostic { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Project", Project::new); private final List projections; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RegexExtract.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RegexExtract.java index f111b5d03edb3..04720da6dca26 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RegexExtract.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RegexExtract.java @@ -24,7 +24,12 @@ import static org.elasticsearch.xpack.esql.common.Failure.fail; import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public abstract class RegexExtract extends UnaryPlan implements GeneratingPlan, PostAnalysisVerificationAware, SortAgnostic { +public abstract class RegexExtract extends UnaryPlan + implements + GeneratingPlan, + PostAnalysisVerificationAware, + CardinalityPreserving, + SortAgnostic { protected final Expression input; protected final List extractedFields; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Rename.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Rename.java index ff3ec96b0caa2..9fe49b50eeee3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Rename.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Rename.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Objects; -public class Rename extends UnaryPlan implements TelemetryAware, SortAgnostic { +public class Rename extends UnaryPlan implements TelemetryAware, CardinalityPreserving, SortAgnostic { private final List renamings; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TopN.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TopN.java index 063b209a13ca4..b365be5253380 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TopN.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TopN.java @@ -21,16 +21,22 @@ import java.util.List; import java.util.Objects; -public class TopN extends UnaryPlan implements PipelineBreaker { +public class TopN extends UnaryPlan implements PipelineBreaker, ExecutesOn { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "TopN", TopN::new); private final List order; private final Expression limit; + /** + * Local topn is not a pipeline breaker, and is applied only to the local node's data. + * It should always end up inside a fragment. + */ + private final transient boolean local; - public TopN(Source source, LogicalPlan child, List order, Expression limit) { + public TopN(Source source, LogicalPlan child, List order, Expression limit, boolean local) { super(source, child); this.order = order; this.limit = limit; + this.local = local; } private TopN(StreamInput in) throws IOException { @@ -38,7 +44,8 @@ private TopN(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(LogicalPlan.class), in.readCollectionAsList(Order::new), - in.readNamedWriteable(Expression.class) + in.readNamedWriteable(Expression.class), + false ); } @@ -62,12 +69,20 @@ public boolean expressionsResolved() { @Override protected NodeInfo info() { - return NodeInfo.create(this, TopN::new, child(), order, limit); + return NodeInfo.create(this, TopN::new, child(), order, limit, local); } @Override public TopN replaceChild(LogicalPlan newChild) { - return new TopN(source(), newChild, order, limit); + return new TopN(source(), newChild, order, limit, local); + } + + public TopN withLocal(boolean local) { + return new TopN(source(), child(), order, limit, local); + } + + public boolean local() { + return local; } public Expression limit() { @@ -80,15 +95,20 @@ public List order() { @Override public int hashCode() { - return Objects.hash(super.hashCode(), order, limit); + return Objects.hash(super.hashCode(), order, limit, local); } @Override public boolean equals(Object obj) { if (super.equals(obj)) { var other = (TopN) obj; - return Objects.equals(order, other.order) && Objects.equals(limit, other.limit); + return Objects.equals(order, other.order) && Objects.equals(limit, other.limit) && local == other.local; } return false; } + + @Override + public ExecuteLocation executesOn() { + return local ? ExecuteLocation.ANY : ExecuteLocation.COORDINATOR; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java index 633ed74d8addb..9756c0b5a8176 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.plan.GeneratingPlan; +import org.elasticsearch.xpack.esql.plan.logical.CardinalityPreserving; import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic; @@ -24,6 +25,7 @@ public abstract class InferencePlan> extends UnaryPlan implements + CardinalityPreserving, SortAgnostic, GeneratingPlan>, ExecutesOn.Coordinator { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java index 2313dbed96ff8..f2511da4b0309 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import java.io.IOException; import java.util.List; @@ -24,7 +25,7 @@ import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public class EnrichExec extends UnaryExec implements EstimatesRowSize { +public class EnrichExec extends UnaryExec implements EstimatesRowSize, ExecutesOn { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( PhysicalPlan.class, "EnrichExec", @@ -191,4 +192,13 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(super.hashCode(), mode, matchType, matchField, policyName, policyMatchField, concreteIndices, enrichFields); } + + @Override + public ExecuteLocation executesOn() { + return switch (mode) { + case REMOTE -> ExecuteLocation.REMOTE; + case COORDINATOR -> ExecuteLocation.COORDINATOR; + default -> ExecuteLocation.ANY; + }; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 49192983b30e6..4cf615e717b3b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -11,7 +11,6 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Attribute; -import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; @@ -29,7 +28,6 @@ import org.elasticsearch.xpack.esql.plan.logical.join.Join; import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; -import org.elasticsearch.xpack.esql.plan.physical.EnrichExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec; @@ -39,7 +37,6 @@ import org.elasticsearch.xpack.esql.plan.physical.MergeExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; -import org.elasticsearch.xpack.esql.plan.physical.UnaryExec; import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec; import java.util.List; @@ -86,54 +83,6 @@ private PhysicalPlan mapLeaf(LeafPlan leaf) { private PhysicalPlan mapUnary(UnaryPlan unary) { PhysicalPlan mappedChild = map(unary.child()); - // - // TODO - this is hard to follow, causes bugs and needs reworking - // https://github.com/elastic/elasticsearch/issues/115897 - // - if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) { - // When we have remote enrich, we want to put it under FragmentExec, so it would be executed remotely. - // We're only going to do it on the coordinator node. - // The way we're going to do it is as follows: - // 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything. - // 2. Put this Enrich under it, removing everything that was below it previously. - // 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops already are supposed to go under - // FragmentExec. - // 4. Aggregates can't appear here since the plan should have errored out if we have aggregate inside remote Enrich. - // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway). - Holder hasFragment = new Holder<>(false); - - // Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice; - // include the plan up until this ENRICH in the fragment. - var childTransformed = mappedChild.transformUp(f -> { - // Once we reached FragmentExec, we stuff our Enrich under it - if (f instanceof FragmentExec) { - hasFragment.set(true); - return new FragmentExec(enrich); - } - if (f instanceof EnrichExec enrichExec) { - // It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec - assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here"; - return enrichExec.child(); - } - if (f instanceof UnaryExec unaryExec) { - if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof TopNExec) { - return f; - } else { - return unaryExec.child(); - } - } - // Here we have the following possibilities: - // 1. LeafExec - should resolve to FragmentExec or we can ignore it - // 2. Join - must be remote, and thus will go inside FragmentExec - // 3. Fork/MergeExec - not currently allowed with remote enrich - return f; - }); - - if (hasFragment.get()) { - return childTransformed; - } - } - if (mappedChild instanceof FragmentExec) { // COORDINATOR enrich must not be included to the fragment as it has to be executed on the coordinating node if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) { @@ -141,7 +90,9 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { return MapperUtils.mapUnary(unary, mappedChild); } // in case of a fragment, push to it any current streaming operator - if (unary instanceof PipelineBreaker == false) { + if (unary instanceof PipelineBreaker == false + || (unary instanceof Limit limit && limit.local()) + || (unary instanceof TopN topN && topN.local())) { return new FragmentExec(unary); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java index 6f6c76efaf08e..32e8c92bde047 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java @@ -194,6 +194,15 @@ public static EnrichResolution defaultEnrichResolution() { "languages_idx", "mapping-languages.json" ); + loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.REMOTE, + MATCH_TYPE, + "languages_remote", + "language_code", + "languages_idx", + "mapping-languages.json" + ); return enrichResolution; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/AbstractLogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/AbstractLogicalPlanOptimizerTests.java index 35c75d99ab925..156886e29be09 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/AbstractLogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/AbstractLogicalPlanOptimizerTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.analysis.Analyzer; import org.elasticsearch.xpack.esql.analysis.AnalyzerContext; import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils; @@ -19,6 +20,7 @@ import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.index.IndexResolution; import org.elasticsearch.xpack.esql.parser.EsqlParser; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.junit.BeforeClass; @@ -27,6 +29,7 @@ import java.util.Set; import static java.util.Collections.emptyMap; +import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE; import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping; @@ -35,6 +38,7 @@ import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultInferenceResolution; import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution; import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD; +import static org.hamcrest.Matchers.containsString; public abstract class AbstractLogicalPlanOptimizerTests extends ESTestCase { protected static EsqlParser parser; @@ -76,6 +80,24 @@ public static void init() { logicalOptimizer = new LogicalPlanOptimizer(logicalOptimizerCtx); enrichResolution = new EnrichResolution(); AnalyzerTestUtils.loadEnrichPolicyResolution(enrichResolution, "languages_idx", "id", "languages_idx", "mapping-languages.json"); + AnalyzerTestUtils.loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.REMOTE, + MATCH_TYPE, + "languages_remote", + "id", + "languages_idx", + "mapping-languages.json" + ); + AnalyzerTestUtils.loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.COORDINATOR, + MATCH_TYPE, + "languages_coordinator", + "id", + "languages_idx", + "mapping-languages.json" + ); // Most tests used data from the test index, so we load it here, and use it in the plan() function. mapping = loadMapping("mapping-basic.json"); @@ -241,4 +263,14 @@ protected LogicalPlan planSample(String query) { protected List filteredWarnings() { return withDefaultLimitWarning(super.filteredWarnings()); } + + protected void failPlan(String esql, Class exceptionClass, String reason) { + var e = expectThrows(exceptionClass, () -> plan(esql)); + assertThat(e.getMessage(), containsString(reason)); + } + + protected void failPlan(String esql, String reason) { + failPlan(esql, VerificationException.class, reason); + } + } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 6f1f060b466d2..de463082fcfd2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -1051,8 +1051,8 @@ public void testPushdownLimitsPastLeftJoin() { var optimizedPlan = rule.apply(limit, logicalOptimizerCtx); var expectedPlan = join instanceof InlineJoin - ? new Limit(limit.source(), limit.limit(), join, false) - : new Limit(limit.source(), limit.limit(), join.replaceChildren(limit.replaceChild(join.left()), join.right()), true); + ? new Limit(limit.source(), limit.limit(), join, false, false) + : new Limit(limit.source(), limit.limit(), join.replaceChildren(limit.replaceChild(join.left()), join.right()), true, false); assertEquals(expectedPlan, optimizedPlan); @@ -5601,7 +5601,7 @@ public void testPushShadowingGeneratingPlanPastProject() { List initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes(); LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan); - Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output()); + Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, initialPlan.output()); assertFalse(inconsistencies.hasFailures()); Project project = as(optimizedPlan, Project.class); @@ -5656,7 +5656,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProject() { List initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes(); LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan); - Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output()); + Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, initialPlan.output()); assertFalse(inconsistencies.hasFailures()); Project project = as(optimizedPlan, Project.class); @@ -5716,7 +5716,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProjectWithResolution() { // This ensures that our generating plan doesn't use invalid references, resp. that any rename from the Project has // been propagated into the generating plan. - Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output()); + Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, initialPlan.output()); assertFalse(inconsistencies.hasFailures()); Project project = as(optimizedPlan, Project.class); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 281bd9dca9e35..e8fc277b91065 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -65,6 +65,7 @@ import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy; +import org.elasticsearch.xpack.esql.expression.Foldables; import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; @@ -2916,12 +2917,12 @@ public void testVerifierOnMissingReferences() { public void testVerifierOnMissingReferencesWithBinaryPlans() throws Exception { // Do not assert serialization: // This will have a LookupJoinExec, which is not serializable because it doesn't leave the coordinator. - var plan = physicalPlan(""" + var plan = physicalPlanNoSerializationCheck(""" FROM test | RENAME languages AS language_code | SORT language_code | LOOKUP JOIN languages_lookup ON language_code - """, testData, false); + """); var planWithInvalidJoinLeftSide = plan.transformUp(LookupJoinExec.class, join -> join.replaceChildren(join.right(), join.right())); @@ -7006,7 +7007,9 @@ public void testEnrichBeforeLimit() { as(partialLimit.child(), EsRelation.class); } { - var plan = physicalPlan(""" + // Do not assert serialization: + // This has local LIMIT which does not serialize to a local LIMIT. + var plan = physicalPlanNoSerializationCheck(""" FROM test | EVAL employee_id = to_str(emp_no) | ENRICH _remote:departments @@ -7014,11 +7017,15 @@ public void testEnrichBeforeLimit() { var finalLimit = as(plan, LimitExec.class); var exchange = as(finalLimit.child(), ExchangeExec.class); var fragment = as(exchange.child(), FragmentExec.class); - var enrich = as(fragment.fragment(), Enrich.class); + var enrichLimit = as(fragment.fragment(), Limit.class); + assertThat(Foldables.limitValue(enrichLimit.limit(), enrichLimit.sourceText()), equalTo(10)); + var enrich = as(enrichLimit.child(), Enrich.class); assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE)); assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2"))); var evalFragment = as(enrich.child(), Eval.class); var partialLimit = as(evalFragment.child(), Limit.class); + assertThat(Foldables.limitValue(partialLimit.limit(), partialLimit.sourceText()), equalTo(10)); + assertTrue(partialLimit.local()); as(partialLimit.child(), EsRelation.class); } } @@ -7061,7 +7068,9 @@ public void testLimitThenEnrich() { } public void testLimitThenEnrichRemote() { - var plan = physicalPlan(""" + // Do not assert serialization: + // This has local LIMIT which does not serialize to a local LIMIT. + var plan = physicalPlanNoSerializationCheck(""" FROM test | LIMIT 10 | EVAL employee_id = to_str(emp_no) @@ -7070,11 +7079,15 @@ public void testLimitThenEnrichRemote() { var finalLimit = as(plan, LimitExec.class); var exchange = as(finalLimit.child(), ExchangeExec.class); var fragment = as(exchange.child(), FragmentExec.class); - var enrich = as(fragment.fragment(), Enrich.class); + var enrichLimit = as(fragment.fragment(), Limit.class); + assertThat(Foldables.limitValue(enrichLimit.limit(), enrichLimit.sourceText()), equalTo(10)); + var enrich = as(enrichLimit.child(), Enrich.class); assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE)); assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2"))); var evalFragment = as(enrich.child(), Eval.class); var partialLimit = as(evalFragment.child(), Limit.class); + assertThat(Foldables.limitValue(enrichLimit.limit(), enrichLimit.sourceText()), equalTo(10)); + assertTrue(partialLimit.local()); as(partialLimit.child(), EsRelation.class); } @@ -7113,7 +7126,9 @@ public void testEnrichBeforeTopN() { as(eval.child(), EsRelation.class); } { - var plan = physicalPlan(""" + // Do not assert serialization: + // This has local LIMIT which does not serialize to a local LIMIT. + var plan = physicalPlanNoSerializationCheck(""" FROM test | EVAL employee_id = to_str(emp_no) | ENRICH _remote:departments @@ -7130,7 +7145,9 @@ public void testEnrichBeforeTopN() { as(eval.child(), EsRelation.class); } { - var plan = physicalPlan(""" + // Do not assert serialization: + // This has local LIMIT which does not serialize to a local LIMIT. + var plan = physicalPlanNoSerializationCheck(""" FROM test | EVAL employee_id = to_str(emp_no) | ENRICH _remote:departments @@ -7150,7 +7167,9 @@ public void testEnrichBeforeTopN() { public void testEnrichAfterTopN() { { - var plan = physicalPlan(""" + // Do not assert serialization: + // This has local LIMIT which does not serialize to a local LIMIT. + var plan = physicalPlanNoSerializationCheck(""" FROM test | SORT emp_no | LIMIT 10 @@ -7186,7 +7205,9 @@ public void testEnrichAfterTopN() { as(partialTopN.child(), EsRelation.class); } { - var plan = physicalPlan(""" + // Do not assert serialization: + // This has local LIMIT which does not serialize to a local LIMIT. + var plan = physicalPlanNoSerializationCheck(""" FROM test | SORT emp_no | LIMIT 10 @@ -7196,11 +7217,15 @@ public void testEnrichAfterTopN() { var topN = as(plan, TopNExec.class); var exchange = as(topN.child(), ExchangeExec.class); var fragment = as(exchange.child(), FragmentExec.class); - var enrich = as(fragment.fragment(), Enrich.class); + var dupTopN = as(fragment.fragment(), TopN.class); + assertThat(Foldables.limitValue(dupTopN.limit(), dupTopN.sourceText()), equalTo(10)); + var enrich = as(dupTopN.child(), Enrich.class); assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE)); assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2"))); var evalFragment = as(enrich.child(), Eval.class); var partialTopN = as(evalFragment.child(), TopN.class); + assertThat(Foldables.limitValue(partialTopN.limit(), partialTopN.sourceText()), equalTo(10)); + assertTrue(partialTopN.local()); as(partialTopN.child(), EsRelation.class); } } @@ -7811,7 +7836,7 @@ private void assertLookupJoinFieldNames( ) { // Do not assert serialization: // This will have a LookupJoinExec, which is not serializable because it doesn't leave the coordinator. - var plan = physicalPlan(query, data, false); + var plan = physicalPlanOptimizer.optimize(physicalPlan(query, data, false)); var physicalOperations = physicalOperationsFromPhysicalPlan(plan, useDataNodePlan); @@ -8269,6 +8294,10 @@ private PhysicalPlan physicalPlan(String query, TestDataSource dataSource) { return physicalPlan(query, dataSource, true); } + private PhysicalPlan physicalPlanNoSerializationCheck(String query) { + return physicalPlan(query, testData, false); + } + private PhysicalPlan physicalPlan(String query, TestDataSource dataSource, boolean assertSerialization) { var logical = logicalOptimizer.optimize(dataSource.analyzer.analyze(parser.createStatement(query, config))); // System.out.println("Logical\n" + logical); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimitTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimitTests.java new file mode 100644 index 0000000000000..84a006920f736 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimitTests.java @@ -0,0 +1,231 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.logical; + +import org.elasticsearch.xpack.esql.expression.Foldables; +import org.elasticsearch.xpack.esql.optimizer.AbstractLogicalPlanOptimizerTests; +import org.elasticsearch.xpack.esql.plan.logical.Dissect; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.Limit; +import org.elasticsearch.xpack.esql.plan.logical.Project; + +import java.util.Locale; + +import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +public class HoistRemoteEnrichLimitTests extends AbstractLogicalPlanOptimizerTests { + + /** + *

    +     * Limit[10[INTEGER],true,false]
    +     * \_Enrich[REMOTE,languages_remote[KEYWORD],id{r}#4,{"match":{"indices":[],"match_field":"id",
    +     * "enrich_fields":["language_code","language_name"]}},{=languages_idx},[language_code{r}#20, language_name{r}#21]]
    +     *   \_Eval[[emp_no{f}#6 AS id#4]]
    +     *     \_Limit[10[INTEGER],false,true]
    +     *       \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ...]
    +     * 
    + */ + public void testLimitWithinRemoteEnrich() { + var plan = plan(randomFrom(""" + from test + | EVAL id = emp_no + | LIMIT 10 + | ENRICH _remote:languages_remote + """, """ + from test + | LIMIT 10 + | EVAL id = emp_no + | ENRICH _remote:languages_remote + """)); // it should be the same in any order + + var limit = as(plan, Limit.class); + assertTrue(limit.duplicated()); + assertFalse(limit.local()); + var enrich = as(limit.child(), Enrich.class); + assertThat(enrich.mode(), is(Enrich.Mode.REMOTE)); + var eval = as(enrich.child(), Eval.class); + var innerLimit = as(eval.child(), Limit.class); + assertFalse(innerLimit.duplicated()); + assertTrue(innerLimit.local()); + } + + /** + *
    +     * Limit[10[INTEGER],true,false]
    +     * \_Enrich[REMOTE,languages_remote[KEYWORD],id{r}#4,{"match":{"indices":[],"match_field":"id","enrich_fields":["language_cod
    +     * e","language_name"]}},{=languages_idx},[language_code{r}#20, language_name{r}#21]]
    +     *   \_Eval[[emp_no{f}#6 AS id#4]]
    +     *     \_Limit[10[INTEGER],false,true]
    +     *       \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
    +     * 
    + */ + public void testLimitWithinRemoteEnrichAndAfter() { + var plan = plan(""" + from test + | LIMIT 20 + | EVAL id = emp_no + | ENRICH _remote:languages_remote + | LIMIT 10 + """); // it should be the same in any order + + var limit = as(plan, Limit.class); + assertTrue(limit.duplicated()); + assertFalse(limit.local()); + var enrich = as(limit.child(), Enrich.class); + assertThat(enrich.mode(), is(Enrich.Mode.REMOTE)); + var eval = as(enrich.child(), Eval.class); + var innerLimit = as(eval.child(), Limit.class); + assertFalse(innerLimit.duplicated()); + assertTrue(innerLimit.local()); + } + + // Same as above but limits are reversed + public void testLimitWithinRemoteEnrichAndAfterHigher() { + var plan = plan(""" + from test + | LIMIT 10 + | EVAL id = emp_no + | ENRICH _remote:languages_remote + | LIMIT 20 + """); // it should be the same in any order + + var limit = as(plan, Limit.class); + assertTrue(limit.duplicated()); + assertFalse(limit.local()); + var enrich = as(limit.child(), Enrich.class); + assertThat(enrich.mode(), is(Enrich.Mode.REMOTE)); + var eval = as(enrich.child(), Eval.class); + var innerLimit = as(eval.child(), Limit.class); + assertFalse(innerLimit.duplicated()); + assertTrue(innerLimit.local()); + } + + /** + *
    +     * Project[[salary{f}#19 AS wage#10, emp_no{f}#14 AS id#4, first_name{r}#11, language_code{r}#28, language_name{r}#29]]
    +     * \_Limit[5[INTEGER],true,false]
    +     *   \_Enrich[REMOTE,languages_remote[KEYWORD],emp_no{f}#14,{"match":{"indices":[],"match_field":"id","enrich_fields":["languag
    +     * e_code","language_name"]}},{=languages_idx},[language_code{r}#28, language_name{r}#29]]
    +     *     \_Dissect[first_name{f}#15,Parser[pattern=%{first_name}s, appendSeparator=, parser=org.elasticsearch.dissect.DissectParse
    +     * r@7d5c7931],[first_name{r}#11]]
    +     *       \_Limit[5[INTEGER],false,true]
    +     *         \_EsRelation[test][_meta_field{f}#20, emp_no{f}#14, first_name{f}#15, ..]
    +     * 
    + */ + public void testManyLimitsWithinRemoteEnrich() { + var plan = plan(""" + from test + | LIMIT 10 + | EVAL id = emp_no + | KEEP first_name, salary, id + | RENAME salary AS wage + | DISSECT first_name "%{first_name}s" + | LIMIT 5 + | ENRICH _remote:languages_remote + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + assertTrue(limit.duplicated()); + assertFalse(limit.local()); + assertThat(Foldables.limitValue(limit.limit(), limit.sourceText()), equalTo(5)); + var enrich = as(limit.child(), Enrich.class); + assertThat(enrich.mode(), is(Enrich.Mode.REMOTE)); + var dissect = as(enrich.child(), Dissect.class); + var innerLimit = as(dissect.child(), Limit.class); + assertFalse(innerLimit.duplicated()); + assertTrue(innerLimit.local()); + assertThat(Foldables.limitValue(innerLimit.limit(), innerLimit.sourceText()), equalTo(5)); + } + + /** + * Project[[first_name{f}#14, salary{f}#18 AS wage#11, emp_no{f}#13 AS id#4, language_code{r}#32, language_name{r}#33]] + * \_Limit[5[INTEGER],true,false] + * \_Enrich[REMOTE,languages_remote[KEYWORD],emp_no{f}#13,{"match":{"indices":[],"match_field":"id","enrich_fields":["languag + * e_code","language_name"]}},{=languages_idx},[language_code{r}#32, language_name{r}#33]] + * \_Limit[5[INTEGER],true,true] + * \_Enrich[REMOTE,languages_remote[KEYWORD],emp_no{f}#13,{"match":{"indices":[],"match_field":"id","enrich_fields":["languag + * e_code","language_name"]}},{=languages_idx},[language_code{r}#27, language_name{r}#28]] + * \_Limit[5[INTEGER],false,true] + * \_EsRelation[test][_meta_field{f}#19, emp_no{f}#13, first_name{f}#14, ..] + */ + public void testLimitsWithinRemoteEnrichTwice() { + var plan = plan(""" + from test + | LIMIT 10 + | EVAL id = emp_no + | KEEP first_name, salary, id + | ENRICH _remote:languages_remote + | RENAME salary AS wage + | LIMIT 5 + | ENRICH _remote:languages_remote + """); + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + assertTrue(limit.duplicated()); + assertFalse(limit.local()); + assertThat(Foldables.limitValue(limit.limit(), limit.sourceText()), equalTo(5)); + var enrich = as(limit.child(), Enrich.class); + assertThat(enrich.mode(), is(Enrich.Mode.REMOTE)); + var innerLimit = as(enrich.child(), Limit.class); + assertTrue(innerLimit.duplicated()); + assertTrue(innerLimit.local()); + assertThat(Foldables.limitValue(innerLimit.limit(), innerLimit.sourceText()), equalTo(5)); + var secondEnrich = as(innerLimit.child(), Enrich.class); + assertThat(secondEnrich.mode(), is(Enrich.Mode.REMOTE)); + var innermostLimit = as(secondEnrich.child(), Limit.class); + assertFalse(innermostLimit.duplicated()); + assertTrue(innermostLimit.local()); + assertThat(Foldables.limitValue(innermostLimit.limit(), innermostLimit.sourceText()), equalTo(5)); + } + + // These cases do not get hoisting, and it's ok + public void testLimitWithinOtherEnrich() { + String enrichPolicy = randomFrom("languages_idx", "_any:languages_idx", "_coordinator:languages_coordinator"); + var plan = plan(String.format(Locale.ROOT, """ + from test + | EVAL id = emp_no + | LIMIT 10 + | ENRICH %s + """, enrichPolicy)); + // Here ENRICH is on top - no hoisting happens + var enrich = as(plan, Enrich.class); + assertThat(enrich.mode(), not(is(Enrich.Mode.REMOTE))); + } + + // Non-cardinality preserving commands after limit + public void testFilterLimitThenEnrich() { + // Hoisting does not happen, so the verifier fails since LIMIT is before remote ENRICH + failPlan(""" + from test + | EVAL id = emp_no + | LIMIT 10 + | WHERE first_name != "john" + | ENRICH _remote:languages_remote + """, "ENRICH with remote policy can't be executed after [LIMIT 10]"); + } + + public void testMvExpandLimitThenEnrich() { + // Hoisting does not happen, so the verifier fails since LIMIT is before remote ENRICH + failPlan(""" + from test + | EVAL id = emp_no + | LIMIT 10 + | MV_EXPAND languages + | ENRICH _remote:languages_remote + """, "MV_EXPAND after LIMIT is incompatible with remote ENRICH"); + } + + // Other cases where hoisting does not happen: + // - ExecutesOn.COORDINATOR - this fails the verifier + // - PipelineBreaker - all relevant ones are also ExecutesOn.COORDINATOR +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopNTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopNTests.java new file mode 100644 index 0000000000000..ab79d90fb817e --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopNTests.java @@ -0,0 +1,172 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.logical; + +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.analysis.Analyzer; +import org.elasticsearch.xpack.esql.analysis.AnalyzerContext; +import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils; +import org.elasticsearch.xpack.esql.analysis.EnrichResolution; +import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; +import org.elasticsearch.xpack.esql.index.EsIndex; +import org.elasticsearch.xpack.esql.index.IndexResolution; +import org.elasticsearch.xpack.esql.optimizer.AbstractLogicalPlanOptimizerTests; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.Project; +import org.elasticsearch.xpack.esql.plan.logical.TopN; + +import java.util.Map; + +import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class HoistRemoteEnrichTopNTests extends AbstractLogicalPlanOptimizerTests { + + /** + *
    +     * TopN[[Order[emp_no{f}#7,ASC,LAST]],10[INTEGER],false]
    +     * \_Enrich[REMOTE,languages_remote[KEYWORD],id{r}#4,{"match":{"indices":[],"match_field":"id","enrich_fields":["language_cod
    +     * e","language_name"]}},{=languages_idx},[language_code{r}#21, language_name{r}#22]]
    +     *   \_TopN[[Order[emp_no{f}#7,ASC,LAST]],10[INTEGER],true]
    +     *     \_Eval[[emp_no{f}#7 AS id#4]]
    +     *       \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..]
    +     * 
    + */ + public void testLimitWithinRemoteEnrich() { + var plan = plan(""" + from test + | EVAL id = emp_no + | SORT emp_no + | LIMIT 10 + | ENRICH _remote:languages_remote + """); + + var topn = as(plan, TopN.class); + assertFalse(topn.local()); + var enrich = as(topn.child(), Enrich.class); + assertThat(enrich.mode(), is(Enrich.Mode.REMOTE)); + var innerTopN = as(enrich.child(), TopN.class); + assertTrue(innerTopN.local()); + as(innerTopN.child(), Eval.class); + } + + /** + * Test case for aliasing within TopN + Enrich. This happens when Enrich had a field that overrides an existing field, + * so we need to alias it. + *
    +     *     Project[[host.name{f}#10, host.os{f}#11, host.version{f}#12, host.name{f}#10 AS host#5, host_group{r}#21, description{
    +     * r}#22, card{r}#23, ip0{r}#24, ip1{r}#25]]
    +     * \_Project[[host.name{f}#10, host.os{f}#11, host.version{f}#12, host_group{r}#21, description{r}#22, card{r}#23, ip0{r}#2
    +     * 4, ip1{r}#25]]
    +     *   \_TopN[[Order[$$description$temp_name$27{r$}#28,ASC,LAST]],10[INTEGER],false]
    +     *     \_Enrich[REMOTE,hosts[KEYWORD],host.name{f}#10,{"match":{"indices":[],"match_field":"host","enrich_fields":["host_group","
    +     * description","card","ip0","ip1"]}},{=hosts},[host_group{r}#21, description{r}#22, card{r}#23, ip0{r}#24, ip1{r}#
    +     * 25]]
    +     *       \_Eval[[description{f}#13 AS $$description$temp_name$27#28]]
    +     *         \_TopN[[Order[description{f}#13,ASC,LAST]],10[INTEGER],true]
    +     *           \_EsRelation[host_inventory][description{f}#13, host.name{f}#10, host.os{f}#11, ..]
    +     * 
    + * TODO: probably makes sense to remove double project, but this can be done later + */ + public void testTopNWithinRemoteEnrichAliasing() { + // Set up index and enrich policy with overlapping fields + var enrichResolution = new EnrichResolution(); + AnalyzerTestUtils.loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.REMOTE, + MATCH_TYPE, + "hosts", + "host", + "hosts", + "mapping-hosts.json" + ); + var mapping = loadMapping("mapping-host_inventory.json"); + EsIndex airports = new EsIndex("host_inventory", mapping, Map.of("host_inventory", IndexMode.STANDARD)); + IndexResolution resolution = IndexResolution.valid(airports); + var analyzer = new Analyzer( + new AnalyzerContext( + EsqlTestUtils.TEST_CFG, + new EsqlFunctionRegistry(), + resolution, + defaultLookupResolution(), + enrichResolution, + emptyInferenceResolution() + ), + TEST_VERIFIER + ); + + var query = (""" + from host_inventory + | SORT description + | LIMIT 10 + | EVAL host = host.name + | KEEP host*, description + | ENRICH _remote:hosts + """); + var analyzed = analyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)); + var plan = logicalOptimizer.optimize(analyzed); + + var proj1 = as(plan, Project.class); + var proj2 = as(proj1.child(), Project.class); + var topn = as(proj2.child(), TopN.class); + assertFalse(topn.local()); + var enrich = as(topn.child(), Enrich.class); + assertThat(enrich.mode(), is(Enrich.Mode.REMOTE)); + var eval = as(enrich.child(), Eval.class); + var evalAlias = as(eval.expressions().get(0), Alias.class); + var evalName = as(evalAlias.child(), NamedExpression.class); + assertThat(evalName.name(), equalTo("description")); + var innerTopN = as(eval.child(), TopN.class); + assertTrue(innerTopN.local()); + } + + public void testFilterLimitThenEnrich() { + // Hoisting does not happen, so the verifier fails since TopN is before remote ENRICH + failPlan(""" + from test + | EVAL id = emp_no + | SORT emp_no + | LIMIT 10 + | WHERE first_name != "john" + | ENRICH _remote:languages_remote + """, "ENRICH with remote policy can't be executed after [SORT emp_no]"); + } + + public void testMvExpandLimitThenEnrich() { + failPlan(""" + from test + | EVAL id = emp_no + | SORT emp_no + | LIMIT 10 + | MV_EXPAND languages + | ENRICH _remote:languages_remote + """, "MV_EXPAND after LIMIT is incompatible with remote ENRICH"); + } + + public void testTwoSortsWithinRemoteEnrich() { + failPlan(""" + from test + | EVAL id = emp_no + | SORT emp_no + | LIMIT 10 + | SORT id + | LIMIT 5 + | ENRICH _remote:languages_remote + """, "ENRICH with remote policy can't be executed after [SORT emp_no]"); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimitsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimitsTests.java index b1626e4b77ce8..449cdb0345bfa 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimitsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimitsTests.java @@ -16,17 +16,23 @@ import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToInteger; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.MvExpand; import org.elasticsearch.xpack.esql.plan.logical.OrderBy; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; +import org.elasticsearch.xpack.esql.plan.logical.join.Join; +import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig; +import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; import java.util.List; +import java.util.Map; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -42,7 +48,7 @@ public class PushDownAndCombineLimitsTests extends ESTestCase { - private static class PushDownLimitTestCase { + private static class PushDownLimitTestCase { private final Class clazz; private final BiFunction planBuilder; private final BiConsumer planChecker; @@ -102,6 +108,25 @@ public void checkOptimizedPlan(LogicalPlan basePlan, LogicalPlan optimizedPlan) assertEquals(basePlan.rerankFields(), optimizedPlan.rerankFields()); assertEquals(basePlan.scoreAttribute(), optimizedPlan.scoreAttribute()); } + ), + new PushDownLimitTestCase<>( + Enrich.class, + (plan, attr) -> new Enrich( + EMPTY, + plan, + randomFrom(Enrich.Mode.ANY, Enrich.Mode.COORDINATOR), + randomLiteral(KEYWORD), + attr, + null, + Map.of(), + List.of() + ), + (basePlan, optimizedPlan) -> { + assertEquals(basePlan.source(), optimizedPlan.source()); + assertEquals(basePlan.mode(), optimizedPlan.mode()); + assertEquals(basePlan.policyName(), optimizedPlan.policyName()); + assertEquals(basePlan.matchField(), optimizedPlan.matchField()); + } ) ); @@ -174,6 +199,58 @@ public void testNonPushableLimit() { } } + private static final List> DUPLICATING_TEST_CASES = List.of( + new PushDownLimitTestCase<>( + Enrich.class, + (plan, attr) -> new Enrich(EMPTY, plan, Enrich.Mode.REMOTE, randomLiteral(KEYWORD), attr, null, Map.of(), List.of()), + (basePlan, optimizedPlan) -> { + assertEquals(basePlan.source(), optimizedPlan.source()); + assertEquals(basePlan.mode(), optimizedPlan.mode()); + assertEquals(basePlan.policyName(), optimizedPlan.policyName()); + assertEquals(basePlan.matchField(), optimizedPlan.matchField()); + var limit = as(optimizedPlan.child(), Limit.class); + assertTrue(limit.local()); + assertFalse(limit.duplicated()); + } + ), + new PushDownLimitTestCase<>(MvExpand.class, (plan, attr) -> new MvExpand(EMPTY, plan, attr, attr), (basePlan, optimizedPlan) -> { + assertEquals(basePlan.source(), optimizedPlan.source()); + assertEquals(basePlan.expanded(), optimizedPlan.expanded()); + var limit = as(optimizedPlan.child(), Limit.class); + assertFalse(limit.local()); + assertFalse(limit.duplicated()); + }), + new PushDownLimitTestCase<>( + Join.class, + (plan, attr) -> new Join(EMPTY, plan, plan, new JoinConfig(JoinTypes.LEFT, List.of(), List.of(), attr)), + (basePlan, optimizedPlan) -> { + assertEquals(basePlan.source(), optimizedPlan.source()); + var limit = as(optimizedPlan.left(), Limit.class); + assertFalse(limit.local()); + assertFalse(limit.duplicated()); + } + ) + + ); + + public void testPushableLimitDuplicate() { + FieldAttribute a = getFieldAttribute("a"); + FieldAttribute b = getFieldAttribute("b"); + EsRelation relation = relation().withAttributes(List.of(a, b)); + + for (PushDownLimitTestCase duplicatingTestCase : DUPLICATING_TEST_CASES) { + int precedingLimitValue = randomIntBetween(1, 10_000); + Limit precedingLimit = new Limit(EMPTY, new Literal(EMPTY, precedingLimitValue, INTEGER), relation); + LogicalPlan duplicatingLimitTestPlan = duplicatingTestCase.buildPlan(precedingLimit, a); + int upperLimitValue = randomIntBetween(1, precedingLimitValue); + Limit upperLimit = new Limit(EMPTY, new Literal(EMPTY, upperLimitValue, INTEGER), duplicatingLimitTestPlan); + Limit optimizedPlan = as(optimizePlan(upperLimit), Limit.class); + duplicatingTestCase.checkOptimizedPlan(duplicatingLimitTestPlan, optimizedPlan.child()); + assertTrue(optimizedPlan.duplicated()); + assertFalse(optimizedPlan.local()); + } + } + private LogicalPlan optimizePlan(LogicalPlan plan) { return new PushDownAndCombineLimits().apply(plan, unboundLogicalOptimizerContext()); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/LimitSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/LimitSerializationTests.java index b1ffb9c5f8ba8..d762416db64d8 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/LimitSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/LimitSerializationTests.java @@ -20,7 +20,7 @@ protected Limit createTestInstance() { Source source = randomSource(); Expression limit = FieldAttributeTests.createFieldAttribute(0, false); LogicalPlan child = randomChild(0); - return new Limit(source, limit, child, randomBoolean()); + return new Limit(source, limit, child, randomBoolean(), randomBoolean()); } @Override @@ -28,13 +28,15 @@ protected Limit mutateInstance(Limit instance) throws IOException { Expression limit = instance.limit(); LogicalPlan child = instance.child(); boolean duplicated = instance.duplicated(); - switch (randomIntBetween(0, 2)) { + boolean local = instance.local(); + switch (randomIntBetween(0, 3)) { case 0 -> limit = randomValueOtherThan(limit, () -> FieldAttributeTests.createFieldAttribute(0, false)); case 1 -> child = randomValueOtherThan(child, () -> randomChild(0)); case 2 -> duplicated = duplicated == false; + case 3 -> local = local == false; default -> throw new IllegalStateException("Should never reach here"); } - return new Limit(instance.source(), limit, child, duplicated); + return new Limit(instance.source(), limit, child, duplicated, local); } @Override @@ -45,8 +47,9 @@ protected boolean alwaysEmptySource() { @Override protected Limit copyInstance(Limit instance, TransportVersion version) throws IOException { // Limit#duplicated() is ALWAYS false when being serialized and we assert that in Limit#writeTo(). + // The same applies to Limit#local. // So, we need to manually simulate this situation. - Limit deserializedCopy = super.copyInstance(instance.withDuplicated(false), version); - return deserializedCopy.withDuplicated(instance.duplicated()); + Limit deserializedCopy = super.copyInstance(instance, version); + return deserializedCopy.withDuplicated(instance.duplicated()).withLocal(instance.local()); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TopNSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TopNSerializationTests.java index 6c3863582f215..ff620e79c57fa 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TopNSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TopNSerializationTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plan.logical; +import org.elasticsearch.TransportVersion; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests; @@ -22,7 +23,7 @@ public static TopN randomTopN(int depth) { LogicalPlan child = randomChild(depth); List order = randomOrders(); Expression limit = AbstractExpressionSerializationTests.randomChild(); - return new TopN(source, child, order, limit); + return new TopN(source, child, order, limit, randomBoolean()); } private static List randomOrders() { @@ -40,16 +41,26 @@ protected TopN mutateInstance(TopN instance) throws IOException { LogicalPlan child = instance.child(); List order = instance.order(); Expression limit = instance.limit(); - switch (between(0, 2)) { + boolean local = instance.local(); + switch (between(0, 3)) { case 0 -> child = randomValueOtherThan(child, () -> randomChild(0)); case 1 -> order = randomValueOtherThan(order, TopNSerializationTests::randomOrders); case 2 -> limit = randomValueOtherThan(limit, AbstractExpressionSerializationTests::randomChild); + case 3 -> local = local == false; } - return new TopN(source, child, order, limit); + return new TopN(source, child, order, limit, local); } @Override protected boolean alwaysEmptySource() { return true; } + + @Override + protected TopN copyInstance(TopN instance, TransportVersion version) throws IOException { + // TopN#local is ALWAYS false after serialization. + TopN deserializedCopy = super.copyInstance(instance, version); + return deserializedCopy.withLocal(instance.local()); + } + }