Skip to content

Commit 2be8b67

Browse files
authored
ESQL: Enable physical plan verification (#118114)
This enables physical plan verification. To make that possible, a few changes had to be applied or corrected: * AggregateMapper creates attributes with unique names; * AggregateExec's verification does not need to consider ordinal attribute(s); * LookupJoinExec needs to merge attributes of the same name in its output, with the right child "winning"; * ExchangeExec does no input referencing, since it only outputs all synthetic attributes, "sourced" from remote exchanges; * FieldExtractExec doesn't reference the attributes it "produces".
1 parent afd88cc commit 2be8b67

File tree

14 files changed

+164
-75
lines changed

14 files changed

+164
-75
lines changed

docs/changelog/118114.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 118114
2+
summary: Enable physical plan verification
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Attribute.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ public Attribute(Source source, String name, Nullability nullability, @Nullable
4949
this.nullability = nullability;
5050
}
5151

52-
public static String rawTemporaryName(String inner, String outer, String suffix) {
53-
return SYNTHETIC_ATTRIBUTE_NAME_PREFIX + inner + "$" + outer + "$" + suffix;
52+
public static String rawTemporaryName(String... parts) {
53+
var name = String.join("$", parts);
54+
return name.isEmpty() || name.startsWith(SYNTHETIC_ATTRIBUTE_NAME_PREFIX) ? name : SYNTHETIC_ATTRIBUTE_NAME_PREFIX + name;
5455
}
5556

5657
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ protected List<Batch<PhysicalPlan>> batches() {
5757
}
5858

5959
protected List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
60-
List<Rule<?, PhysicalPlan>> esSourceRules = new ArrayList<>(4);
60+
List<Rule<?, PhysicalPlan>> esSourceRules = new ArrayList<>(6);
6161
esSourceRules.add(new ReplaceSourceAttributes());
6262

6363
if (optimizeForEsSource) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
package org.elasticsearch.xpack.esql.optimizer;
99

1010
import org.elasticsearch.xpack.esql.common.Failure;
11+
import org.elasticsearch.xpack.esql.common.Failures;
1112
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1213
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1314
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
15+
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
1416
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
1517
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1618

@@ -31,10 +33,14 @@ private PhysicalVerifier() {}
3133
/** Verifies the physical plan. */
3234
public Collection<Failure> verify(PhysicalPlan plan) {
3335
Set<Failure> failures = new LinkedHashSet<>();
36+
Failures depFailures = new Failures();
3437

3538
plan.forEachDown(p -> {
36-
// FIXME: re-enable
37-
// DEPENDENCY_CHECK.checkPlan(p, failures);
39+
if (p instanceof AggregateExec agg) {
40+
var exclude = Expressions.references(agg.ordinalAttributes());
41+
DEPENDENCY_CHECK.checkPlan(p, exclude, depFailures);
42+
return;
43+
}
3844
if (p instanceof FieldExtractExec fieldExtractExec) {
3945
Attribute sourceAttribute = fieldExtractExec.sourceAttribute();
4046
if (sourceAttribute == null) {
@@ -48,8 +54,13 @@ public Collection<Failure> verify(PhysicalPlan plan) {
4854
);
4955
}
5056
}
57+
DEPENDENCY_CHECK.checkPlan(p, depFailures);
5158
});
5259

60+
if (depFailures.hasFailures()) {
61+
throw new IllegalStateException(depFailures.toString());
62+
}
63+
5364
return failures;
5465
}
5566
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/PlanConsistencyChecker.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,13 @@ public class PlanConsistencyChecker<P extends QueryPlan<P>> {
2626
* {@link org.elasticsearch.xpack.esql.common.Failure Failure}s to the {@link Failures} object.
2727
*/
2828
public void checkPlan(P p, Failures failures) {
29+
checkPlan(p, AttributeSet.EMPTY, failures);
30+
}
31+
32+
public void checkPlan(P p, AttributeSet exclude, Failures failures) {
2933
AttributeSet refs = p.references();
3034
AttributeSet input = p.inputSet();
31-
AttributeSet missing = refs.subtract(input);
35+
AttributeSet missing = refs.subtract(input).subtract(exclude);
3236
// TODO: for Joins, we should probably check if the required fields from the left child are actually in the left child, not
3337
// just any child (and analogously for the right child).
3438
if (missing.isEmpty() == false) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1212
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
1313
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
14-
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
1514
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
1615
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
1716
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
@@ -22,7 +21,6 @@
2221

2322
import java.util.ArrayList;
2423
import java.util.LinkedHashSet;
25-
import java.util.LinkedList;
2624
import java.util.List;
2725
import java.util.Set;
2826

@@ -54,18 +52,9 @@ public PhysicalPlan apply(PhysicalPlan plan) {
5452
* it loads the field lazily. If we have more than one field we need to
5553
* make sure the fields are loaded for the standard hash aggregator.
5654
*/
57-
if (p instanceof AggregateExec agg && agg.groupings().size() == 1) {
58-
// CATEGORIZE requires the standard hash aggregator as well.
59-
if (agg.groupings().get(0).anyMatch(e -> e instanceof Categorize) == false) {
60-
var leaves = new LinkedList<>();
61-
// TODO: this seems out of place
62-
agg.aggregates()
63-
.stream()
64-
.filter(a -> agg.groupings().contains(a) == false)
65-
.forEach(a -> leaves.addAll(a.collectLeaves()));
66-
var remove = agg.groupings().stream().filter(g -> leaves.contains(g) == false).toList();
67-
missing.removeAll(Expressions.references(remove));
68-
}
55+
if (p instanceof AggregateExec agg) {
56+
var ordinalAttributes = agg.ordinalAttributes();
57+
missing.removeAll(Expressions.references(ordinalAttributes));
6958
}
7059

7160
// add extractor

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/AggregateExec.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1919
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
2020
import org.elasticsearch.xpack.esql.core.tree.Source;
21+
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
2122
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
2223
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
2324

2425
import java.io.IOException;
26+
import java.util.ArrayList;
27+
import java.util.HashSet;
2528
import java.util.List;
2629
import java.util.Objects;
2730

@@ -184,6 +187,24 @@ protected AttributeSet computeReferences() {
184187
return mode.isInputPartial() ? new AttributeSet(intermediateAttributes) : Aggregate.computeReferences(aggregates, groupings);
185188
}
186189

190+
/** Returns the attributes that can be loaded from ordinals -- no explicit extraction is needed */
191+
public List<Attribute> ordinalAttributes() {
192+
List<Attribute> orginalAttributs = new ArrayList<>(groupings.size());
193+
// Ordinals can be leveraged just for a single grouping. If there are multiple groupings, fields need to be loaded for the
194+
// hash aggregator.
195+
// CATEGORIZE requires the standard hash aggregator as well.
196+
if (groupings().size() == 1 && groupings.get(0).anyMatch(e -> e instanceof Categorize) == false) {
197+
var leaves = new HashSet<>();
198+
aggregates.stream().filter(a -> groupings.contains(a) == false).forEach(a -> leaves.addAll(a.collectLeaves()));
199+
groupings.forEach(g -> {
200+
if (leaves.contains(g) == false) {
201+
orginalAttributs.add((Attribute) g);
202+
}
203+
});
204+
}
205+
return orginalAttributs;
206+
}
207+
187208
@Override
188209
public int hashCode() {
189210
return Objects.hash(groupings, aggregates, mode, intermediateAttributes, estimatedRowSize, child());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeExec.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
1313
import org.elasticsearch.xpack.esql.core.expression.Attribute;
14+
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1415
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1516
import org.elasticsearch.xpack.esql.core.tree.Source;
1617
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
@@ -72,6 +73,12 @@ public boolean inBetweenAggs() {
7273
return inBetweenAggs;
7374
}
7475

76+
@Override
77+
protected AttributeSet computeReferences() {
78+
// ExchangeExec does no input referencing: it only outputs all synthetic attributes, "sourced" from remote exchanges.
79+
return AttributeSet.EMPTY;
80+
}
81+
7582
@Override
7683
public UnaryExec replaceChild(PhysicalPlan newChild) {
7784
return new ExchangeExec(source(), output, inBetweenAggs, newChild);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FieldExtractExec.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,7 @@ public static Attribute extractSourceAttributesFrom(PhysicalPlan plan) {
8989

9090
@Override
9191
protected AttributeSet computeReferences() {
92-
AttributeSet required = new AttributeSet(docValuesAttributes);
93-
94-
required.add(sourceAttribute);
95-
required.addAll(attributesToExtract);
96-
97-
return required;
92+
return sourceAttribute != null ? new AttributeSet(sourceAttribute) : AttributeSet.EMPTY;
9893
}
9994

10095
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ public List<Attribute> addedFields() {
9393
public List<Attribute> output() {
9494
if (lazyOutput == null) {
9595
lazyOutput = new ArrayList<>(left().output());
96-
for (Attribute attr : addedFields) {
97-
lazyOutput.add(attr);
98-
}
96+
var addedFieldsNames = addedFields.stream().map(Attribute::name).toList();
97+
lazyOutput.removeIf(a -> addedFieldsNames.contains(a.name()));
98+
lazyOutput.addAll(addedFields);
9999
}
100100
return lazyOutput;
101101
}

0 commit comments

Comments
 (0)