Skip to content

Commit 40585a2

Browse files
Add checks that optimizers do not modify the layout (elastic#130855)
Add verification that the optimizers do not modify the number of attributes and the attribute datatype. We add special handling for Lookup Join, by checking EsQueryExec esQueryExec && esQueryExec.indexMode() == LOOKUP and another special handling for ProjectAwayColumns.ALL_FIELDS_PROJECTED Closes elastic#125576
1 parent c9d5076 commit 40585a2

File tree

15 files changed

+566
-61
lines changed

15 files changed

+566
-61
lines changed

docs/changelog/130855.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 130855
2+
summary: Add checks that optimizers do not modify the layout
3+
area: ES|QL
4+
type: enhancement
5+
issues:
6+
- 125576

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Attribute.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,19 @@ public String nodeString() {
134134
}
135135

136136
protected abstract String label();
137+
138+
/**
139+
* Compares the size and datatypes of two lists of attributes for equality.
140+
*/
141+
public static boolean dataTypeEquals(List<Attribute> left, List<Attribute> right) {
142+
if (left.size() != right.size()) {
143+
return false;
144+
}
145+
for (int i = 0; i < left.size(); i++) {
146+
if (left.get(i).dataType() != right.get(i).dataType()) {
147+
return false;
148+
}
149+
}
150+
return true;
151+
}
137152
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ private static Batch<LogicalPlan> localOperators() {
8888

8989
public LogicalPlan localOptimize(LogicalPlan plan) {
9090
LogicalPlan optimized = execute(plan);
91-
Failures failures = verifier.verify(optimized, true);
91+
Failures failures = verifier.verify(optimized, true, plan.output());
9292
if (failures.hasFailures()) {
9393
throw new VerificationException(failures);
9494
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.xpack.esql.VerificationException;
1111
import org.elasticsearch.xpack.esql.common.Failures;
12+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1213
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
1314
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
1415
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ParallelizeTimeSeriesSource;
@@ -42,15 +43,15 @@ public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) {
4243
}
4344

4445
public PhysicalPlan localOptimize(PhysicalPlan plan) {
45-
return verify(execute(plan));
46+
return verify(execute(plan), plan.output());
4647
}
4748

48-
PhysicalPlan verify(PhysicalPlan plan) {
49-
Failures failures = verifier.verify(plan, true);
49+
PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
50+
Failures failures = verifier.verify(optimizedPlan, true, expectedOutputAttributes);
5051
if (failures.hasFailures()) {
5152
throw new VerificationException(failures);
5253
}
53-
return plan;
54+
return optimizedPlan;
5455
}
5556

5657
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
113113
public LogicalPlan optimize(LogicalPlan verified) {
114114
var optimized = execute(verified);
115115

116-
Failures failures = verifier.verify(optimized, false);
116+
Failures failures = verifier.verify(optimized, false, verified.output());
117117
if (failures.hasFailures()) {
118118
throw new VerificationException(failures);
119119
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,28 @@
1313
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1414
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1515

16-
public final class LogicalVerifier {
16+
public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier<LogicalPlan> {
1717

1818
public static final LogicalVerifier INSTANCE = new LogicalVerifier();
1919

2020
private LogicalVerifier() {}
2121

22-
/** Verifies the optimized logical plan. */
23-
public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
24-
Failures failures = new Failures();
25-
Failures dependencyFailures = new Failures();
26-
22+
@Override
23+
boolean skipVerification(LogicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
2724
if (skipRemoteEnrichVerification) {
2825
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
29-
var enriches = plan.collectFirstChildren(Enrich.class::isInstance);
26+
var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance);
3027
if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
31-
return failures;
28+
return true;
3229
}
3330
}
31+
return false;
32+
}
3433

35-
plan.forEachUp(p -> {
36-
PlanConsistencyChecker.checkPlan(p, dependencyFailures);
34+
@Override
35+
void checkPlanConsistency(LogicalPlan optimizedPlan, Failures failures, Failures depFailures) {
36+
optimizedPlan.forEachUp(p -> {
37+
PlanConsistencyChecker.checkPlan(p, depFailures);
3738

3839
if (failures.hasFailures() == false) {
3940
if (p instanceof PostOptimizationVerificationAware pova) {
@@ -46,11 +47,5 @@ public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
4647
});
4748
}
4849
});
49-
50-
if (dependencyFailures.hasFailures()) {
51-
throw new IllegalStateException(dependencyFailures.toString());
52-
}
53-
54-
return failures;
5550
}
5651
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.xpack.esql.VerificationException;
1111
import org.elasticsearch.xpack.esql.common.Failures;
12+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1213
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
1314
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
1415
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@@ -34,15 +35,15 @@ public PhysicalPlanOptimizer(PhysicalOptimizerContext context) {
3435
}
3536

3637
public PhysicalPlan optimize(PhysicalPlan plan) {
37-
return verify(execute(plan));
38+
return verify(execute(plan), plan.output());
3839
}
3940

40-
PhysicalPlan verify(PhysicalPlan plan) {
41-
Failures failures = verifier.verify(plan, false);
41+
PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
42+
Failures failures = verifier.verify(optimizedPlan, false, expectedOutputAttributes);
4243
if (failures.hasFailures()) {
4344
throw new VerificationException(failures);
4445
}
45-
return plan;
46+
return optimizedPlan;
4647
}
4748

4849
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,27 @@
2020
import static org.elasticsearch.xpack.esql.common.Failure.fail;
2121

2222
/** Physical plan verifier. */
23-
public final class PhysicalVerifier {
23+
public final class PhysicalVerifier extends PostOptimizationPhasePlanVerifier<PhysicalPlan> {
2424

2525
public static final PhysicalVerifier INSTANCE = new PhysicalVerifier();
2626

2727
private PhysicalVerifier() {}
2828

29-
/** Verifies the physical plan. */
30-
public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification) {
31-
Failures failures = new Failures();
32-
Failures depFailures = new Failures();
33-
29+
@Override
30+
boolean skipVerification(PhysicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
3431
if (skipRemoteEnrichVerification) {
3532
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
36-
var enriches = plan.collectFirstChildren(EnrichExec.class::isInstance);
33+
var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance);
3734
if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
38-
return failures;
35+
return true;
3936
}
4037
}
38+
return false;
39+
}
4140

42-
plan.forEachDown(p -> {
41+
@Override
42+
void checkPlanConsistency(PhysicalPlan optimizedPlan, Failures failures, Failures depFailures) {
43+
optimizedPlan.forEachDown(p -> {
4344
if (p instanceof FieldExtractExec fieldExtractExec) {
4445
Attribute sourceAttribute = fieldExtractExec.sourceAttribute();
4546
if (sourceAttribute == null) {
@@ -66,11 +67,5 @@ public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification)
6667
});
6768
}
6869
});
69-
70-
if (depFailures.hasFailures()) {
71-
throw new IllegalStateException(depFailures.toString());
72-
}
73-
74-
return failures;
7570
}
7671
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer;
9+
10+
import org.elasticsearch.xpack.esql.common.Failures;
11+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
12+
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
13+
import org.elasticsearch.xpack.esql.plan.QueryPlan;
14+
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
15+
16+
import java.util.List;
17+
18+
import static org.elasticsearch.index.IndexMode.LOOKUP;
19+
import static org.elasticsearch.xpack.esql.common.Failure.fail;
20+
import static org.elasticsearch.xpack.esql.core.expression.Attribute.dataTypeEquals;
21+
22+
/**
23+
* Verifies the plan after optimization.
24+
* This is invoked immediately after a Plan Optimizer completes its work.
25+
* Currently, it is called after LogicalPlanOptimizer, PhysicalPlanOptimizer,
26+
* LocalLogicalPlanOptimizer, and LocalPhysicalPlanOptimizer.
27+
* Note: Logical and Physical optimizers may override methods in this class to perform different checks.
28+
*/
29+
public abstract class PostOptimizationPhasePlanVerifier<P extends QueryPlan<P>> {
30+
31+
/** Verifies the optimized plan */
32+
public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, List<Attribute> expectedOutputAttributes) {
33+
Failures failures = new Failures();
34+
Failures depFailures = new Failures();
35+
if (skipVerification(optimizedPlan, skipRemoteEnrichVerification)) {
36+
return failures;
37+
}
38+
39+
checkPlanConsistency(optimizedPlan, failures, depFailures);
40+
41+
verifyOutputNotChanged(optimizedPlan, expectedOutputAttributes, failures);
42+
43+
if (depFailures.hasFailures()) {
44+
throw new IllegalStateException(depFailures.toString());
45+
}
46+
47+
return failures;
48+
}
49+
50+
abstract boolean skipVerification(P optimizedPlan, boolean skipRemoteEnrichVerification);
51+
52+
abstract void checkPlanConsistency(P optimizedPlan, Failures failures, Failures depFailures);
53+
54+
private static void verifyOutputNotChanged(QueryPlan<?> optimizedPlan, List<Attribute> expectedOutputAttributes, Failures failures) {
55+
if (dataTypeEquals(expectedOutputAttributes, optimizedPlan.output()) == false) {
56+
// If the output level is empty we add a column called ProjectAwayColumns.ALL_FIELDS_PROJECTED
57+
// We will ignore such cases for output verification
58+
// TODO: this special casing is required due to https://github.com/elastic/elasticsearch/issues/121741, remove when fixed.
59+
boolean hasProjectAwayColumns = optimizedPlan.output()
60+
.stream()
61+
.anyMatch(x -> x.name().equals(ProjectAwayColumns.ALL_FIELDS_PROJECTED));
62+
// LookupJoinExec represents the lookup index with EsSourceExec and this is turned into EsQueryExec by
63+
// ReplaceSourceAttributes. Because InsertFieldExtractions doesn't apply to lookup indices, the
64+
// right hand side will only have the EsQueryExec providing the _doc attribute and nothing else.
65+
// We perform an optimizer run on every fragment. LookupJoinExec also contains such a fragment,
66+
// and currently it only contains an EsQueryExec after optimization.
67+
boolean hasLookupJoinExec = optimizedPlan instanceof EsQueryExec esQueryExec && esQueryExec.indexMode() == LOOKUP;
68+
boolean ignoreError = hasProjectAwayColumns || hasLookupJoinExec;
69+
if (ignoreError == false) {
70+
failures.add(
71+
fail(
72+
optimizedPlan,
73+
"Output has changed from [{}] to [{}]. ",
74+
expectedOutputAttributes.toString(),
75+
optimizedPlan.output().toString()
76+
)
77+
);
78+
}
79+
}
80+
}
81+
82+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
* extraction.
3737
*/
3838
public class ProjectAwayColumns extends Rule<PhysicalPlan, PhysicalPlan> {
39+
public static String ALL_FIELDS_PROJECTED = "<all-fields-projected>";
3940

4041
@Override
4142
public PhysicalPlan apply(PhysicalPlan plan) {
@@ -94,7 +95,7 @@ public PhysicalPlan apply(PhysicalPlan plan) {
9495
// add a synthetic field (so it doesn't clash with the user defined one) to return a constant
9596
// to avoid the block from being trimmed
9697
if (output.isEmpty()) {
97-
var alias = new Alias(logicalFragment.source(), "<all-fields-projected>", Literal.NULL, null, true);
98+
var alias = new Alias(logicalFragment.source(), ALL_FIELDS_PROJECTED, Literal.NULL, null, true);
9899
List<Alias> fields = singletonList(alias);
99100
logicalFragment = new Eval(logicalFragment.source(), logicalFragment, fields);
100101
output = Expressions.asAttributes(fields);

0 commit comments

Comments
 (0)