diff --git a/server/src/main/resources/transport/upper_bounds/8.18.csv b/server/src/main/resources/transport/upper_bounds/8.18.csv index 4eb5140004ea6..266bfbbd3bf78 100644 --- a/server/src/main/resources/transport/upper_bounds/8.18.csv +++ b/server/src/main/resources/transport/upper_bounds/8.18.csv @@ -1 +1 @@ -initial_elasticsearch_8_18_6,8840008 +transform_check_for_dangling_tasks,8840011 diff --git a/server/src/main/resources/transport/upper_bounds/8.19.csv b/server/src/main/resources/transport/upper_bounds/8.19.csv index 476468b203875..3600b3f8c633a 100644 --- a/server/src/main/resources/transport/upper_bounds/8.19.csv +++ b/server/src/main/resources/transport/upper_bounds/8.19.csv @@ -1 +1 @@ -initial_elasticsearch_8_19_3,8841067 +transform_check_for_dangling_tasks,8841070 diff --git a/server/src/main/resources/transport/upper_bounds/9.0.csv b/server/src/main/resources/transport/upper_bounds/9.0.csv index f8f50cc6d7839..c11e6837bb813 100644 --- a/server/src/main/resources/transport/upper_bounds/9.0.csv +++ b/server/src/main/resources/transport/upper_bounds/9.0.csv @@ -1 +1 @@ -initial_elasticsearch_9_0_6,9000015 +transform_check_for_dangling_tasks,9000018 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv index 5a65f2e578156..80b97d85f7511 100644 --- a/server/src/main/resources/transport/upper_bounds/9.1.csv +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -1 +1 @@ -initial_elasticsearch_9_1_4,9112007 +transform_check_for_dangling_tasks,9112009 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index e24f914a1d1ca..2147eab66c207 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -ml_inference_endpoint_cache,9157000 +initial_9.2.0,9185000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv new file mode 100644 index 0000000000000..2147eab66c207 --- /dev/null +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -0,0 +1 @@ +initial_9.2.0,9185000 diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java index f848c663d3ca6..f5880b71c4615 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java @@ -402,6 +402,33 @@ public void testTopNThenEnrichRemote() { assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); assertCCSExecutionInfoDetails(executionInfo); } + + // No renames, no KEEP + query = """ + FROM *:events,events + | eval ip= TO_STR(host) + | SORT timestamp, user, ip + | LIMIT 5 + | ENRICH _remote:hosts + """; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + assertThat( + getValuesList(resp), + equalTo( + List.of( + List.of("192.168.1.2", 1L, "andres", "192.168.1.2", "Windows", "192.168.1.2"), + List.of("192.168.1.3", 1L, "matthew", "192.168.1.3", "MacOS", "192.168.1.3"), + Arrays.asList("192.168.1.25", 1L, "park", (String) null, (String) null, "192.168.1.25"), + List.of("192.168.1.5", 2L, "akio", "192.168.1.5", "Android", "192.168.1.5"), + List.of("192.168.1.6", 2L, "sergio", "192.168.1.6", "iOS", "192.168.1.6") + ) + ) + ); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); + assertCCSExecutionInfoDetails(executionInfo); + } } public void testLimitThenEnrichRemote() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java index 088b1ad15557f..11aeb610f2ef8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java @@ -39,7 +39,7 @@ */ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor { - private final LogicalVerifier verifier = LogicalVerifier.INSTANCE; + private final LogicalVerifier verifier = LogicalVerifier.getLocalVerifier(); private static final List> RULES = arrayAsArrayList( new Batch<>( @@ -90,7 +90,7 @@ private static Batch localOperators() { public LogicalPlan localOptimize(LogicalPlan plan) { LogicalPlan optimized = execute(plan); - Failures failures = verifier.verify(optimized, true, plan.output()); + Failures failures = verifier.verify(optimized, plan.output()); if (failures.hasFailures()) { throw new VerificationException(failures); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java index deb79221b3ea0..3d7d4a9ac91c7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java @@ -36,7 +36,7 @@ public class LocalPhysicalPlanOptimizer extends ParameterizedRuleExecutor> RULES = rules(true); - private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE; + private final PhysicalVerifier verifier = PhysicalVerifier.getLocalVerifier(); public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) { super(context); @@ -47,7 +47,7 @@ public PhysicalPlan localOptimize(PhysicalPlan plan) { } PhysicalPlan verify(PhysicalPlan optimizedPlan, List expectedOutputAttributes) { - Failures failures = verifier.verify(optimizedPlan, true, expectedOutputAttributes); + Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes); if (failures.hasFailures()) { throw new VerificationException(failures); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index 6f550524c5ca5..a6c4fa173391a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -105,7 +105,7 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor("Set as Optimized", Limiter.ONCE, new SetAsOptimized()) ); - private final LogicalVerifier verifier = LogicalVerifier.INSTANCE; + private final LogicalVerifier verifier = LogicalVerifier.getGeneralVerifier(); public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) { super(optimizerContext); @@ -114,7 +114,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) { public LogicalPlan optimize(LogicalPlan verified) { var optimized = execute(verified); - Failures failures = verifier.verify(optimized, false, verified.output()); + Failures failures = verifier.verify(optimized, verified.output()); if (failures.hasFailures()) { throw new VerificationException(failures); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java index 4aa70be5713e8..ba930988fe5ca 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java @@ -20,20 +20,23 @@ public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier { - public static final LogicalVerifier INSTANCE = new LogicalVerifier(); + private LogicalVerifier(boolean isLocal) { + super(isLocal); + } - private LogicalVerifier() {} + public static LogicalVerifier getLocalVerifier() { + return new LogicalVerifier(true); + } + + public static LogicalVerifier getGeneralVerifier() { + return new LogicalVerifier(false); + } @Override - boolean skipVerification(LogicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) { - if (skipRemoteEnrichVerification) { - // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 - var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance); - if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) { - return true; - } - } - return false; + boolean hasRemoteEnrich(LogicalPlan optimizedPlan) { + // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 + var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance); + return enriches.stream().anyMatch(e -> ((Enrich) e).mode() == Enrich.Mode.REMOTE); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java index 6d60c547f47d6..a8101165e7a5e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java @@ -28,7 +28,7 @@ public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns()) ); - private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE; + private final PhysicalVerifier verifier = PhysicalVerifier.getGeneralVerifier(); public PhysicalPlanOptimizer(PhysicalOptimizerContext context) { super(context); @@ -39,7 +39,7 @@ public PhysicalPlan optimize(PhysicalPlan plan) { } PhysicalPlan verify(PhysicalPlan optimizedPlan, List expectedOutputAttributes) { - Failures failures = verifier.verify(optimizedPlan, false, expectedOutputAttributes); + Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes); if (failures.hasFailures()) { throw new VerificationException(failures); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java index 781a8f5263c1f..9aece6065ef33 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java @@ -13,8 +13,10 @@ import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker; import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.physical.EnrichExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import static org.elasticsearch.xpack.esql.common.Failure.fail; @@ -22,20 +24,31 @@ /** Physical plan verifier. */ public final class PhysicalVerifier extends PostOptimizationPhasePlanVerifier { - public static final PhysicalVerifier INSTANCE = new PhysicalVerifier(); + private PhysicalVerifier(boolean isLocal) { + super(isLocal); + } + + public static PhysicalVerifier getLocalVerifier() { + return new PhysicalVerifier(true); + } - private PhysicalVerifier() {} + public static PhysicalVerifier getGeneralVerifier() { + return new PhysicalVerifier(false); + } @Override - boolean skipVerification(PhysicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) { - if (skipRemoteEnrichVerification) { - // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 + boolean hasRemoteEnrich(PhysicalPlan optimizedPlan) { + // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 + if (isLocal) { var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance); - if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) { - return true; - } + return enriches.stream().anyMatch(e -> ((EnrichExec) e).mode() == Enrich.Mode.REMOTE); + } else { + var fragments = optimizedPlan.collectFirstChildren(FragmentExec.class::isInstance); + return fragments.stream().map(FragmentExec.class::cast).anyMatch(f -> { + var enriches = f.fragment().collectFirstChildren(Enrich.class::isInstance); + return enriches.stream().anyMatch(e -> ((Enrich) e).mode() == Enrich.Mode.REMOTE); + }); } - return false; } @Override @@ -54,6 +67,12 @@ void checkPlanConsistency(PhysicalPlan optimizedPlan, Failures failures, Failure ); } } + + // This check applies only for general physical plans (isLocal == false) + if (isLocal == false && p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.REMOTE) { + failures.add(fail(p, "Physical plan contains remote executing operation [{}] in local part", p.nodeName())); + } + PlanConsistencyChecker.checkPlan(p, depFailures); if (failures.hasFailures() == false) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java index e187264b2432f..4ebfde2f2b9f3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java @@ -28,11 +28,20 @@ */ public abstract class PostOptimizationPhasePlanVerifier

