Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/130855.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 130855
summary: Add checks that optimizers do not modify the layout
area: ES|QL
type: enhancement
issues:
- 125576
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,19 @@ public String nodeString() {
}

protected abstract String label();

/**
* Compares the size and datatypes of two lists of attributes for equality.
*/
public static boolean datatypeEquals(List<Attribute> left, List<Attribute> right) {
if (left.size() != right.size()) {
return false;
}
for (int i = 0; i < left.size(); i++) {
if (left.get(i).dataType() != right.get(i).dataType()) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private static Batch<LogicalPlan> localOperators() {

public LogicalPlan localOptimize(LogicalPlan plan) {
LogicalPlan optimized = execute(plan);
Failures failures = verifier.verify(optimized, true);
Failures failures = verifier.verify(optimized, true, plan.output());
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ParallelizeTimeSeriesSource;
Expand Down Expand Up @@ -42,15 +43,15 @@ public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) {
}

public PhysicalPlan localOptimize(PhysicalPlan plan) {
return verify(execute(plan));
return verify(execute(plan), plan.output());
}

PhysicalPlan verify(PhysicalPlan plan) {
Failures failures = verifier.verify(plan, true);
PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
Failures failures = verifier.verify(optimizedPlan, true, expectedOutputAttributes);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return plan;
return optimizedPlan;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
public LogicalPlan optimize(LogicalPlan verified) {
var optimized = execute(verified);

Failures failures = verifier.verify(optimized, false);
Failures failures = verifier.verify(optimized, false, verified.output());
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,31 @@
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

public final class LogicalVerifier {
public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier {

public static final LogicalVerifier INSTANCE = new LogicalVerifier();

private LogicalVerifier() {}

/** Verifies the optimized logical plan. */
public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
Failures failures = new Failures();
Failures dependencyFailures = new Failures();

@Override
boolean skipVerification(QueryPlan<?> optimizedPlan, boolean skipRemoteEnrichVerification) {
if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(Enrich.class::isInstance);
var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance);
if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
return true;
}
}
return false;
}

plan.forEachUp(p -> {
PlanConsistencyChecker.checkPlan(p, dependencyFailures);
@Override
void checkPlanConsistency(QueryPlan<?> optimizedPlan, Failures failures, Failures depFailures) {
optimizedPlan.forEachUp(p -> {
PlanConsistencyChecker.checkPlan(p, depFailures);

if (failures.hasFailures() == false) {
if (p instanceof PostOptimizationVerificationAware pova) {
Expand All @@ -46,11 +47,5 @@ public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
});
}
});

if (dependencyFailures.hasFailures()) {
throw new IllegalStateException(dependencyFailures.toString());
}

return failures;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
Expand All @@ -34,15 +35,15 @@ public PhysicalPlanOptimizer(PhysicalOptimizerContext context) {
}

public PhysicalPlan optimize(PhysicalPlan plan) {
return verify(execute(plan));
return verify(execute(plan), plan.output());
}

PhysicalPlan verify(PhysicalPlan plan) {
Failures failures = verifier.verify(plan, false);
PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
Failures failures = verifier.verify(optimizedPlan, false, expectedOutputAttributes);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return plan;
return optimizedPlan;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,35 @@
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;

import static org.elasticsearch.xpack.esql.common.Failure.fail;

/** Physical plan verifier. */
public final class PhysicalVerifier {
public final class PhysicalVerifier extends PostOptimizationPhasePlanVerifier {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can the superclass receive a generic with the type of plan?
It's not very important now, but lines like optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance); should fail if you write Enrich.class by mistake (For example). But as it's a QueryPlan, it won't now


public static final PhysicalVerifier INSTANCE = new PhysicalVerifier();

private PhysicalVerifier() {}

/** Verifies the physical plan. */
public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification) {
Failures failures = new Failures();
Failures depFailures = new Failures();

@Override
boolean skipVerification(QueryPlan<?> optimizedPlan, boolean skipRemoteEnrichVerification) {
if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(EnrichExec.class::isInstance);
var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance);
if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
return true;
}
}
return false;
}

plan.forEachDown(p -> {
@Override
void checkPlanConsistency(QueryPlan<?> optimizedPlan, Failures failures, Failures depFailures) {
optimizedPlan.forEachDown(p -> {
if (p instanceof FieldExtractExec fieldExtractExec) {
Attribute sourceAttribute = fieldExtractExec.sourceAttribute();
if (sourceAttribute == null) {
Expand All @@ -66,11 +67,5 @@ public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification)
});
}
});

if (depFailures.hasFailures()) {
throw new IllegalStateException(depFailures.toString());
}

return failures;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;

import java.util.List;

import static org.elasticsearch.index.IndexMode.LOOKUP;
import static org.elasticsearch.xpack.esql.common.Failure.fail;
import static org.elasticsearch.xpack.esql.core.expression.Attribute.datatypeEquals;

/**
* Verifies the plan after optimization.
* This is invoked immediately after a Plan Optimizer completes its work.
* Currently, it is called after LogicalPlanOptimizer, PhysicalPlanOptimizer,
* LocalLogicalPlanOptimizer, and LocalPhysicalPlanOptimizer.
* Note: Logical and Physical optimizers may override methods in this class to perform different checks.
*/
public abstract class PostOptimizationPhasePlanVerifier {

/** Verifies the optimized plan */
public Failures verify(QueryPlan<?> optimizedPlan, boolean skipRemoteEnrichVerification, List<Attribute> expectedOutputAttributes) {
Failures failures = new Failures();
Failures depFailures = new Failures();
if (skipVerification(optimizedPlan, skipRemoteEnrichVerification)) {
return failures;
}

checkPlanConsistency(optimizedPlan, failures, depFailures);

verifyOutputNotChanged(optimizedPlan, expectedOutputAttributes, failures);

if (depFailures.hasFailures()) {
throw new IllegalStateException(depFailures.toString());
}

return failures;
}

abstract boolean skipVerification(QueryPlan<?> optimizedPlan, boolean skipRemoteEnrichVerification);

abstract void checkPlanConsistency(QueryPlan<?> optimizedPlan, Failures failures, Failures depFailures);

private static void verifyOutputNotChanged(QueryPlan<?> optimizedPlan, List<Attribute> expectedOutputAttributes, Failures failures) {
if (datatypeEquals(expectedOutputAttributes, optimizedPlan.output()) == false) {
// LookupJoinExec represents the lookup index with EsSourceExec and this is turned into EsQueryExec by
// ReplaceSourceAttributes. Because InsertFieldExtractions doesn't apply to lookup indices, the
// right hand side will only have the EsQueryExec providing the _doc attribute and nothing else.
if ((optimizedPlan instanceof EsQueryExec esQueryExec && esQueryExec.indexMode() == LOOKUP) == false) {
failures.add(
fail(
optimizedPlan,
"Output has changed from [{}] to [{}]. ",
expectedOutputAttributes.toString(),
optimizedPlan.output().toString()
)
);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ protected final ExecutionInfo executeWithInfo(TreeType plan) {
if (tf.hasChanged()) {
hasChanged = true;
if (log.isTraceEnabled()) {
log.trace("Rule {} applied\n{}", rule, NodeUtils.diffString(tf.before, tf.after));
log.trace("Rule {} applied with change\n{}", rule, NodeUtils.diffString(tf.before, tf.after));
}
} else {
if (log.isTraceEnabled()) {
Expand Down
Loading
Loading