Skip to content
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/8.18.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
initial_elasticsearch_8_18_6,8840008
transform_check_for_dangling_tasks,8840011
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/8.19.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
initial_elasticsearch_8_19_3,8841067
transform_check_for_dangling_tasks,8841070
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.0.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
initial_elasticsearch_9_0_6,9000015
transform_check_for_dangling_tasks,9000018
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.1.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
initial_elasticsearch_9_1_4,9112007
transform_check_for_dangling_tasks,9112009
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ml_inference_endpoint_cache,9157000
initial_9.2.0,9185000
1 change: 1 addition & 0 deletions server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
initial_9.2.0,9185000
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,33 @@ public void testTopNThenEnrichRemote() {
assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
assertCCSExecutionInfoDetails(executionInfo);
}

// No renames, no KEEP
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to make a new test out of this, possibly adding a reference to why we add this test (optional). And/or detail why KEEP is "detrimental" to the test. 🙏

query = """
FROM *:events,events
| eval ip= TO_STR(host)
| SORT timestamp, user, ip
| LIMIT 5
| ENRICH _remote:hosts
""";
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
assertThat(
getValuesList(resp),
equalTo(
List.of(
List.of("192.168.1.2", 1L, "andres", "192.168.1.2", "Windows", "192.168.1.2"),
List.of("192.168.1.3", 1L, "matthew", "192.168.1.3", "MacOS", "192.168.1.3"),
Arrays.asList("192.168.1.25", 1L, "park", (String) null, (String) null, "192.168.1.25"),
List.of("192.168.1.5", 2L, "akio", "192.168.1.5", "Android", "192.168.1.5"),
List.of("192.168.1.6", 2L, "sergio", "192.168.1.6", "iOS", "192.168.1.6")
)
)
);
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
assertCCSExecutionInfoDetails(executionInfo);
}
}

