Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
*/
public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LocalLogicalOptimizerContext> {

private final LogicalVerifier verifier = LogicalVerifier.INSTANCE;
private final LogicalVerifier verifier = LogicalVerifier.getLocalVerifier();

private static final List<Batch<LogicalPlan>> RULES = arrayAsArrayList(
new Batch<>(
Expand Down Expand Up @@ -90,7 +90,7 @@ private static Batch<LogicalPlan> localOperators() {

public LogicalPlan localOptimize(LogicalPlan plan) {
LogicalPlan optimized = execute(plan);
Failures failures = verifier.verify(optimized, true, plan.output());
Failures failures = verifier.verify(optimized, plan.output());
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class LocalPhysicalPlanOptimizer extends ParameterizedRuleExecutor<Physic

private static final List<Batch<PhysicalPlan>> RULES = rules(true);

private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;
private final PhysicalVerifier verifier = PhysicalVerifier.getLocalVerifier();

public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) {
super(context);
Expand All @@ -48,7 +48,7 @@ public PhysicalPlan localOptimize(PhysicalPlan plan) {
}

PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
Failures failures = verifier.verify(optimizedPlan, true, expectedOutputAttributes);
Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan,
new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized())
);

private final LogicalVerifier verifier = LogicalVerifier.INSTANCE;
private final LogicalVerifier verifier = LogicalVerifier.getGeneralVerifier();

public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
super(optimizerContext);
Expand All @@ -114,7 +114,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
public LogicalPlan optimize(LogicalPlan verified) {
var optimized = execute(verified);

Failures failures = verifier.verify(optimized, false, verified.output());
Failures failures = verifier.verify(optimized, verified.output());
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@

public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier<LogicalPlan> {

public static final LogicalVerifier INSTANCE = new LogicalVerifier();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When working on micro-optimizations, I remember seeing some cost associated with initializing the list of rules in various optimizers. Here LogicalVerifier looks much simpler, but I wonder if we could reuse a single static object for the local verifier and one for the general verifier? This would avoid creating new instances for each query execution. I acknowledge that here it is much cheaper than the actual rules initialization elsewhere.

private LogicalVerifier(boolean isLocal) {
super(isLocal);
}

private LogicalVerifier() {}
public static LogicalVerifier getLocalVerifier() {
return new LogicalVerifier(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: instead of re-instantiating this all the time, we could hand out a singleton. Not sure it makes a difference, though.

We could also make these methods on PostOptimizationPhasePlanVerifier, as they work the same for the logical and physical verifiers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of re-instantiating this all the time, we could hand out a singleton.

+1. I guess we'll want this for both flavours (logical, physical).

Not sure it makes a difference, though.

Maybe not performance-wise, but still has a "better feel". Besides, a node can play both roles, so both types (local/coordinator) would eventually be used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why the variable which it gets assigned to is not static anyway? Probably should make it so?

}

public static LogicalVerifier getGeneralVerifier() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so far we only have a "local" thing (verifier, mapper, optimizer etc.) and the unqualified other variant, which is the coordinator one. Let's avoid introducing a notion of a "general" thing (which wouldn't be "general", as it wouldn't apply locally :) ).
We should use "coordinator" wherever we need to denote a non-local execution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, maybe "global" because it applies to the whole plan?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, maybe "global" because it applies to the whole plan?

My thinking is that there isn't a "global plan", executed "globally, everywhere": there's a coordinator plan and one or more (as the pipelines break) node plans, no?

return new LogicalVerifier(false);
}

@Override
boolean skipVerification(LogicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance);
if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return true;
}
}
return false;
boolean hasRemoteEnrich(LogicalPlan optimizedPlan) {
var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance);
return enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please help me understand why we are only checking the first enrich?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tbh I am not sure. But I haven't changed that part; it's exactly the same as it was before. @bpintea may know more about this?

Copy link
Contributor

@bpintea bpintea Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://www.elastic.co/docs/reference/query-languages/esql/esql-cross-clusters#esql-multi-enrich states: "A _remote enrich command can’t be executed after a _coordinator enrich command."
This is checked in Enrich#checkForPlansForbiddenBeforeRemoteEnrich.

@smalyshev, it might be worth adding a (now missing) comment about why the [0] is chosen to be tried as a REMOTE enrich. 🙏

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be ANY though, no? ANY does not conflict with other types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is confusing, but I think we only skip if the topmost ENRICH in the leftmost branch is remote - this is the only situation when the remote enrich hack can lead to shadowing issues that I can think of, because then the remote enrich is the top enrich node in the local plan.

We could skip more widely by looking for any remote enriches; but I think we want to fix the remote enrich hack soon, anyway, so maybe it's better to invest the effort in the real fix.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor<PhysicalPla
new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns())
);

private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;
private final PhysicalVerifier verifier = PhysicalVerifier.getGeneralVerifier();

public PhysicalPlanOptimizer(PhysicalOptimizerContext context) {
super(context);
Expand All @@ -39,7 +39,7 @@ public PhysicalPlan optimize(PhysicalPlan plan) {
}

PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
Failures failures = verifier.verify(optimizedPlan, false, expectedOutputAttributes);
Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,23 @@
/** Physical plan verifier. */
public final class PhysicalVerifier extends PostOptimizationPhasePlanVerifier<PhysicalPlan> {

public static final PhysicalVerifier INSTANCE = new PhysicalVerifier();
private PhysicalVerifier(boolean isLocal) {
super(isLocal);
}

private PhysicalVerifier() {}
public static PhysicalVerifier getLocalVerifier() {
return new PhysicalVerifier(true);
}

public static PhysicalVerifier getGeneralVerifier() {
return new PhysicalVerifier(false);
}

@Override
boolean skipVerification(PhysicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance);
if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return true;
}
}
return false;
boolean hasRemoteEnrich(PhysicalPlan optimizedPlan) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance);
return enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE;
}

