Skip to content

Commit da96ebd

Browse files
Fix more cases and address code review
1 parent 6f29114 commit da96ebd

File tree

7 files changed

+288
-44
lines changed

7 files changed

+288
-44
lines changed

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Attribute.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public String nodeString() {
138138
/**
139139
* Compares the size and datatypes of two lists of attributes for equality.
140140
*/
141-
public static boolean datatypeEquals(List<Attribute> left, List<Attribute> right) {
141+
public static boolean dataTypeEquals(List<Attribute> left, List<Attribute> right) {
142142
if (left.size() != right.size()) {
143143
return false;
144144
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,17 @@
1010
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
1111
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
13-
import org.elasticsearch.xpack.esql.plan.QueryPlan;
1413
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
14+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1515

16-
public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier {
16+
public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier<LogicalPlan> {
1717

1818
public static final LogicalVerifier INSTANCE = new LogicalVerifier();
1919

2020
private LogicalVerifier() {}
2121

2222
@Override
23-
boolean skipVerification(QueryPlan<?> optimizedPlan, boolean skipRemoteEnrichVerification) {
23+
boolean skipVerification(LogicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
2424
if (skipRemoteEnrichVerification) {
2525
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
2626
var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance);
@@ -32,7 +32,7 @@ boolean skipVerification(QueryPlan<?> optimizedPlan, boolean skipRemoteEnrichVer
3232
}
3333

3434
@Override
35-
void checkPlanConsistency(QueryPlan<?> optimizedPlan, Failures failures, Failures depFailures) {
35+
void checkPlanConsistency(LogicalPlan optimizedPlan, Failures failures, Failures depFailures) {
3636
optimizedPlan.forEachUp(p -> {
3737
PlanConsistencyChecker.checkPlan(p, depFailures);
3838

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,22 @@
1212
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1313
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1414
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
15-
import org.elasticsearch.xpack.esql.plan.QueryPlan;
1615
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1716
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
1817
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
18+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1919

2020
import static org.elasticsearch.xpack.esql.common.Failure.fail;
2121

2222
/** Physical plan verifier. */
23-
public final class PhysicalVerifier extends PostOptimizationPhasePlanVerifier {
23+
public final class PhysicalVerifier extends PostOptimizationPhasePlanVerifier<PhysicalPlan> {
2424

2525
public static final PhysicalVerifier INSTANCE = new PhysicalVerifier();
2626

2727
private PhysicalVerifier() {}
2828

2929
@Override
30-
boolean skipVerification(QueryPlan<?> optimizedPlan, boolean skipRemoteEnrichVerification) {
30+
boolean skipVerification(PhysicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
3131
if (skipRemoteEnrichVerification) {
3232
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
3333
var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance);
@@ -39,7 +39,7 @@ boolean skipVerification(QueryPlan<?> optimizedPlan, boolean skipRemoteEnrichVer
3939
}
4040

4141
@Override
42-
void checkPlanConsistency(QueryPlan<?> optimizedPlan, Failures failures, Failures depFailures) {
42+
void checkPlanConsistency(PhysicalPlan optimizedPlan, Failures failures, Failures depFailures) {
4343
optimizedPlan.forEachDown(p -> {
4444
if (p instanceof FieldExtractExec fieldExtractExec) {
4545
Attribute sourceAttribute = fieldExtractExec.sourceAttribute();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@
99

1010
import org.elasticsearch.xpack.esql.common.Failures;
1111
import org.elasticsearch.xpack.esql.core.expression.Attribute;
12+
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
1213
import org.elasticsearch.xpack.esql.plan.QueryPlan;
1314
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
1415

1516
import java.util.List;
1617

1718
import static org.elasticsearch.index.IndexMode.LOOKUP;
1819
import static org.elasticsearch.xpack.esql.common.Failure.fail;
19-
import static org.elasticsearch.xpack.esql.core.expression.Attribute.datatypeEquals;
20+
import static org.elasticsearch.xpack.esql.core.expression.Attribute.dataTypeEquals;
2021

2122
/**
2223
* Verifies the plan after optimization.
@@ -25,10 +26,10 @@
2526
* LocalLogicalPlanOptimizer, and LocalPhysicalPlanOptimizer.
2627
* Note: Logical and Physical optimizers may override methods in this class to perform different checks.
2728
*/
28-
public abstract class PostOptimizationPhasePlanVerifier {
29+
public abstract class PostOptimizationPhasePlanVerifier<P extends QueryPlan<P>> {
2930

3031
/** Verifies the optimized plan */
31-
public Failures verify(QueryPlan<?> optimizedPlan, boolean skipRemoteEnrichVerification, List<Attribute> expectedOutputAttributes) {
32+
public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, List<Attribute> expectedOutputAttributes) {
3233
Failures failures = new Failures();
3334
Failures depFailures = new Failures();
3435
if (skipVerification(optimizedPlan, skipRemoteEnrichVerification)) {
@@ -46,16 +47,25 @@ public Failures verify(QueryPlan<?> optimizedPlan, boolean skipRemoteEnrichVerif
4647
return failures;
4748
}
4849

49-
abstract boolean skipVerification(QueryPlan<?> optimizedPlan, boolean skipRemoteEnrichVerification);
50+
abstract boolean skipVerification(P optimizedPlan, boolean skipRemoteEnrichVerification);
5051

51-
abstract void checkPlanConsistency(QueryPlan<?> optimizedPlan, Failures failures, Failures depFailures);
52+
abstract void checkPlanConsistency(P optimizedPlan, Failures failures, Failures depFailures);
5253

5354
private static void verifyOutputNotChanged(QueryPlan<?> optimizedPlan, List<Attribute> expectedOutputAttributes, Failures failures) {
54-
if (datatypeEquals(expectedOutputAttributes, optimizedPlan.output()) == false) {
55+
if (dataTypeEquals(expectedOutputAttributes, optimizedPlan.output()) == false) {
56+
// If the output level is empty we add a column called ProjectAwayColumns.ALL_FIELDS_PROJECTED
57+
// We will ignore such cases for output verification
58+
boolean hasProjectAwayColumns = optimizedPlan.output()
59+
.stream()
60+
.anyMatch(x -> x.name().equals(ProjectAwayColumns.ALL_FIELDS_PROJECTED));
5561
// LookupJoinExec represents the lookup index with EsSourceExec and this is turned into EsQueryExec by
5662
// ReplaceSourceAttributes. Because InsertFieldExtractions doesn't apply to lookup indices, the
5763
// right hand side will only have the EsQueryExec providing the _doc attribute and nothing else.
58-
if ((optimizedPlan instanceof EsQueryExec esQueryExec && esQueryExec.indexMode() == LOOKUP) == false) {
64+
// We perform an optimizer run on every fragment. LookupJoinExec also contains such a fragment,
65+
// and currently it only contains an EsQueryExec after optimization.
66+
boolean hasLookupJoinExec = optimizedPlan instanceof EsQueryExec esQueryExec && esQueryExec.indexMode() == LOOKUP;
67+
boolean ignoreError = hasProjectAwayColumns || hasLookupJoinExec;
68+
if (ignoreError == false) {
5969
failures.add(
6070
fail(
6171
optimizedPlan,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.xpack.esql.rule.Rule;
2525

2626
import java.util.ArrayList;
27+
import java.util.HashSet;
2728
import java.util.List;
2829

2930
import static java.lang.Boolean.FALSE;
@@ -36,6 +37,7 @@
3637
* extraction.
3738
*/
3839
public class ProjectAwayColumns extends Rule<PhysicalPlan, PhysicalPlan> {
40+
public static String ALL_FIELDS_PROJECTED = "<all-fields-projected>";
3941

4042
@Override
4143
public PhysicalPlan apply(PhysicalPlan plan) {
@@ -44,7 +46,7 @@ public PhysicalPlan apply(PhysicalPlan plan) {
4446
// and the overall output will not change.
4547
AttributeSet.Builder requiredAttrBuilder = plan.outputSet().asBuilder();
4648

47-
return plan.transformDown(currentPlanNode -> {
49+
PhysicalPlan planAfter = plan.transformDown(currentPlanNode -> {
4850
if (keepTraversing.get() == false) {
4951
return currentPlanNode;
5052
}
@@ -82,12 +84,13 @@ public PhysicalPlan apply(PhysicalPlan plan) {
8284
// add a synthetic field (so it doesn't clash with the user defined one) to return a constant
8385
// to avoid the block from being trimmed
8486
if (output.isEmpty()) {
85-
var alias = new Alias(logicalFragment.source(), "<all-fields-projected>", Literal.NULL, null, true);
87+
var alias = new Alias(logicalFragment.source(), ALL_FIELDS_PROJECTED, Literal.NULL, null, true);
8688
List<Alias> fields = singletonList(alias);
8789
logicalFragment = new Eval(logicalFragment.source(), logicalFragment, fields);
8890
output = Expressions.asAttributes(fields);
8991
}
9092
// add a logical projection (let the local replanning remove it if needed)
93+
output = reorderOutput(logicalFragment.output(), output);
9194
FragmentExec newChild = new FragmentExec(
9295
Source.EMPTY,
9396
new Project(logicalFragment.source(), logicalFragment, output),
@@ -105,5 +108,27 @@ public PhysicalPlan apply(PhysicalPlan plan) {
105108
}
106109
return currentPlanNode;
107110
});
111+
return planAfter;
112+
}
113+
114+
private List<Attribute> reorderOutput(List<Attribute> expectedOrder, List<Attribute> attributesToSort) {
115+
List<Attribute> output = new ArrayList<>(attributesToSort.size());
116+
// Track which attributes have been added
117+
var added = new HashSet<Attribute>();
118+
119+
// Add attributes in expectedOrder if present in attributesToSort
120+
for (Attribute expected : expectedOrder) {
121+
if (attributesToSort.contains(expected)) {
122+
output.add(expected);
123+
added.add(expected);
124+
}
125+
}
126+
// Add remaining attributes from attributesToSort in their original order
127+
for (Attribute attr : attributesToSort) {
128+
if (added.contains(attr) == false) {
129+
output.add(attr);
130+
}
131+
}
132+
return output;
108133
}
109134
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@
131131
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
132132
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
133133
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
134+
import org.elasticsearch.xpack.esql.rule.RuleExecutor;
134135

135136
import java.time.Duration;
136137
import java.util.ArrayList;
@@ -178,6 +179,8 @@
178179
import static org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison.BinaryComparisonOperation.GTE;
179180
import static org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison.BinaryComparisonOperation.LT;
180181
import static org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison.BinaryComparisonOperation.LTE;
182+
import static org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules.TransformDirection.DOWN;
183+
import static org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules.TransformDirection.UP;
181184
import static org.hamcrest.Matchers.allOf;
182185
import static org.hamcrest.Matchers.anyOf;
183186
import static org.hamcrest.Matchers.contains;
@@ -7855,4 +7858,101 @@ public void testSampleNoPushDownChangePoint() {
78557858
var topN = as(changePoint.child(), TopN.class);
78567859
var source = as(topN.child(), EsRelation.class);
78577860
}
7861+
7862+
private LogicalPlanOptimizer getCustomRulesLogicalPlanOptimizer(List<RuleExecutor.Batch<LogicalPlan>> batches) {
7863+
LogicalOptimizerContext context = new LogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small());
7864+
LogicalPlanOptimizer customOptimizer = new LogicalPlanOptimizer(context) {
7865+
@Override
7866+
protected List<Batch<LogicalPlan>> batches() {
7867+
return batches;
7868+
}
7869+
};
7870+
return customOptimizer;
7871+
}
7872+
7873+
public void testVerifierOnAdditionalAttributeAdded() throws Exception {
7874+
var plan = optimizedPlan("""
7875+
from test
7876+
| stats a = min(salary) by emp_no
7877+
""");
7878+
7879+
var limit = as(plan, Limit.class);
7880+
var aggregate = as(limit.child(), Aggregate.class);
7881+
var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
7882+
var salary = as(min.field(), NamedExpression.class);
7883+
assertThat(salary.name(), is("salary"));
7884+
7885+
// use a custom rule that adds another output attribute
7886+
var customRuleBatch = new RuleExecutor.Batch<>(
7887+
"CustomRuleBatch",
7888+
RuleExecutor.Limiter.ONCE,
7889+
new OptimizerRules.ParameterizedOptimizerRule<Aggregate, LogicalOptimizerContext>(UP) {
7890+
static Integer appliedCount = 0;
7891+
7892+
@Override
7893+
protected LogicalPlan rule(Aggregate plan, LogicalOptimizerContext context) {
7894+
// This rule adds a missing attribute to the plan output
7895+
// We only want to apply it once, so we use a static counter
7896+
if (appliedCount == 0) {
7897+
appliedCount++;
7898+
Literal additionalLiteral = new Literal(Source.EMPTY, "additional literal", INTEGER);
7899+
return new Eval(plan.source(), plan, List.of(new Alias(Source.EMPTY, "additionalAttribute", additionalLiteral)));
7900+
}
7901+
return plan;
7902+
}
7903+
7904+
}
7905+
);
7906+
LogicalPlanOptimizer customRulesLogicalPlanOptimizer = getCustomRulesLogicalPlanOptimizer(List.of(customRuleBatch));
7907+
Exception e = expectThrows(VerificationException.class, () -> customRulesLogicalPlanOptimizer.optimize(plan));
7908+
assertThat(e.getMessage(), containsString("Output has changed from"));
7909+
assertThat(e.getMessage(), containsString("additionalAttribute"));
7910+
}
7911+
7912+
public void testVerifierOnAttributeDatatypeChanged() {
7913+
var plan = optimizedPlan("""
7914+
from test
7915+
| stats a = min(salary) by emp_no
7916+
""");
7917+
7918+
var limit = as(plan, Limit.class);
7919+
var aggregate = as(limit.child(), Aggregate.class);
7920+
var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
7921+
var salary = as(min.field(), NamedExpression.class);
7922+
assertThat(salary.name(), is("salary"));
7923+
7924+
// use a custom rule that changes the datatype of an output attribute
7925+
var customRuleBatch = new RuleExecutor.Batch<>(
7926+
"CustomRuleBatch",
7927+
RuleExecutor.Limiter.ONCE,
7928+
new OptimizerRules.ParameterizedOptimizerRule<LogicalPlan, LogicalOptimizerContext>(DOWN) {
7929+
static Integer appliedCount = 0;
7930+
7931+
@Override
7932+
protected LogicalPlan rule(LogicalPlan plan, LogicalOptimizerContext context) {
7933+
// We only want to apply it once, so we use a static counter
7934+
if (appliedCount == 0) {
7935+
appliedCount++;
7936+
Limit limit = as(plan, Limit.class);
7937+
Limit newLimit = new Limit(plan.source(), limit.limit(), limit.child()) {
7938+
@Override
7939+
public List<Attribute> output() {
7940+
List<Attribute> oldOutput = super.output();
7941+
List<Attribute> newOutput = new ArrayList<>(oldOutput);
7942+
newOutput.set(0, oldOutput.get(0).withDataType(DataType.DATETIME));
7943+
return newOutput;
7944+
}
7945+
};
7946+
return newLimit;
7947+
}
7948+
return plan;
7949+
}
7950+
7951+
}
7952+
);
7953+
LogicalPlanOptimizer customRulesLogicalPlanOptimizer = getCustomRulesLogicalPlanOptimizer(List.of(customRuleBatch));
7954+
Exception e = expectThrows(VerificationException.class, () -> customRulesLogicalPlanOptimizer.optimize(plan));
7955+
assertThat(e.getMessage(), containsString("Output has changed from"));
7956+
}
7957+
78587958
}

0 commit comments

Comments
 (0)