Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/130409.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 130409
summary: Add Dependency Checker for `LogicalLocalPlanOptimizer`
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEmptyRelation;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStatsFilteredAggWithEval;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStringCasingWithInsensitiveRegexMatch;
Expand Down Expand Up @@ -35,6 +37,8 @@
*/
public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LocalLogicalOptimizerContext> {

private final LogicalVerifier verifier = LogicalVerifier.INSTANCE;

private static final List<Batch<LogicalPlan>> RULES = arrayAsArrayList(
new Batch<>(
"Local rewrite",
Expand Down Expand Up @@ -81,6 +85,12 @@ private static Batch<LogicalPlan> localOperators() {
}

/**
 * Optimizes {@code plan} by calling {@code execute(plan)} and then verifies the optimized plan.
 *
 * @param plan the logical plan to optimize
 * @return the optimized plan when the verifier reports no failures
 * @throws VerificationException if the verifier reports failures for the optimized plan
 */
public LogicalPlan localOptimize(LogicalPlan plan) {
// NOTE(review): this looks like the pre-change line rendered by the diff view (the hunk replaces it with the
// lines below). Taken literally, every statement after it would be unreachable - confirm it is gone in the merged file.
return execute(plan);
LogicalPlan optimized = execute(plan);
// The second argument is skipRemoteEnrichVerification (see the LogicalVerifier.verify signature): local plans opt in.
Failures failures = verifier.verify(optimized, true);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return optimized;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public PhysicalPlan localOptimize(PhysicalPlan plan) {
}

PhysicalPlan verify(PhysicalPlan plan) {
Failures failures = verifier.verify(plan);
Failures failures = verifier.verify(plan, true);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
public LogicalPlan optimize(LogicalPlan verified) {
var optimized = execute(verified);

Failures failures = verifier.verify(optimized);
Failures failures = verifier.verify(optimized, false);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

public final class LogicalVerifier {
Expand All @@ -19,10 +20,18 @@ public final class LogicalVerifier {
// Private constructor: callers use the shared LogicalVerifier.INSTANCE instead of creating their own verifier.
private LogicalVerifier() {}

/** Verifies the optimized logical plan. */
public Failures verify(LogicalPlan plan) {
public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
Failures failures = new Failures();
Failures dependencyFailures = new Failures();

if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(Enrich.class::isInstance);
if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
}
}

plan.forEachUp(p -> {
PlanConsistencyChecker.checkPlan(p, dependencyFailures);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public PhysicalPlan optimize(PhysicalPlan plan) {
}

PhysicalPlan verify(PhysicalPlan plan) {
Failures failures = verifier.verify(plan);
Failures failures = verifier.verify(plan, false);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ public final class PhysicalVerifier {
// Private constructor: this class cannot be instantiated from outside.
private PhysicalVerifier() {}

/** Verifies the physical plan. */
public Failures verify(PhysicalPlan plan) {
public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification) {
Failures failures = new Failures();
Failures depFailures = new Failures();

// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(EnrichExec.class::isInstance);
if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
if (skipRemoteEnrichVerification) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I think when we talked offline I accidentally suggested that we should skip the whole verification in case of remote enriches in a local plan; it should indeed be sufficient to just skip the line with PlanConsistencyChecker.checkPlan below.

But this is fine, too, and this PR is already a great improvement over what we had before!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will leave this comment open and handle it in a future PR.

// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(EnrichExec.class::isInstance);
if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
}
}

plan.forEachDown(p -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.EsField;
import org.elasticsearch.xpack.esql.core.type.InvalidMappedField;
import org.elasticsearch.xpack.esql.expression.Order;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Case;
import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
import org.elasticsearch.xpack.esql.expression.function.scalar.string.StartsWith;
Expand All @@ -49,6 +52,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Row;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
Expand All @@ -64,6 +68,7 @@
import java.util.Map;
import java.util.Set;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.L;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.ONE;
Expand All @@ -84,6 +89,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -774,6 +780,32 @@ public void testGroupingByMissingFields() {
as(eval.child(), EsRelation.class);
}

/**
 * Checks that {@code localOptimize} fails when handed a plan whose top node references an attribute
 * (here {@code salary}) taken from below the aggregation, emulating a faulty optimizer rule.
 */
public void testPlanSanityCheck() throws Exception {
    var optimizedPlan = localPlan("""
        from test
        | stats a = min(salary) by emp_no
        """);

    // Walk the optimized plan down to the field consumed by min(...).
    Limit topLimit = as(optimizedPlan, Limit.class);
    Aggregate stats = as(topLimit.child(), Aggregate.class);
    Min minFunction = as(Alias.unwrap(stats.aggregates().get(0)), Min.class);
    NamedExpression salaryField = as(minFunction.field(), NamedExpression.class);
    assertThat(salaryField.name(), is("salary"));

    // emulate a rule that adds an invalid field
    Order bySalary = new Order(topLimit.source(), salaryField, Order.OrderDirection.ASC, Order.NullsPosition.FIRST);
    OrderBy brokenPlan = new OrderBy(topLimit.source(), topLimit, asList(bySalary));

    var context = new LocalLogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small(), TEST_SEARCH_STATS);
    var optimizer = new LocalLogicalPlanOptimizer(context);

    IllegalStateException failure = expectThrows(IllegalStateException.class, () -> optimizer.localOptimize(brokenPlan));
    String message = failure.getMessage();
    assertThat(message, containsString("Plan [OrderBy[[Order[salary"));
    assertThat(message, containsString(" optimized incorrectly due to missing references [salary"));
}

/** Builds an {@link IsNotNull} over {@code field} using the {@code EMPTY} source. */
private IsNotNull isNotNull(Expression field) {
    IsNotNull notNullCheck = new IsNotNull(EMPTY, field);
    return notNullCheck;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
Expand All @@ -52,9 +53,11 @@
import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
import org.elasticsearch.xpack.esql.expression.Order;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextFunction;
import org.elasticsearch.xpack.esql.expression.function.fulltext.Kql;
import org.elasticsearch.xpack.esql.expression.function.fulltext.Match;
Expand Down Expand Up @@ -131,8 +134,12 @@
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.indexWithDateDateNanosUnionType;
import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore;
import static org.elasticsearch.xpack.esql.core.type.DataType.DATE_NANOS;
import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;
import static org.elasticsearch.xpack.esql.core.util.TestUtils.getFieldAttribute;
import static org.elasticsearch.xpack.esql.plan.physical.AbstractPhysicalPlanSerializationTests.randomEstimatedRowSize;
import static org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -2056,6 +2063,37 @@ public void testToDateNanosPushDown() {
assertThat(expected.toString(), is(esQuery.query().toString()));
}

/**
 * Checks that {@code localOptimize} on the physical plan fails when a node orders by an attribute
 * ({@code "missing attr"}) that does not come from the planned query.
 */
public void testVerifierOnMissingReferences() throws Exception {
    PhysicalPlan plan = plannerOptimizer.plan("""
        from test
        | stats a = min(salary) by emp_no
        """);

    var limit = as(plan, LimitExec.class);
    var aggregate = as(limit.child(), AggregateExec.class);
    var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
    var salary = as(min.field(), NamedExpression.class);
    assertThat(salary.name(), is("salary"));

    // emulate a rule that adds a missing attribute
    FieldAttribute missingAttr = getFieldAttribute("missing attr");
    List<Order> orders = List.of(new Order(plan.source(), missingAttr, Order.OrderDirection.ASC, Order.NullsPosition.FIRST));
    // The limit must be an INTEGER-typed literal holding an actual integer. Previously the LimitExec node itself was
    // passed as the literal's value. The concrete number is irrelevant to this test.
    Literal topNLimit = new Literal(Source.EMPTY, 10, INTEGER);
    TopNExec topNExec = new TopNExec(plan.source(), plan, orders, topNLimit, randomEstimatedRowSize());

    // We want to verify that the localOptimize detects the missing attribute.
    // However, it also throws an error in one of the rules before we get to the verifier.
    // So we use an implementation of LocalPhysicalPlanOptimizer that does not have any rules.
    LocalPhysicalOptimizerContext context = new LocalPhysicalOptimizerContext(config, FoldContext.small(), SearchStats.EMPTY);
    LocalPhysicalPlanOptimizer optimizerWithNoopExecute = new LocalPhysicalPlanOptimizer(context) {
        @Override
        protected List<Batch<PhysicalPlan>> batches() {
            return List.of();
        }
    };

    IllegalStateException e = expectThrows(IllegalStateException.class, () -> optimizerWithNoopExecute.localOptimize(topNExec));
    assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references [missing attr"));
}

/** Returns {@code true} only when {@code e} is a {@link FieldAttribute} whose field is a {@link MultiTypeEsField}. */
private boolean isMultiTypeEsField(Expression e) {
    if (e instanceof FieldAttribute attribute) {
        return attribute.field() instanceof MultiTypeEsField;
    }
    return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5554,7 +5554,7 @@ public void testPushShadowingGeneratingPlanPastProject() {
List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);

Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan);
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false);
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down Expand Up @@ -5605,7 +5605,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProject() {
List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);

Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan);
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false);
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down Expand Up @@ -5661,7 +5661,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProjectWithResolution() {

// This ensures that our generating plan doesn't use invalid references, resp. that any rename from the Project has
// been propagated into the generating plan.
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan);
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false);
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down
Loading