Skip to content

Commit 998e0e0

Browse files
alex-spiesMax Hniebergall
authored andcommitted
ESQL: Dependency check for binary plans (#118326) (#118654)
Make the dependency checker for query plans take into account binary plans and make sure that fields required from the left hand side are actually obtained from there (and analogously for the right).
1 parent bfdc7af commit 998e0e0

File tree

13 files changed

+195
-28
lines changed

13 files changed

+195
-28
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/EsIndex.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ public class EsIndex implements Writeable {
2525
private final Map<String, EsField> mapping;
2626
private final Map<String, IndexMode> indexNameWithModes;
2727

28+
/**
29+
* Intended for tests. Returns an index with an empty index mode map.
30+
*/
2831
public EsIndex(String name, Map<String, EsField> mapping) {
2932
this(name, mapping, Map.of());
3033
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
public final class LogicalVerifier {
1616

17-
private static final PlanConsistencyChecker<LogicalPlan> DEPENDENCY_CHECK = new PlanConsistencyChecker<>();
1817
public static final LogicalVerifier INSTANCE = new LogicalVerifier();
1918

2019
private LogicalVerifier() {}
@@ -25,7 +24,7 @@ public Failures verify(LogicalPlan plan) {
2524
Failures dependencyFailures = new Failures();
2625

2726
plan.forEachUp(p -> {
28-
DEPENDENCY_CHECK.checkPlan(p, dependencyFailures);
27+
PlanConsistencyChecker.checkPlan(p, dependencyFailures);
2928

3029
if (failures.hasFailures() == false) {
3130
p.forEachExpression(ex -> {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1414
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
1515
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
16-
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
1716
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
1817
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
1918
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@@ -28,7 +27,6 @@
2827
public final class PhysicalVerifier {
2928

3029
public static final PhysicalVerifier INSTANCE = new PhysicalVerifier();
31-
private static final PlanConsistencyChecker<PhysicalPlan> DEPENDENCY_CHECK = new PlanConsistencyChecker<>();
3230

3331
private PhysicalVerifier() {}
3432

@@ -44,11 +42,6 @@ public Collection<Failure> verify(PhysicalPlan plan) {
4442
}
4543

4644
plan.forEachDown(p -> {
47-
if (p instanceof AggregateExec agg) {
48-
var exclude = Expressions.references(agg.ordinalAttributes());
49-
DEPENDENCY_CHECK.checkPlan(p, exclude, depFailures);
50-
return;
51-
}
5245
if (p instanceof FieldExtractExec fieldExtractExec) {
5346
Attribute sourceAttribute = fieldExtractExec.sourceAttribute();
5447
if (sourceAttribute == null) {
@@ -62,7 +55,7 @@ public Collection<Failure> verify(PhysicalPlan plan) {
6255
);
6356
}
6457
}
65-
DEPENDENCY_CHECK.checkPlan(p, depFailures);
58+
PlanConsistencyChecker.checkPlan(p, depFailures);
6659
});
6760

6861
if (depFailures.hasFailures()) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/PlanConsistencyChecker.java

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,42 @@
1212
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1313
import org.elasticsearch.xpack.esql.core.expression.NameId;
1414
import org.elasticsearch.xpack.esql.plan.QueryPlan;
15+
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
16+
import org.elasticsearch.xpack.esql.plan.physical.BinaryExec;
1517

1618
import java.util.HashSet;
1719
import java.util.Set;
1820

1921
import static org.elasticsearch.xpack.esql.common.Failure.fail;
2022

21-
public class PlanConsistencyChecker<P extends QueryPlan<P>> {
23+
public class PlanConsistencyChecker {
2224

2325
/**
2426
* Check whether a single {@link QueryPlan} produces no duplicate attributes and its children provide all of its required
2527
* {@link QueryPlan#references() references}. Otherwise, add
2628
* {@link org.elasticsearch.xpack.esql.common.Failure Failure}s to the {@link Failures} object.
2729
*/
28-
public void checkPlan(P p, Failures failures) {
29-
checkPlan(p, AttributeSet.EMPTY, failures);
30-
}
31-
32-
public void checkPlan(P p, AttributeSet exclude, Failures failures) {
33-
AttributeSet refs = p.references();
34-
AttributeSet input = p.inputSet();
35-
AttributeSet missing = refs.subtract(input).subtract(exclude);
36-
// TODO: for Joins, we should probably check if the required fields from the left child are actually in the left child, not
37-
// just any child (and analogously for the right child).
38-
if (missing.isEmpty() == false) {
39-
failures.add(fail(p, "Plan [{}] optimized incorrectly due to missing references {}", p.nodeString(), missing));
30+
public static void checkPlan(QueryPlan<?> p, Failures failures) {
31+
if (p instanceof BinaryPlan binaryPlan) {
32+
checkMissingBinary(
33+
p,
34+
binaryPlan.leftReferences(),
35+
binaryPlan.left().outputSet(),
36+
binaryPlan.rightReferences(),
37+
binaryPlan.right().outputSet(),
38+
failures
39+
);
40+
} else if (p instanceof BinaryExec binaryExec) {
41+
checkMissingBinary(
42+
p,
43+
binaryExec.leftReferences(),
44+
binaryExec.left().outputSet(),
45+
binaryExec.rightReferences(),
46+
binaryExec.right().outputSet(),
47+
failures
48+
);
49+
} else {
50+
checkMissing(p, p.references(), p.inputSet(), "missing references", failures);
4051
}
4152

4253
Set<String> outputAttributeNames = new HashSet<>();
@@ -49,4 +60,29 @@ public void checkPlan(P p, AttributeSet exclude, Failures failures) {
4960
}
5061
}
5162
}
63+
64+
private static void checkMissingBinary(
65+
QueryPlan<?> plan,
66+
AttributeSet leftReferences,
67+
AttributeSet leftInput,
68+
AttributeSet rightReferences,
69+
AttributeSet rightInput,
70+
Failures failures
71+
) {
72+
checkMissing(plan, leftReferences, leftInput, "missing references from left hand side", failures);
73+
checkMissing(plan, rightReferences, rightInput, "missing references from right hand side", failures);
74+
}
75+
76+
private static void checkMissing(
77+
QueryPlan<?> plan,
78+
AttributeSet references,
79+
AttributeSet input,
80+
String detailErrorMessage,
81+
Failures failures
82+
) {
83+
AttributeSet missing = references.subtract(input);
84+
if (missing.isEmpty() == false) {
85+
failures.add(fail(plan, "Plan [{}] optimized incorrectly due to {} {}", plan.nodeString(), detailErrorMessage, missing));
86+
}
87+
}
5288
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/BinaryPlan.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.esql.plan.logical;
88

9+
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
910
import org.elasticsearch.xpack.esql.core.tree.Source;
1011

1112
import java.util.Arrays;
@@ -30,6 +31,10 @@ public LogicalPlan right() {
3031
return right;
3132
}
3233

34+
public abstract AttributeSet leftReferences();
35+
36+
public abstract AttributeSet rightReferences();
37+
3338
@Override
3439
public final BinaryPlan replaceChildren(List<LogicalPlan> newChildren) {
3540
return replaceChildren(newChildren.get(0), newChildren.get(1));

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.io.stream.StreamOutput;
1313
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1414
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
15+
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1516
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
1617
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1718
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -97,6 +98,16 @@ public List<Attribute> output() {
9798
return lazyOutput;
9899
}
99100

101+
@Override
102+
public AttributeSet leftReferences() {
103+
return Expressions.references(config().leftFields());
104+
}
105+
106+
@Override
107+
public AttributeSet rightReferences() {
108+
return Expressions.references(config().rightFields());
109+
}
110+
100111
public List<Attribute> rightOutputFields() {
101112
AttributeSet leftInputs = left().outputSet();
102113

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/AggregateExec.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,9 @@ public List<Attribute> output() {
184184

185185
@Override
186186
protected AttributeSet computeReferences() {
187-
return mode.isInputPartial() ? new AttributeSet(intermediateAttributes) : Aggregate.computeReferences(aggregates, groupings);
187+
return mode.isInputPartial()
188+
? new AttributeSet(intermediateAttributes)
189+
: Aggregate.computeReferences(aggregates, groupings).subtract(new AttributeSet(ordinalAttributes()));
188190
}
189191

190192
/** Returns the attributes that can be loaded from ordinals -- no explicit extraction is needed */

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/BinaryExec.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.plan.physical;
99

1010
import org.elasticsearch.common.io.stream.StreamOutput;
11+
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1112
import org.elasticsearch.xpack.esql.core.tree.Source;
1213

1314
import java.io.IOException;
@@ -40,6 +41,10 @@ public PhysicalPlan right() {
4041
return right;
4142
}
4243

44+
public abstract AttributeSet leftReferences();
45+
46+
public abstract AttributeSet rightReferences();
47+
4348
@Override
4449
public void writeTo(StreamOutput out) throws IOException {
4550
Source.EMPTY.writeTo(out);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,16 @@ protected AttributeSet computeReferences() {
119119
return Expressions.references(leftFields);
120120
}
121121

122+
@Override
123+
public AttributeSet leftReferences() {
124+
return Expressions.references(leftFields);
125+
}
126+
127+
@Override
128+
public AttributeSet rightReferences() {
129+
return Expressions.references(rightFields);
130+
}
131+
122132
@Override
123133
public HashJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) {
124134
return new HashJoinExec(source(), left, right, matchFields, leftFields, rightFields, output);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,21 @@ protected AttributeSet computeReferences() {
119119
return Expressions.references(leftFields);
120120
}
121121

122+
@Override
123+
public AttributeSet leftReferences() {
124+
return Expressions.references(leftFields);
125+
}
126+
127+
@Override
128+
public AttributeSet rightReferences() {
129+
// TODO: currently it's hard coded that we add all fields from the lookup index. But the output we "officially" get from the right
130+
// hand side is inconsistent:
131+
// - After logical optimization, there's a FragmentExec with an EsRelation on the right hand side with all the fields.
132+
// - After local physical optimization, there's just an EsQueryExec here, with no fields other than _doc mentioned and we don't
133+
// insert field extractions in the plan, either.
134+
return AttributeSet.EMPTY;
135+
}
136+
122137
@Override
123138
public LookupJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) {
124139
return new LookupJoinExec(source(), left, right, leftFields, rightFields, addedFields);

0 commit comments

Comments
 (0)