public void testLimitThenEnrichRemote() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
*/
public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LocalLogicalOptimizerContext> {

private final LogicalVerifier verifier = LogicalVerifier.INSTANCE;
private final LogicalVerifier verifier = LogicalVerifier.getLocalVerifier();

private static final List<Batch<LogicalPlan>> RULES = arrayAsArrayList(
new Batch<>(
Expand Down Expand Up @@ -90,7 +90,7 @@ private static Batch<LogicalPlan> localOperators() {

public LogicalPlan localOptimize(LogicalPlan plan) {
LogicalPlan optimized = execute(plan);
Failures failures = verifier.verify(optimized, true, plan.output());
Failures failures = verifier.verify(optimized, plan.output());
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class LocalPhysicalPlanOptimizer extends ParameterizedRuleExecutor<Physic

private static final List<Batch<PhysicalPlan>> RULES = rules(true);

private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;
private final PhysicalVerifier verifier = PhysicalVerifier.getLocalVerifier();

public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) {
super(context);
Expand All @@ -47,7 +47,7 @@ public PhysicalPlan localOptimize(PhysicalPlan plan) {
}

PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
Failures failures = verifier.verify(optimizedPlan, true, expectedOutputAttributes);
Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan,
new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized())
);

private final LogicalVerifier verifier = LogicalVerifier.INSTANCE;
private final LogicalVerifier verifier = LogicalVerifier.getGeneralVerifier();

public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
super(optimizerContext);
Expand All @@ -114,7 +114,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
public LogicalPlan optimize(LogicalPlan verified) {
var optimized = execute(verified);

Failures failures = verifier.verify(optimized, false, verified.output());
Failures failures = verifier.verify(optimized, verified.output());
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,23 @@

public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier<LogicalPlan> {

public static final LogicalVerifier INSTANCE = new LogicalVerifier();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When working on micro-optimizations I remember seeing some cost associated with initializing the list of rules in various optimizers. Here LogicalVerifier looks much simpler, but I wonder if we could reuse a single static object for the local verifier and one for the general one? This would avoid creating new instances for each query execution. I acknowledge that here it is much cheaper than the actual rules initialization elsewhere.

private LogicalVerifier(boolean isLocal) {
super(isLocal);
}

private LogicalVerifier() {}
public static LogicalVerifier getLocalVerifier() {
return new LogicalVerifier(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: instead of re-instantiating this all the time, we could hand out a singleton. Not sure it makes a difference, though.

We could also make these methods on PostOptimizationPhasePlanVerifier, as they work the same for the logical and physical verifiers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of re-instantiating this all the time, we could hand out a singleton.

+1. I guess we'll want this for both flavours (logical, physical).

Not sure it makes a difference, though.

Maybe not performance-wise, but still has a "better feel". Besides, a node can play both roles, so both types (local/coordinator) would eventually be used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why the variable which it gets assigned to is not static anyway? Probably should make it so?

}

public static LogicalVerifier getGeneralVerifier() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so far we only have a "local" thing (verifier, mapper, optimizer etc.) and the unqualified other variant, which is the coordinator one. Let's avoid introducing a notion of a "general" thing (which wouldn't be "general", as it wouldn't apply locally :) ).
We should use "coordinator" wherever we need to denote a non-local execution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, maybe "global" because it applies to the whole plan?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, maybe "global" because it applies to the whole plan?

My thinking is that there isn't a "global plan", executed "globally, everywhere": there's a coordinator plan and one or more (as the pipelines break) node plans, no?

return new LogicalVerifier(false);
}

@Override
boolean skipVerification(LogicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance);
if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return true;
}
}
return false;
boolean hasRemoteEnrich(LogicalPlan optimizedPlan) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance);
return enriches.stream().anyMatch(e -> ((Enrich) e).mode() == Enrich.Mode.REMOTE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor<PhysicalPla
new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns())
);

private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;
private final PhysicalVerifier verifier = PhysicalVerifier.getGeneralVerifier();

public PhysicalPlanOptimizer(PhysicalOptimizerContext context) {
super(context);
Expand All @@ -39,7 +39,7 @@ public PhysicalPlan optimize(PhysicalPlan plan) {
}

PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
Failures failures = verifier.verify(optimizedPlan, false, expectedOutputAttributes);
Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,42 @@
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;

import static org.elasticsearch.xpack.esql.common.Failure.fail;

/** Physical plan verifier. */
public final class PhysicalVerifier extends PostOptimizationPhasePlanVerifier<PhysicalPlan> {

public static final PhysicalVerifier INSTANCE = new PhysicalVerifier();
/**
 * Creates a physical plan verifier.
 * The constructor is private; the static factory methods of this class create the instances.
 *
 * @param isLocal flag forwarded unchanged to the superclass constructor
 */
private PhysicalVerifier(boolean isLocal) {
super(isLocal);
}

/**
 * Returns a verifier constructed with {@code isLocal == true}.
 * A new instance is created on every call.
 */
public static PhysicalVerifier getLocalVerifier() {
return new PhysicalVerifier(true);
}

private PhysicalVerifier() {}
/**
 * Returns a verifier constructed with {@code isLocal == false}, i.e. the non-local variant.
 * A new instance is created on every call.
 */
public static PhysicalVerifier getGeneralVerifier() {
return new PhysicalVerifier(false);
}

@Override
boolean skipVerification(PhysicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
boolean hasRemoteEnrich(PhysicalPlan optimizedPlan) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
if (isLocal) {
var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance);
if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return true;
}
return enriches.stream().anyMatch(e -> ((EnrichExec) e).mode() == Enrich.Mode.REMOTE);
} else {
var fragments = optimizedPlan.collectFirstChildren(FragmentExec.class::isInstance);
return fragments.stream().map(FragmentExec.class::cast).anyMatch(f -> {
var enriches = f.fragment().collectFirstChildren(Enrich.class::isInstance);
return enriches.stream().anyMatch(e -> ((Enrich) e).mode() == Enrich.Mode.REMOTE);
});
}
return false;
}

@Override
Expand All @@ -54,6 +67,12 @@ void checkPlanConsistency(PhysicalPlan optimizedPlan, Failures failures, Failure
);
}
}

// This check applies only for general physical plans (isLocal == false)
if (isLocal == false && p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.REMOTE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also add a check for the converse - after physical optimization on the remote, there shouldn't be any coordinator-only nodes in the plan.

failures.add(fail(p, "Physical plan contains remote executing operation [{}] in local part", p.nodeName()));
}

PlanConsistencyChecker.checkPlan(p, depFailures);

