From d77756169fa74f6d0711bcd258f05dc6a40db92e Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 4 Sep 2025 13:56:21 -0600 Subject: [PATCH 1/6] Add check that remote enrich stays on remote side --- .../xpack/esql/plan/physical/EnrichExec.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java index 22ee88f8119ec..8f1a33cad3337 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java @@ -13,6 +13,9 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.IndexMode; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.xpack.esql.capabilities.PostPhysicalOptimizationVerificationAware; +import org.elasticsearch.xpack.esql.common.Failure; +import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; @@ -30,7 +33,7 @@ import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public class EnrichExec extends UnaryExec implements EstimatesRowSize { +public class EnrichExec extends UnaryExec implements EstimatesRowSize, PostPhysicalOptimizationVerificationAware { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( PhysicalPlan.class, "EnrichExec", @@ -220,4 +223,14 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(super.hashCode(), mode, matchType, matchField, policyName, policyMatchField, concreteIndices, enrichFields); } + + @Override + public void postPhysicalOptimizationVerification(Failures failures) { + if (mode == Enrich.Mode.REMOTE) { + // check that there is no FragmentedExec in the child plan - that would mean we're on the wrong side of the remote boundary + child().forEachDown(FragmentExec.class, f -> { + failures.add(Failure.fail(this, "Remote enrich cannot be performed on the coordinator side")); + }); + } + } } From b6244e5d075ab941427a9ba74d3edc676a77fd96 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 8 Sep 2025 18:28:21 -0600 Subject: [PATCH 2/6] Make it more generic --- .../xpack/esql/optimizer/PhysicalVerifier.java | 6 ++++++ .../xpack/esql/plan/physical/EnrichExec.java | 14 +++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java index 781a8f5263c1f..224b63390b771 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker; import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.physical.EnrichExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; @@ -54,6 +55,11 @@ void checkPlanConsistency(PhysicalPlan optimizedPlan, Failures failures, Failure ); } } + + if (p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.REMOTE) { + failures.add(fail(p, "Physical plan contains remote executing operation [{}] in local part", p.nodeName())); + } + PlanConsistencyChecker.checkPlan(p, depFailures); if (failures.hasFailures() == false) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java index 8f1a33cad3337..df0faf539e57b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.IndexMode; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.xpack.esql.capabilities.PostPhysicalOptimizationVerificationAware; import org.elasticsearch.xpack.esql.common.Failure; import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Attribute; @@ -24,6 +23,7 @@ import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import java.io.IOException; import java.util.List; @@ -33,7 +33,7 @@ import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public class EnrichExec extends UnaryExec implements EstimatesRowSize, PostPhysicalOptimizationVerificationAware { +public class EnrichExec extends UnaryExec implements EstimatesRowSize, ExecutesOn { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( PhysicalPlan.class, "EnrichExec", @@ -225,12 +225,12 @@ public int hashCode() { } @Override - public void postPhysicalOptimizationVerification(Failures failures) { + public ExecuteLocation executesOn() { if (mode == Enrich.Mode.REMOTE) { - // check that there is no FragmentedExec in the child plan - that would mean we're on the wrong side of the remote boundary - child().forEachDown(FragmentExec.class, f -> { - failures.add(Failure.fail(this, "Remote enrich cannot be performed on the coordinator side")); - }); + return ExecuteLocation.REMOTE; + } else if (mode == Enrich.Mode.COORDINATOR) { + return ExecuteLocation.COORDINATOR; } + return ExecuteLocation.ANY; } } From a787ae845525bf1e095469b1357107ae7fdce885 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 9 Sep 2025 00:35:05 +0000 Subject: [PATCH 3/6] [CI] Auto commit changes from spotless --- .../org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java index df0faf539e57b..dd1c094a39050 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java @@ -13,8 +13,6 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.IndexMode; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.xpack.esql.common.Failure; -import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; From 6d638be7e8284727fe863f00123e851671ffd02a Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 9 Sep 2025 12:56:41 -0600 Subject: [PATCH 4/6] Verifier refactor --- .../optimizer/LocalLogicalPlanOptimizer.java | 4 +-- .../optimizer/LocalPhysicalPlanOptimizer.java | 4 +-- .../esql/optimizer/LogicalPlanOptimizer.java | 4 +-- .../xpack/esql/optimizer/LogicalVerifier.java | 24 ++++++++-------- .../esql/optimizer/PhysicalPlanOptimizer.java | 4 +-- .../esql/optimizer/PhysicalVerifier.java | 28 +++++++++++-------- .../PostOptimizationPhasePlanVerifier.java | 16 +++++++++-- .../xpack/esql/plan/logical/Enrich.java | 11 ++++---- .../xpack/esql/plan/physical/EnrichExec.java | 11 ++++---- .../optimizer/LogicalPlanOptimizerTests.java | 6 ++-- 10 files changed, 63 insertions(+), 49 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java index 088b1ad15557f..11aeb610f2ef8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java @@ -39,7 +39,7 @@ */ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor { - private final LogicalVerifier verifier = LogicalVerifier.INSTANCE; + private final LogicalVerifier verifier = LogicalVerifier.getLocalVerifier(); private static final List> RULES = arrayAsArrayList( new Batch<>( @@ -90,7 +90,7 @@ private static Batch localOperators() { public LogicalPlan localOptimize(LogicalPlan plan) { LogicalPlan optimized = execute(plan); - Failures failures = verifier.verify(optimized, true, plan.output()); + Failures failures = verifier.verify(optimized, plan.output()); if (failures.hasFailures()) { throw new VerificationException(failures); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java index 15cc1d54eb3a6..417135c588853 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java @@ -37,7 +37,7 @@ public class LocalPhysicalPlanOptimizer extends ParameterizedRuleExecutor> RULES = rules(true); - private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE; + private final PhysicalVerifier verifier = PhysicalVerifier.getLocalVerifier(); public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) { super(context); @@ -48,7 +48,7 @@ public PhysicalPlan localOptimize(PhysicalPlan plan) { } PhysicalPlan verify(PhysicalPlan optimizedPlan, List expectedOutputAttributes) { - Failures failures = verifier.verify(optimizedPlan, true, expectedOutputAttributes); + Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes); if (failures.hasFailures()) { throw new VerificationException(failures); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index 6f550524c5ca5..a6c4fa173391a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -105,7 +105,7 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor("Set as Optimized", Limiter.ONCE, new SetAsOptimized()) ); - private final LogicalVerifier verifier = LogicalVerifier.INSTANCE; + private final LogicalVerifier verifier = LogicalVerifier.getGeneralVerifier(); public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) { super(optimizerContext); @@ -114,7 +114,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) { public LogicalPlan optimize(LogicalPlan verified) { var optimized = execute(verified); - Failures failures = verifier.verify(optimized, false, verified.output()); + Failures failures = verifier.verify(optimized, verified.output()); if (failures.hasFailures()) { throw new VerificationException(failures); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java index 4a04b46be295a..087e6817379d1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java @@ -15,20 +15,22 @@ public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier { - public static final LogicalVerifier INSTANCE = new LogicalVerifier(); + private LogicalVerifier(boolean isLocal) { + super(isLocal); + } - private LogicalVerifier() {} + public static LogicalVerifier getLocalVerifier() { + return new LogicalVerifier(true); + } + + public static LogicalVerifier getGeneralVerifier() { + return new LogicalVerifier(false); + } @Override - boolean skipVerification(LogicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) { - if (skipRemoteEnrichVerification) { - // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 - var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance); - if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) { - return true; - } - } - return false; + boolean hasRemoteEnrich(LogicalPlan optimizedPlan) { + var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance); + return enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE; } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java index 6d60c547f47d6..a8101165e7a5e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java @@ -28,7 +28,7 @@ public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns()) ); - private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE; + private final PhysicalVerifier verifier = PhysicalVerifier.getGeneralVerifier(); public PhysicalPlanOptimizer(PhysicalOptimizerContext context) { super(context); @@ -39,7 +39,7 @@ public PhysicalPlan optimize(PhysicalPlan plan) { } PhysicalPlan verify(PhysicalPlan optimizedPlan, List expectedOutputAttributes) { - Failures failures = verifier.verify(optimizedPlan, false, expectedOutputAttributes); + Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes); if (failures.hasFailures()) { throw new VerificationException(failures); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java index 224b63390b771..f4116a3baa7d7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java @@ -23,20 +23,23 @@ /** Physical plan verifier. */ public final class PhysicalVerifier extends PostOptimizationPhasePlanVerifier { - public static final PhysicalVerifier INSTANCE = new PhysicalVerifier(); + private PhysicalVerifier(boolean isLocal) { + super(isLocal); + } - private PhysicalVerifier() {} + public static PhysicalVerifier getLocalVerifier() { + return new PhysicalVerifier(true); + } + + public static PhysicalVerifier getGeneralVerifier() { + return new PhysicalVerifier(false); + } @Override - boolean skipVerification(PhysicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) { - if (skipRemoteEnrichVerification) { - // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 - var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance); - if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) { - return true; - } - } - return false; + boolean hasRemoteEnrich(PhysicalPlan optimizedPlan) { + // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 + var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance); + return enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE; } @Override @@ -56,7 +59,8 @@ void checkPlanConsistency(PhysicalPlan optimizedPlan, Failures failures, Failure } } - if (p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.REMOTE) { + // This check applies only for general physical plans (isLocal == false) + if (isLocal == false && p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.REMOTE) { failures.add(fail(p, "Physical plan contains remote executing operation [{}] in local part", p.nodeName())); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java index e187264b2432f..39c8c75d5f6b3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java @@ -28,11 +28,20 @@ */ public abstract class PostOptimizationPhasePlanVerifier

> { + // Are we verifying the global plan (coordinator) or a local plan (data node)? + protected final boolean isLocal; + + protected PostOptimizationPhasePlanVerifier(boolean isLocal) { + this.isLocal = isLocal; + } + /** Verifies the optimized plan */ - public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, List expectedOutputAttributes) { + public Failures verify(P optimizedPlan, List expectedOutputAttributes) { Failures failures = new Failures(); Failures depFailures = new Failures(); - if (skipVerification(optimizedPlan, skipRemoteEnrichVerification)) { + // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 + // This is a temporary workaround to skip verification when there is a remote enrich, due to a bug + if (isLocal && hasRemoteEnrich(optimizedPlan)) { return failures; } @@ -47,7 +56,8 @@ public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, Li return failures; } - abstract boolean skipVerification(P optimizedPlan, boolean skipRemoteEnrichVerification); + // This is a temporary workaround to skip verification when there is a remote enrich, due to a bug + abstract boolean hasRemoteEnrich(P optimizedPlan); abstract void checkPlanConsistency(P optimizedPlan, Failures failures, Failures depFailures); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java index ef268ff6b7964..6015cb9f42bf6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java @@ -74,12 +74,11 @@ public class Enrich extends UnaryPlan @Override public ExecuteLocation executesOn() { - if (mode == Mode.REMOTE) { - return ExecuteLocation.REMOTE; - } else if (mode == Mode.COORDINATOR) { - return ExecuteLocation.COORDINATOR; - } - return ExecuteLocation.ANY; + return switch (mode) { + case REMOTE -> ExecuteLocation.REMOTE; + case COORDINATOR -> ExecuteLocation.COORDINATOR; + default -> ExecuteLocation.ANY; + }; } public enum Mode { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java index dd1c094a39050..28d698a6fec64 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java @@ -224,11 +224,10 @@ public int hashCode() { @Override public ExecuteLocation executesOn() { - if (mode == Enrich.Mode.REMOTE) { - return ExecuteLocation.REMOTE; - } else if (mode == Enrich.Mode.COORDINATOR) { - return ExecuteLocation.COORDINATOR; - } - return ExecuteLocation.ANY; + return switch (mode) { + case REMOTE -> ExecuteLocation.REMOTE; + case COORDINATOR -> ExecuteLocation.COORDINATOR; + default -> ExecuteLocation.ANY; + }; } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 82119fb7baa82..f5582538e8504 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -5537,7 +5537,7 @@ public void testPushShadowingGeneratingPlanPastProject() { List initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes(); LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan); - Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output()); + Failures inconsistencies = LogicalVerifier.getGeneralVerifier().verify(optimizedPlan, initialPlan.output()); assertFalse(inconsistencies.hasFailures()); Project project = as(optimizedPlan, Project.class); @@ -5592,7 +5592,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProject() { List initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes(); LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan); - Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output()); + Failures inconsistencies = LogicalVerifier.getGeneralVerifier().verify(optimizedPlan, initialPlan.output()); assertFalse(inconsistencies.hasFailures()); Project project = as(optimizedPlan, Project.class); @@ -5652,7 +5652,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProjectWithResolution() { // This ensures that our generating plan doesn't use invalid references, resp. that any rename from the Project has // been propagated into the generating plan. - Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output()); + Failures inconsistencies = LogicalVerifier.getGeneralVerifier().verify(optimizedPlan, initialPlan.output()); assertFalse(inconsistencies.hasFailures()); Project project = as(optimizedPlan, Project.class); From 4e160266626759745468006d755f96ff4957d7b1 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 12 Sep 2025 13:57:27 -0600 Subject: [PATCH 5/6] Fix the fix for https://github.com/elastic/elasticsearch/issues/118531 --- .../esql/action/CrossClusterEnrichIT.java | 27 +++++++++++++++++++ .../xpack/esql/optimizer/LogicalVerifier.java | 3 ++- .../esql/optimizer/PhysicalVerifier.java | 13 +++++++-- .../PostOptimizationPhasePlanVerifier.java | 2 +- 4 files changed, 41 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java index f848c663d3ca6..f5880b71c4615 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java @@ -402,6 +402,33 @@ public void testTopNThenEnrichRemote() { assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); assertCCSExecutionInfoDetails(executionInfo); } + + // No renames, no KEEP + query = """ + FROM *:events,events + | eval ip= TO_STR(host) + | SORT timestamp, user, ip + | LIMIT 5 + | ENRICH _remote:hosts + """; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + assertThat( + getValuesList(resp), + equalTo( + List.of( + List.of("192.168.1.2", 1L, "andres", "192.168.1.2", "Windows", "192.168.1.2"), + List.of("192.168.1.3", 1L, "matthew", "192.168.1.3", "MacOS", "192.168.1.3"), + Arrays.asList("192.168.1.25", 1L, "park", (String) null, (String) null, "192.168.1.25"), + List.of("192.168.1.5", 2L, "akio", "192.168.1.5", "Android", "192.168.1.5"), + List.of("192.168.1.6", 2L, "sergio", "192.168.1.6", "iOS", "192.168.1.6") + ) + ) + ); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); + assertCCSExecutionInfoDetails(executionInfo); + } } public void testLimitThenEnrichRemote() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java index dd33538bc0141..ba930988fe5ca 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java @@ -34,8 +34,9 @@ public static LogicalVerifier getGeneralVerifier() { @Override boolean hasRemoteEnrich(LogicalPlan optimizedPlan) { + // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance); - return enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE; + return enriches.stream().anyMatch(e -> ((Enrich) e).mode() == Enrich.Mode.REMOTE); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java index f4116a3baa7d7..9aece6065ef33 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.physical.EnrichExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import static org.elasticsearch.xpack.esql.common.Failure.fail; @@ -38,8 +39,16 @@ public static PhysicalVerifier getGeneralVerifier() { @Override boolean hasRemoteEnrich(PhysicalPlan optimizedPlan) { // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 - var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance); - return enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE; + if (isLocal) { + var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance); + return enriches.stream().anyMatch(e -> ((EnrichExec) e).mode() == Enrich.Mode.REMOTE); + } else { + var fragments = optimizedPlan.collectFirstChildren(FragmentExec.class::isInstance); + return fragments.stream().map(FragmentExec.class::cast).anyMatch(f -> { + var enriches = f.fragment().collectFirstChildren(Enrich.class::isInstance); + return enriches.stream().anyMatch(e -> ((Enrich) e).mode() == Enrich.Mode.REMOTE); + }); + } } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java index 39c8c75d5f6b3..4ebfde2f2b9f3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java @@ -41,7 +41,7 @@ public Failures verify(P optimizedPlan, List expectedOutputAttributes Failures depFailures = new Failures(); // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 // This is a temporary workaround to skip verification when there is a remote enrich, due to a bug - if (isLocal && hasRemoteEnrich(optimizedPlan)) { + if (hasRemoteEnrich(optimizedPlan)) { return failures; } From ff5cabdee9583d150db1854e15b6617ef9c79ac8 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 2 Oct 2025 07:42:12 +0000 Subject: [PATCH 6/6] [CI] Update transport version definitions --- server/src/main/resources/transport/upper_bounds/8.18.csv | 2 +- server/src/main/resources/transport/upper_bounds/8.19.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.0.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.1.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.2.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.3.csv | 1 + 6 files changed, 6 insertions(+), 5 deletions(-) create mode 100644 server/src/main/resources/transport/upper_bounds/9.3.csv diff --git a/server/src/main/resources/transport/upper_bounds/8.18.csv b/server/src/main/resources/transport/upper_bounds/8.18.csv index 4eb5140004ea6..266bfbbd3bf78 100644 --- a/server/src/main/resources/transport/upper_bounds/8.18.csv +++ b/server/src/main/resources/transport/upper_bounds/8.18.csv @@ -1 +1 @@ -initial_elasticsearch_8_18_6,8840008 +transform_check_for_dangling_tasks,8840011 diff --git a/server/src/main/resources/transport/upper_bounds/8.19.csv b/server/src/main/resources/transport/upper_bounds/8.19.csv index 476468b203875..3600b3f8c633a 100644 --- a/server/src/main/resources/transport/upper_bounds/8.19.csv +++ b/server/src/main/resources/transport/upper_bounds/8.19.csv @@ -1 +1 @@ -initial_elasticsearch_8_19_3,8841067 +transform_check_for_dangling_tasks,8841070 diff --git a/server/src/main/resources/transport/upper_bounds/9.0.csv b/server/src/main/resources/transport/upper_bounds/9.0.csv index f8f50cc6d7839..c11e6837bb813 100644 --- a/server/src/main/resources/transport/upper_bounds/9.0.csv +++ b/server/src/main/resources/transport/upper_bounds/9.0.csv @@ -1 +1 @@ -initial_elasticsearch_9_0_6,9000015 +transform_check_for_dangling_tasks,9000018 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv index 5a65f2e578156..80b97d85f7511 100644 --- a/server/src/main/resources/transport/upper_bounds/9.1.csv +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -1 +1 @@ -initial_elasticsearch_9_1_4,9112007 +transform_check_for_dangling_tasks,9112009 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index e24f914a1d1ca..2147eab66c207 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -ml_inference_endpoint_cache,9157000 +initial_9.2.0,9185000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv new file mode 100644 index 0000000000000..2147eab66c207 --- /dev/null +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -0,0 +1 @@ +initial_9.2.0,9185000