Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/130855.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 130855
summary: Add checks that optimizers do not modify the layout
area: ES|QL
type: enhancement
issues:
- 125576
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,19 @@ public String nodeString() {
}

protected abstract String label();

/**
* Compares the size and datatypes of two lists of attributes for equality.
*/
public static boolean datatypeEquals(List<Attribute> left, List<Attribute> right) {
if (left.size() != right.size()) {
return false;
}
for (int i = 0; i < left.size(); i++) {
if (left.get(i).dataType().equals(right.get(i).dataType()) == false) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private static Batch<LogicalPlan> localOperators() {

public LogicalPlan localOptimize(LogicalPlan plan) {
LogicalPlan optimized = execute(plan);
Failures failures = verifier.verify(optimized, true);
Failures failures = verifier.verify(optimized, true, plan);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) {
}

public PhysicalPlan localOptimize(PhysicalPlan plan) {
return verify(execute(plan));
return verify(execute(plan), plan);
}

PhysicalPlan verify(PhysicalPlan plan) {
Failures failures = verifier.verify(plan, true);
PhysicalPlan verify(PhysicalPlan planAfter, PhysicalPlan planBefore) {
Failures failures = verifier.verify(planAfter, true, planBefore);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return plan;
return planAfter;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
public LogicalPlan optimize(LogicalPlan verified) {
var optimized = execute(verified);

Failures failures = verifier.verify(optimized, false);
Failures failures = verifier.verify(optimized, false, verified);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,29 @@
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

import static org.elasticsearch.xpack.esql.common.Failure.fail;
import static org.elasticsearch.xpack.esql.core.expression.Attribute.datatypeEquals;

public final class LogicalVerifier {

public static final LogicalVerifier INSTANCE = new LogicalVerifier();

private LogicalVerifier() {}

/** Verifies the optimized logical plan. */
public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
public Failures verify(LogicalPlan planAfter, boolean skipRemoteEnrichVerification, LogicalPlan planBefore) {
Failures failures = new Failures();
Failures dependencyFailures = new Failures();

if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(Enrich.class::isInstance);
var enriches = planAfter.collectFirstChildren(Enrich.class::isInstance);
if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
}
}

plan.forEachUp(p -> {
planAfter.forEachUp(p -> {
PlanConsistencyChecker.checkPlan(p, dependencyFailures);

if (failures.hasFailures() == false) {
Expand All @@ -47,6 +50,12 @@ public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
}
});

if (datatypeEquals(planBefore.output(), planAfter.output()) == false) {
failures.add(
fail(planAfter, "Layout has changed from [{}] to [{}]. ", planBefore.output().toString(), planAfter.output().toString())
);
}

if (dependencyFailures.hasFailures()) {
throw new IllegalStateException(dependencyFailures.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ public PhysicalPlanOptimizer(PhysicalOptimizerContext context) {
}

public PhysicalPlan optimize(PhysicalPlan plan) {
return verify(execute(plan));
return verify(execute(plan), plan);
}

PhysicalPlan verify(PhysicalPlan plan) {
Failures failures = verifier.verify(plan, false);
PhysicalPlan verify(PhysicalPlan planAfter, PhysicalPlan planBefore) {
Failures failures = verifier.verify(planAfter, false, planBefore);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return plan;
return planAfter;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;

import static org.elasticsearch.index.IndexMode.LOOKUP;
import static org.elasticsearch.xpack.esql.common.Failure.fail;
import static org.elasticsearch.xpack.esql.core.expression.Attribute.datatypeEquals;

/** Physical plan verifier. */
public final class PhysicalVerifier {
Expand All @@ -27,19 +30,19 @@ public final class PhysicalVerifier {
private PhysicalVerifier() {}

/** Verifies the physical plan. */
public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification) {
public Failures verify(PhysicalPlan planAfter, boolean skipRemoteEnrichVerification, PhysicalPlan planBefore) {
Failures failures = new Failures();
Failures depFailures = new Failures();

if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(EnrichExec.class::isInstance);
var enriches = planAfter.collectFirstChildren(EnrichExec.class::isInstance);
if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
}
}

plan.forEachDown(p -> {
planAfter.forEachDown(p -> {
if (p instanceof FieldExtractExec fieldExtractExec) {
Attribute sourceAttribute = fieldExtractExec.sourceAttribute();
if (sourceAttribute == null) {
Expand Down Expand Up @@ -67,6 +70,14 @@ public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification)
}
});

if (datatypeEquals(planBefore.output(), planAfter.output()) == false) {
if ((planAfter instanceof EsQueryExec esQueryExec && esQueryExec.indexMode() == LOOKUP) == false) {
failures.add(
fail(planAfter, "Layout has changed from [{}] to [{}]. ", planBefore.output().toString(), planAfter.output().toString())
);
}
}

if (depFailures.hasFailures()) {
throw new IllegalStateException(depFailures.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ protected final ExecutionInfo executeWithInfo(TreeType plan) {
if (tf.hasChanged()) {
hasChanged = true;
if (log.isTraceEnabled()) {
log.trace("Rule {} applied\n{}", rule, NodeUtils.diffString(tf.before, tf.after));
log.trace("Rule {} applied with change\n{}", rule, NodeUtils.diffString(tf.before, tf.after));
}
} else {
if (log.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5560,7 +5560,7 @@ public void testPushShadowingGeneratingPlanPastProject() {
List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);

Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false);
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan);
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down Expand Up @@ -5611,7 +5611,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProject() {
List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);

Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false);
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan);
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down Expand Up @@ -5667,7 +5667,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProjectWithResolution() {

// This ensures that our generating plan doesn't use invalid references, resp. that any rename from the Project has
// been propagated into the generating plan.
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false);
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan);
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2866,7 +2866,7 @@ public void testFieldExtractWithoutSourceAttributes() {
)
);

var e = expectThrows(VerificationException.class, () -> physicalPlanOptimizer.verify(badPlan));
var e = expectThrows(VerificationException.class, () -> physicalPlanOptimizer.verify(badPlan, verifiedPlan));
assertThat(
e.getMessage(),
containsString(
Expand All @@ -2881,7 +2881,7 @@ public void testVerifierOnMissingReferences() {
| stats s = sum(salary) by emp_no
| where emp_no > 10
""");

final var planBeforeModification = plan;
plan = plan.transformUp(
AggregateExec.class,
a -> new AggregateExec(
Expand All @@ -2895,7 +2895,7 @@ public void testVerifierOnMissingReferences() {
)
);
final var finalPlan = plan;
var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(finalPlan));
var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(finalPlan, planBeforeModification));
assertThat(e.getMessage(), containsString(" > 10[INTEGER]]] optimized incorrectly due to missing references [emp_no{f}#"));
}

Expand All @@ -2913,7 +2913,7 @@ public void testVerifierOnMissingReferencesWithBinaryPlans() throws Exception {

var planWithInvalidJoinLeftSide = plan.transformUp(LookupJoinExec.class, join -> join.replaceChildren(join.right(), join.right()));

var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(planWithInvalidJoinLeftSide));
var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(planWithInvalidJoinLeftSide, plan));
assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references from left hand side [languages"));

var planWithInvalidJoinRightSide = plan.transformUp(
Expand All @@ -2930,7 +2930,7 @@ public void testVerifierOnMissingReferencesWithBinaryPlans() throws Exception {
)
);

e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(planWithInvalidJoinRightSide));
e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(planWithInvalidJoinRightSide, plan));
assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references from right hand side [language_code"));
}

Expand All @@ -2940,7 +2940,7 @@ public void testVerifierOnDuplicateOutputAttributes() {
| stats s = sum(salary) by emp_no
| where emp_no > 10
""");

final var planBeforeModification = plan;
plan = plan.transformUp(AggregateExec.class, a -> {
List<Attribute> intermediates = new ArrayList<>(a.intermediateAttributes());
intermediates.add(intermediates.get(0));
Expand All @@ -2955,7 +2955,7 @@ public void testVerifierOnDuplicateOutputAttributes() {
);
});
final var finalPlan = plan;
var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(finalPlan));
var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(finalPlan, planBeforeModification));
assertThat(
e.getMessage(),
containsString("Plan [LimitExec[1000[INTEGER],null]] optimized incorrectly due to duplicate output attribute emp_no{f}#")
Expand Down
Loading