Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/130855.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 130855
summary: Add checks that optimizers do not modify the layout
area: ES|QL
type: enhancement
issues:
- 125576
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,19 @@ public String nodeString() {
}

protected abstract String label();

/**
 * Checks whether two attribute lists have the same length and, position by position,
 * the same data type. Only {@code dataType()} of each attribute is compared.
 *
 * @param left the first list of attributes
 * @param right the second list of attributes
 * @return {@code true} if both lists have equal size and pairwise identical data types, {@code false} otherwise
 */
public static boolean dataTypeEquals(List<Attribute> left, List<Attribute> right) {
    final int count = left.size();
    if (count != right.size()) {
        return false;
    }
    // Advance while the data types at the current position agree; stop at the first mismatch.
    int position = 0;
    while (position < count && left.get(position).dataType() == right.get(position).dataType()) {
        position++;
    }
    // Reaching the end means no mismatch was found.
    return position == count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private static Batch<LogicalPlan> localOperators() {

public LogicalPlan localOptimize(LogicalPlan plan) {
LogicalPlan optimized = execute(plan);
Failures failures = verifier.verify(optimized, true);
Failures failures = verifier.verify(optimized, true, plan.output());
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ParallelizeTimeSeriesSource;
Expand Down Expand Up @@ -42,15 +43,15 @@ public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) {
}

public PhysicalPlan localOptimize(PhysicalPlan plan) {
return verify(execute(plan));
return verify(execute(plan), plan.output());
}

PhysicalPlan verify(PhysicalPlan plan) {
Failures failures = verifier.verify(plan, true);
PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
Failures failures = verifier.verify(optimizedPlan, true, expectedOutputAttributes);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return plan;
return optimizedPlan;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
public LogicalPlan optimize(LogicalPlan verified) {
var optimized = execute(verified);

Failures failures = verifier.verify(optimized, false);
Failures failures = verifier.verify(optimized, false, verified.output());
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,28 @@
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

public final class LogicalVerifier {
public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier<LogicalPlan> {

public static final LogicalVerifier INSTANCE = new LogicalVerifier();

private LogicalVerifier() {}

/** Verifies the optimized logical plan. */
public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
Failures failures = new Failures();
Failures dependencyFailures = new Failures();

@Override
boolean skipVerification(LogicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(Enrich.class::isInstance);
var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance);
if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
return true;
}
}
return false;
}

plan.forEachUp(p -> {
PlanConsistencyChecker.checkPlan(p, dependencyFailures);
@Override
void checkPlanConsistency(LogicalPlan optimizedPlan, Failures failures, Failures depFailures) {
optimizedPlan.forEachUp(p -> {
PlanConsistencyChecker.checkPlan(p, depFailures);

if (failures.hasFailures() == false) {
if (p instanceof PostOptimizationVerificationAware pova) {
Expand All @@ -46,11 +47,5 @@ public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
});
}
});

if (dependencyFailures.hasFailures()) {
throw new IllegalStateException(dependencyFailures.toString());
}

return failures;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
Expand All @@ -34,15 +35,15 @@ public PhysicalPlanOptimizer(PhysicalOptimizerContext context) {
}

public PhysicalPlan optimize(PhysicalPlan plan) {
return verify(execute(plan));
return verify(execute(plan), plan.output());
}

PhysicalPlan verify(PhysicalPlan plan) {
Failures failures = verifier.verify(plan, false);
PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
Failures failures = verifier.verify(optimizedPlan, false, expectedOutputAttributes);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return plan;
return optimizedPlan;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,27 @@
import static org.elasticsearch.xpack.esql.common.Failure.fail;

/** Physical plan verifier. */
public final class PhysicalVerifier {
public final class PhysicalVerifier extends PostOptimizationPhasePlanVerifier<PhysicalPlan> {

public static final PhysicalVerifier INSTANCE = new PhysicalVerifier();

private PhysicalVerifier() {}

/** Verifies the physical plan. */
public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification) {
Failures failures = new Failures();
Failures depFailures = new Failures();

@Override
boolean skipVerification(PhysicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(EnrichExec.class::isInstance);
var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance);
if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
return true;
}
}
return false;
}

plan.forEachDown(p -> {
@Override
void checkPlanConsistency(PhysicalPlan optimizedPlan, Failures failures, Failures depFailures) {
optimizedPlan.forEachDown(p -> {
if (p instanceof FieldExtractExec fieldExtractExec) {
Attribute sourceAttribute = fieldExtractExec.sourceAttribute();
if (sourceAttribute == null) {
Expand All @@ -66,11 +67,5 @@ public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification)
});
}
});

if (depFailures.hasFailures()) {
throw new IllegalStateException(depFailures.toString());
}

return failures;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;

import java.util.List;

import static org.elasticsearch.index.IndexMode.LOOKUP;
import static org.elasticsearch.xpack.esql.common.Failure.fail;
import static org.elasticsearch.xpack.esql.core.expression.Attribute.dataTypeEquals;

/**
 * Base class for the verification step that runs on a plan right after an optimizer has produced it.
 * Currently, it is called after LogicalPlanOptimizer, PhysicalPlanOptimizer,
 * LocalLogicalPlanOptimizer, and LocalPhysicalPlanOptimizer.
 * Subclasses supply the skip condition and the plan consistency checks; the check that the
 * output data types were not changed by the optimizer is shared and implemented here.
 */
public abstract class PostOptimizationPhasePlanVerifier<P extends QueryPlan<P>> {

    /**
     * Verifies the optimized plan.
     *
     * @param optimizedPlan the plan produced by the optimizer
     * @param skipRemoteEnrichVerification forwarded to {@link #skipVerification}
     * @param expectedOutputAttributes the attributes the optimized plan's output is compared against
     * @return the collected verification failures; empty when verification was skipped
     * @throws IllegalStateException if the consistency check reported dependency failures
     */
    public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, List<Attribute> expectedOutputAttributes) {
        final Failures failures = new Failures();
        if (skipVerification(optimizedPlan, skipRemoteEnrichVerification)) {
            return failures;
        }

        final Failures depFailures = new Failures();
        checkPlanConsistency(optimizedPlan, failures, depFailures);
        verifyOutputNotChanged(optimizedPlan, expectedOutputAttributes, failures);

        // Dependency failures are raised as an exception rather than returned to the caller.
        if (depFailures.hasFailures() == false) {
            return failures;
        }
        throw new IllegalStateException(depFailures.toString());
    }

    /** Returns {@code true} when the whole verification should be skipped for the given plan. */
    abstract boolean skipVerification(P optimizedPlan, boolean skipRemoteEnrichVerification);

    /** Runs the plan-specific checks, recording problems in {@code failures} and {@code depFailures}. */
    abstract void checkPlanConsistency(P optimizedPlan, Failures failures, Failures depFailures);

    /**
     * Adds a failure when the size or data types of the optimized plan's output differ from the
     * expected attributes, unless one of the known exceptions below applies.
     */
    private static void verifyOutputNotChanged(QueryPlan<?> optimizedPlan, List<Attribute> expectedOutputAttributes, Failures failures) {
        final List<Attribute> actualOutput = optimizedPlan.output();
        if (dataTypeEquals(expectedOutputAttributes, actualOutput)) {
            return;
        }

        // When the output is empty, a column named ProjectAwayColumns.ALL_FIELDS_PROJECTED is added.
        // Such plans are exempt from the output verification.
        // TODO: this special casing is required due to https://github.com/elastic/elasticsearch/issues/121741, remove when fixed.
        final boolean allFieldsProjected = hasAllFieldsProjectedColumn(actualOutput);

        // LookupJoinExec represents the lookup index with EsSourceExec, which ReplaceSourceAttributes
        // turns into EsQueryExec. InsertFieldExtraction does not apply to lookup indices, so the
        // right hand side only has the EsQueryExec providing the _doc attribute and nothing else.
        // An optimizer run is performed on every fragment, including the one inside LookupJoinExec,
        // which currently consists of just an EsQueryExec after optimization.
        final boolean lookupSource = optimizedPlan instanceof EsQueryExec queryExec && queryExec.indexMode() == LOOKUP;

        if (allFieldsProjected || lookupSource) {
            return;
        }

        failures.add(
            fail(optimizedPlan, "Output has changed from [{}] to [{}]. ", expectedOutputAttributes.toString(), actualOutput.toString())
        );
    }

    /** Returns {@code true} if any attribute is named {@link ProjectAwayColumns#ALL_FIELDS_PROJECTED}. */
    private static boolean hasAllFieldsProjectedColumn(List<Attribute> attributes) {
        for (Attribute attribute : attributes) {
            if (attribute.name().equals(ProjectAwayColumns.ALL_FIELDS_PROJECTED)) {
                return true;
            }
        }
        return false;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* extraction.
*/
public class ProjectAwayColumns extends Rule<PhysicalPlan, PhysicalPlan> {
/**
 * Name of the synthetic field this rule adds when a fragment's output would otherwise be empty.
 * Declared {@code final} so the name cannot be reassigned at runtime by other code.
 */
public static final String ALL_FIELDS_PROJECTED = "<all-fields-projected>";

@Override
public PhysicalPlan apply(PhysicalPlan plan) {
Expand Down Expand Up @@ -94,7 +95,7 @@ public PhysicalPlan apply(PhysicalPlan plan) {
// add a synthetic field (so it doesn't clash with the user defined one) to return a constant
// to avoid the block from being trimmed
if (output.isEmpty()) {
var alias = new Alias(logicalFragment.source(), "<all-fields-projected>", Literal.NULL, null, true);
var alias = new Alias(logicalFragment.source(), ALL_FIELDS_PROJECTED, Literal.NULL, null, true);
List<Alias> fields = singletonList(alias);
logicalFragment = new Eval(logicalFragment.source(), logicalFragment, fields);
output = Expressions.asAttributes(fields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ protected final ExecutionInfo executeWithInfo(TreeType plan) {
if (tf.hasChanged()) {
hasChanged = true;
if (log.isTraceEnabled()) {
log.trace("Rule {} applied\n{}", rule, NodeUtils.diffString(tf.before, tf.after));
log.trace("Rule {} applied with change\n{}", rule, NodeUtils.diffString(tf.before, tf.after));
}
} else {
if (log.isTraceEnabled()) {
Expand Down
Loading
Loading