if (failures.hasFailures() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@
*/
public abstract class PostOptimizationPhasePlanVerifier<P extends QueryPlan<P>> {

// Are we verifying the global plan (coordinator) or a local plan (data node)?
protected final boolean isLocal;

protected PostOptimizationPhasePlanVerifier(boolean isLocal) {
this.isLocal = isLocal;
}

/** Verifies the optimized plan */
public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, List<Attribute> expectedOutputAttributes) {
public Failures verify(P optimizedPlan, List<Attribute> expectedOutputAttributes) {
Failures failures = new Failures();
Failures depFailures = new Failures();
if (skipVerification(optimizedPlan, skipRemoteEnrichVerification)) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
// This is a temporary workaround to skip verification when there is a remote enrich, due to a bug
if (hasRemoteEnrich(optimizedPlan)) {
return failures;
}

Expand All @@ -47,7 +56,8 @@ public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, Li
return failures;
}

abstract boolean skipVerification(P optimizedPlan, boolean skipRemoteEnrichVerification);
// This is a temporary workaround to skip verification when there is a remote enrich, due to a bug
abstract boolean hasRemoteEnrich(P optimizedPlan);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: hasRemoteEnrich() is a bit of a misleading name, since it's not actually a property of the verifier, but of the thing the verifier verifies. And this method's sole role is to stop/skip the verification. skipVerification() could be repurposed for other cases, but even if we'll remove it after we fix the bug[*], I'd find something like "skipOnRemoteEnrich()" (or some other variation) a bit more suggestive.

[*] "due to a bug" should contain a pointer to the announced bug. We could add just a (#118531) if the URL is too long.


abstract void checkPlanConsistency(P optimizedPlan, Failures failures, Failures depFailures);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,11 @@ public class Enrich extends UnaryPlan

@Override
public ExecuteLocation executesOn() {
if (mode == Mode.REMOTE) {
return ExecuteLocation.REMOTE;
} else if (mode == Mode.COORDINATOR) {
return ExecuteLocation.COORDINATOR;
}
return ExecuteLocation.ANY;
return switch (mode) {
case REMOTE -> ExecuteLocation.REMOTE;
case COORDINATOR -> ExecuteLocation.COORDINATOR;
default -> ExecuteLocation.ANY;
};
}

public enum Mode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;

import java.io.IOException;
import java.util.List;
Expand All @@ -30,7 +31,7 @@

import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

public class EnrichExec extends UnaryExec implements EstimatesRowSize {
public class EnrichExec extends UnaryExec implements EstimatesRowSize, ExecutesOn {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
PhysicalPlan.class,
"EnrichExec",
Expand Down Expand Up @@ -220,4 +221,13 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(super.hashCode(), mode, matchType, matchField, policyName, policyMatchField, concreteIndices, enrichFields);
}

/**
 * Reports where this enrich node executes, derived from its {@code mode}:
 * {@code REMOTE} maps to {@link ExecuteLocation#REMOTE}, {@code COORDINATOR} maps to
 * {@link ExecuteLocation#COORDINATOR}, and every other mode maps to {@link ExecuteLocation#ANY}.
 *
 * @return the execution location corresponding to this node's mode
 */
@Override
public ExecuteLocation executesOn() {
return switch (mode) {
case REMOTE -> ExecuteLocation.REMOTE;
case COORDINATOR -> ExecuteLocation.COORDINATOR;
// Any mode without an explicit case above is not tied to a particular location.
default -> ExecuteLocation.ANY;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5537,7 +5537,7 @@ public void testPushShadowingGeneratingPlanPastProject() {
List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);

Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output());
Failures inconsistencies = LogicalVerifier.getGeneralVerifier().verify(optimizedPlan, initialPlan.output());
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down Expand Up @@ -5592,7 +5592,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProject() {
List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);

Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output());
Failures inconsistencies = LogicalVerifier.getGeneralVerifier().verify(optimizedPlan, initialPlan.output());
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down Expand Up @@ -5652,7 +5652,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProjectWithResolution() {

// This ensures that our generating plan doesn't use invalid references, resp. that any rename from the Project has
// been propagated into the generating plan.
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output());
Failures inconsistencies = LogicalVerifier.getGeneralVerifier().verify(optimizedPlan, initialPlan.output());
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down