> { + // Are we verifying the global plan (coordinator) or a local plan (data node)? + protected final boolean isLocal; + + protected PostOptimizationPhasePlanVerifier(boolean isLocal) { + this.isLocal = isLocal; + } + /** Verifies the optimized plan */ - public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, List expectedOutputAttributes) { + public Failures verify(P optimizedPlan, List expectedOutputAttributes) { Failures failures = new Failures(); Failures depFailures = new Failures(); - if (skipVerification(optimizedPlan, skipRemoteEnrichVerification)) { + // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531 + // This is a temporary workaround to skip verification when there is a remote enrich, due to a bug + if (hasRemoteEnrich(optimizedPlan)) { return failures; } @@ -47,7 +56,8 @@ public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, Li return failures; } - abstract boolean skipVerification(P optimizedPlan, boolean skipRemoteEnrichVerification); + // This is a temporary workaround to skip verification when there is a remote enrich, due to a bug + abstract boolean hasRemoteEnrich(P optimizedPlan); abstract void checkPlanConsistency(P optimizedPlan, Failures failures, Failures depFailures); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java index ef268ff6b7964..6015cb9f42bf6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java @@ -74,12 +74,11 @@ public class Enrich extends UnaryPlan @Override public ExecuteLocation executesOn() { - if (mode == Mode.REMOTE) { - return ExecuteLocation.REMOTE; - } else if (mode == Mode.COORDINATOR) { - return ExecuteLocation.COORDINATOR; - } - return ExecuteLocation.ANY; + return switch (mode) { + case REMOTE -> ExecuteLocation.REMOTE; + case COORDINATOR -> ExecuteLocation.COORDINATOR; + default -> ExecuteLocation.ANY; + }; } public enum Mode { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java index 22ee88f8119ec..28d698a6fec64 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import java.io.IOException; import java.util.List; @@ -30,7 +31,7 @@ import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public class EnrichExec extends UnaryExec implements EstimatesRowSize { +public class EnrichExec extends UnaryExec implements EstimatesRowSize, ExecutesOn { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( PhysicalPlan.class, "EnrichExec", @@ -220,4 +221,13 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(super.hashCode(), mode, matchType, matchField, policyName, policyMatchField, concreteIndices, enrichFields); } + + @Override + public ExecuteLocation executesOn() { + return switch (mode) { + case REMOTE -> ExecuteLocation.REMOTE; + case COORDINATOR -> ExecuteLocation.COORDINATOR; + default -> ExecuteLocation.ANY; + }; + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 1a181fe805e81..02a5da899f04a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -5537,7 +5537,7 @@ public void testPushShadowingGeneratingPlanPastProject() { List initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes(); LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan); - Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output()); + Failures inconsistencies = LogicalVerifier.getGeneralVerifier().verify(optimizedPlan, initialPlan.output()); assertFalse(inconsistencies.hasFailures()); Project project = as(optimizedPlan, Project.class); @@ -5592,7 +5592,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProject() { List initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes(); LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan); - Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output()); + Failures inconsistencies = LogicalVerifier.getGeneralVerifier().verify(optimizedPlan, initialPlan.output()); assertFalse(inconsistencies.hasFailures()); Project project = as(optimizedPlan, Project.class); @@ -5652,7 +5652,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProjectWithResolution() { // This ensures that our generating plan doesn't use invalid references, resp. that any rename from the Project has // been propagated into the generating plan. - Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output()); + Failures inconsistencies = LogicalVerifier.getGeneralVerifier().verify(optimizedPlan, initialPlan.output()); assertFalse(inconsistencies.hasFailures()); Project project = as(optimizedPlan, Project.class);