@Override
Expand All @@ -56,7 +59,8 @@ void checkPlanConsistency(PhysicalPlan optimizedPlan, Failures failures, Failure
}
}

if (p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.REMOTE) {
// This check applies only for general physical plans (isLocal == false)
if (isLocal == false && p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.REMOTE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also add a check for the converse - after physical optimization on the remote, there shouldn't be any coordinator-only nodes in the plan.

failures.add(fail(p, "Physical plan contains remote executing operation [{}] in local part", p.nodeName()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@
*/
public abstract class PostOptimizationPhasePlanVerifier<P extends QueryPlan<P>> {

// Are we verifying the global plan (coordinator) or a local plan (data node)?
protected final boolean isLocal;

protected PostOptimizationPhasePlanVerifier(boolean isLocal) {
this.isLocal = isLocal;
}

/** Verifies the optimized plan */
public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, List<Attribute> expectedOutputAttributes) {
public Failures verify(P optimizedPlan, List<Attribute> expectedOutputAttributes) {
Failures failures = new Failures();
Failures depFailures = new Failures();
if (skipVerification(optimizedPlan, skipRemoteEnrichVerification)) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
// This is a temporary workaround to skip verification when there is a remote enrich, due to a bug
if (isLocal && hasRemoteEnrich(optimizedPlan)) {
return failures;
}

Expand All @@ -47,7 +56,8 @@ public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, Li
return failures;
}

abstract boolean skipVerification(P optimizedPlan, boolean skipRemoteEnrichVerification);
// This is a temporary workaround to skip verification when there is a remote enrich, due to a bug
abstract boolean hasRemoteEnrich(P optimizedPlan);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: hasRemoteEnrich() is a bit of a misleading name, since it's not actually a property of the verifier, but of the thing the verifier verifies. And this method's sole role is to stop/skip the verification. skipVerification() could be repurposed for other cases, but even if we'll remove it after we fix the bug[*], I'd find something like "skipOnRemoteEnrich()" (or some other variation) a bit more suggestive.

[*] "due to a bug" should contain a pointer to the announced bug. We could add just a (#118531) if the URL is too long.


abstract void checkPlanConsistency(P optimizedPlan, Failures failures, Failures depFailures);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,11 @@ public class Enrich extends UnaryPlan

@Override
public ExecuteLocation executesOn() {
if (mode == Mode.REMOTE) {
return ExecuteLocation.REMOTE;
} else if (mode == Mode.COORDINATOR) {
return ExecuteLocation.COORDINATOR;
}
return ExecuteLocation.ANY;
return switch (mode) {
case REMOTE -> ExecuteLocation.REMOTE;
case COORDINATOR -> ExecuteLocation.COORDINATOR;
default -> ExecuteLocation.ANY;
};
}

public enum Mode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,10 @@ public int hashCode() {

@Override
public ExecuteLocation executesOn() {
if (mode == Enrich.Mode.REMOTE) {
return ExecuteLocation.REMOTE;
} else if (mode == Enrich.Mode.COORDINATOR) {
return ExecuteLocation.COORDINATOR;
}
return ExecuteLocation.ANY;
return switch (mode) {
case REMOTE -> ExecuteLocation.REMOTE;
case COORDINATOR -> ExecuteLocation.COORDINATOR;
default -> ExecuteLocation.ANY;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5537,7 +5537,7 @@ public void testPushShadowingGeneratingPlanPastProject() {
List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);

Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output());
Failures inconsistencies = LogicalVerifier.getGeneralVerifier().verify(optimizedPlan, initialPlan.output());
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down Expand Up @@ -5592,7 +5592,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProject() {
List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);

Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output());
Failures inconsistencies = LogicalVerifier.getGeneralVerifier().verify(optimizedPlan, initialPlan.output());
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down Expand Up @@ -5652,7 +5652,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProjectWithResolution() {

// This ensures that our generating plan doesn't use invalid references, resp. that any rename from the Project has
// been propagated into the generating plan.
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output());
Failures inconsistencies = LogicalVerifier.getGeneralVerifier().verify(optimizedPlan, initialPlan.output());
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down