Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/130855.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 130855
summary: Add checks that optimizers do not modify the layout
area: ES|QL
type: enhancement
issues:
- 125576
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,19 @@ public String nodeString() {
}

protected abstract String label();

/**
* Compares the size and datatypes of two lists of attributes for equality.
*/
public static boolean dataTypeEquals(List<Attribute> left, List<Attribute> right) {
if (left.size() != right.size()) {
return false;
}
for (int i = 0; i < left.size(); i++) {
if (left.get(i).dataType() != right.get(i).dataType()) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private static Batch<LogicalPlan> localOperators() {

public LogicalPlan localOptimize(LogicalPlan plan) {
LogicalPlan optimized = execute(plan);
Failures failures = verifier.verify(optimized, true);
Failures failures = verifier.verify(optimized, true, plan.output());
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ParallelizeTimeSeriesSource;
Expand Down Expand Up @@ -42,15 +43,15 @@ public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) {
}

public PhysicalPlan localOptimize(PhysicalPlan plan) {
return verify(execute(plan));
return verify(execute(plan), plan.output());
}

PhysicalPlan verify(PhysicalPlan plan) {
Failures failures = verifier.verify(plan, true);
PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
Failures failures = verifier.verify(optimizedPlan, true, expectedOutputAttributes);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return plan;
return optimizedPlan;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
public LogicalPlan optimize(LogicalPlan verified) {
var optimized = execute(verified);

Failures failures = verifier.verify(optimized, false);
Failures failures = verifier.verify(optimized, false, verified.output());
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,28 @@
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

public final class LogicalVerifier {
public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier<LogicalPlan> {

public static final LogicalVerifier INSTANCE = new LogicalVerifier();

private LogicalVerifier() {}

/** Verifies the optimized logical plan. */
public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
Failures failures = new Failures();
Failures dependencyFailures = new Failures();

@Override
boolean skipVerification(LogicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(Enrich.class::isInstance);
var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance);
if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
return true;
}
}
return false;
}

plan.forEachUp(p -> {
PlanConsistencyChecker.checkPlan(p, dependencyFailures);
@Override
void checkPlanConsistency(LogicalPlan optimizedPlan, Failures failures, Failures depFailures) {
optimizedPlan.forEachUp(p -> {
PlanConsistencyChecker.checkPlan(p, depFailures);

if (failures.hasFailures() == false) {
if (p instanceof PostOptimizationVerificationAware pova) {
Expand All @@ -46,11 +47,5 @@ public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
});
}
});

if (dependencyFailures.hasFailures()) {
throw new IllegalStateException(dependencyFailures.toString());
}

return failures;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
Expand All @@ -34,15 +35,15 @@ public PhysicalPlanOptimizer(PhysicalOptimizerContext context) {
}

public PhysicalPlan optimize(PhysicalPlan plan) {
return verify(execute(plan));
return verify(execute(plan), plan.output());
}

PhysicalPlan verify(PhysicalPlan plan) {
Failures failures = verifier.verify(plan, false);
PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
Failures failures = verifier.verify(optimizedPlan, false, expectedOutputAttributes);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return plan;
return optimizedPlan;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,27 @@
import static org.elasticsearch.xpack.esql.common.Failure.fail;

/** Physical plan verifier. */
public final class PhysicalVerifier {
public final class PhysicalVerifier extends PostOptimizationPhasePlanVerifier<PhysicalPlan> {

public static final PhysicalVerifier INSTANCE = new PhysicalVerifier();

private PhysicalVerifier() {}

/** Verifies the physical plan. */
public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification) {
Failures failures = new Failures();
Failures depFailures = new Failures();

@Override
boolean skipVerification(PhysicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(EnrichExec.class::isInstance);
var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance);
if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
return true;
}
}
return false;
}

plan.forEachDown(p -> {
@Override
void checkPlanConsistency(PhysicalPlan optimizedPlan, Failures failures, Failures depFailures) {
optimizedPlan.forEachDown(p -> {
if (p instanceof FieldExtractExec fieldExtractExec) {
Attribute sourceAttribute = fieldExtractExec.sourceAttribute();
if (sourceAttribute == null) {
Expand All @@ -66,11 +67,5 @@ public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification)
});
}
});

if (depFailures.hasFailures()) {
throw new IllegalStateException(depFailures.toString());
}

return failures;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;

import java.util.List;

import static org.elasticsearch.index.IndexMode.LOOKUP;
import static org.elasticsearch.xpack.esql.common.Failure.fail;
import static org.elasticsearch.xpack.esql.core.expression.Attribute.dataTypeEquals;

/**
* Verifies the plan after optimization.
* This is invoked immediately after a Plan Optimizer completes its work.
* Currently, it is called after LogicalPlanOptimizer, PhysicalPlanOptimizer,
* LocalLogicalPlanOptimizer, and LocalPhysicalPlanOptimizer.
* Note: Logical and Physical optimizers may override methods in this class to perform different checks.
*/
public abstract class PostOptimizationPhasePlanVerifier<P extends QueryPlan<P>> {

/** Verifies the optimized plan */
public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, List<Attribute> expectedOutputAttributes) {
Failures failures = new Failures();
Failures depFailures = new Failures();
if (skipVerification(optimizedPlan, skipRemoteEnrichVerification)) {
return failures;
}

checkPlanConsistency(optimizedPlan, failures, depFailures);

verifyOutputNotChanged(optimizedPlan, expectedOutputAttributes, failures);

if (depFailures.hasFailures()) {
throw new IllegalStateException(depFailures.toString());
}

return failures;
}

abstract boolean skipVerification(P optimizedPlan, boolean skipRemoteEnrichVerification);

abstract void checkPlanConsistency(P optimizedPlan, Failures failures, Failures depFailures);

private static void verifyOutputNotChanged(QueryPlan<?> optimizedPlan, List<Attribute> expectedOutputAttributes, Failures failures) {
if (dataTypeEquals(expectedOutputAttributes, optimizedPlan.output()) == false) {
// If the output level is empty we add a column called ProjectAwayColumns.ALL_FIELDS_PROJECTED
// We will ignore such cases for output verification
boolean hasProjectAwayColumns = optimizedPlan.output()
.stream()
.anyMatch(x -> x.name().equals(ProjectAwayColumns.ALL_FIELDS_PROJECTED));
// LookupJoinExec represents the lookup index with EsSourceExec and this is turned into EsQueryExec by
// ReplaceSourceAttributes. Because InsertFieldExtractions doesn't apply to lookup indices, the
// right hand side will only have the EsQueryExec providing the _doc attribute and nothing else.
// We perform an optimizer run on every fragment. LookupJoinExec also contains such a fragment,
// and currently it only contains an EsQueryExec after optimization.
boolean hasLookupJoinExec = optimizedPlan instanceof EsQueryExec esQueryExec && esQueryExec.indexMode() == LOOKUP;
boolean ignoreError = hasProjectAwayColumns || hasLookupJoinExec;
if (ignoreError == false) {
failures.add(
fail(
optimizedPlan,
"Output has changed from [{}] to [{}]. ",
expectedOutputAttributes.toString(),
optimizedPlan.output().toString()
)
);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.xpack.esql.rule.Rule;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

import static java.lang.Boolean.FALSE;
Expand All @@ -36,6 +37,7 @@
* extraction.
*/
public class ProjectAwayColumns extends Rule<PhysicalPlan, PhysicalPlan> {
public static String ALL_FIELDS_PROJECTED = "<all-fields-projected>";

@Override
public PhysicalPlan apply(PhysicalPlan plan) {
Expand All @@ -44,7 +46,7 @@ public PhysicalPlan apply(PhysicalPlan plan) {
// and the overall output will not change.
AttributeSet.Builder requiredAttrBuilder = plan.outputSet().asBuilder();

return plan.transformDown(currentPlanNode -> {
PhysicalPlan planAfter = plan.transformDown(currentPlanNode -> {
if (keepTraversing.get() == false) {
return currentPlanNode;
}
Expand Down Expand Up @@ -82,12 +84,13 @@ public PhysicalPlan apply(PhysicalPlan plan) {
// add a synthetic field (so it doesn't clash with the user defined one) to return a constant
// to avoid the block from being trimmed
if (output.isEmpty()) {
var alias = new Alias(logicalFragment.source(), "<all-fields-projected>", Literal.NULL, null, true);
var alias = new Alias(logicalFragment.source(), ALL_FIELDS_PROJECTED, Literal.NULL, null, true);
List<Alias> fields = singletonList(alias);
logicalFragment = new Eval(logicalFragment.source(), logicalFragment, fields);
output = Expressions.asAttributes(fields);
}
// add a logical projection (let the local replanning remove it if needed)
output = reorderOutput(logicalFragment.output(), output);
FragmentExec newChild = new FragmentExec(
Source.EMPTY,
new Project(logicalFragment.source(), logicalFragment, output),
Expand All @@ -105,5 +108,27 @@ public PhysicalPlan apply(PhysicalPlan plan) {
}
return currentPlanNode;
});
return planAfter;
}

private List<Attribute> reorderOutput(List<Attribute> expectedOrder, List<Attribute> attributesToSort) {
List<Attribute> output = new ArrayList<>(attributesToSort.size());
// Track which attributes have been added
var added = new HashSet<Attribute>();

// Add attributes in expectedOrder if present in attributesToSort
for (Attribute expected : expectedOrder) {
if (attributesToSort.contains(expected)) {
output.add(expected);
added.add(expected);
}
}
// Add remaining attributes from attributesToSort in their original order
for (Attribute attr : attributesToSort) {
if (added.contains(attr) == false) {
output.add(attr);
}
}
return output;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ protected final ExecutionInfo executeWithInfo(TreeType plan) {
if (tf.hasChanged()) {
hasChanged = true;
if (log.isTraceEnabled()) {
log.trace("Rule {} applied\n{}", rule, NodeUtils.diffString(tf.before, tf.after));
log.trace("Rule {} applied with change\n{}", rule, NodeUtils.diffString(tf.before, tf.after));
}
} else {
if (log.isTraceEnabled()) {
Expand Down
Loading
Loading