Skip to content

Commit 2f0010c

Browse files
authored
ESQL: Fix planning of dangling Project with InlineJoin (#132992)
This fixes a planning error leaving a Project on top of an empty LocalRelation marker unpruned. The empty EmptyRelation is added in the plan as a marker, signalling that the right hand-side of the join can be pruned entirely (and thus the entire InlineJoin).
1 parent 8697379 commit 2f0010c

File tree

5 files changed

+375
-50
lines changed

5 files changed

+375
-50
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1376,6 +1376,45 @@ total_duration:long | day:date | days:date
13761376
17016205 |2023-10-23T13:00:00.000Z|[2023-10-23T12:00:00.000Z, 2023-10-23T13:00:00.000Z]
13771377
;
13781378

1379+
evalBeforeDoubleInlinestats1
1380+
required_capability: inlinestats_v10
1381+
1382+
FROM employees
1383+
| EVAL salaryK = salary/1000
1384+
| INLINESTATS count = COUNT(*) BY salaryK
1385+
| INLINESTATS min = MIN(MV_COUNT(languages)) BY salaryK
1386+
| SORT emp_no
1387+
| KEEP emp_no, still_hired, count
1388+
| LIMIT 5
1389+
;
1390+
1391+
emp_no:integer |still_hired:boolean|count:long
1392+
10001 |true |1
1393+
10002 |true |3
1394+
10003 |false |2
1395+
10004 |true |2
1396+
10005 |true |1
1397+
;
1398+
1399+
evalBeforeDoubleInlinestats2
1400+
required_capability: inlinestats_v10
1401+
1402+
FROM employees
1403+
| EVAL jobs = MV_COUNT(job_positions)
1404+
| INLINESTATS count = COUNT(*) BY jobs
1405+
| INLINESTATS min = MIN(MV_COUNT(languages)) BY jobs
1406+
| SORT emp_no
1407+
| KEEP emp_no, jobs, count, min
1408+
| LIMIT 5
1409+
;
1410+
1411+
emp_no:integer |jobs:integer|count:long|min:integer
1412+
10001 |2 |18 |1
1413+
10002 |1 |27 |1
1414+
10003 |null |11 |1
1415+
10004 |4 |26 |1
1416+
10005 |null |11 |1
1417+
;
13791418

13801419
evalBeforeInlinestatsAndKeepAfter1
13811420
required_capability: inlinestats_v10

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateEmptyRelation.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
2121
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2222
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
23-
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
2423
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
2524
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
2625
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -37,7 +36,7 @@ public PropagateEmptyRelation() {
3736
@Override
3837
protected LogicalPlan rule(UnaryPlan plan, LogicalOptimizerContext ctx) {
3938
LogicalPlan p = plan;
40-
if (plan.child() instanceof LocalRelation local && local.supplier() == EmptyLocalSupplier.EMPTY) {
39+
if (plan.child() instanceof LocalRelation local && local.hasEmptySupplier()) {
4140
// only care about non-grouped aggs might return something (count)
4241
if (plan instanceof Aggregate agg && agg.groupings().isEmpty()) {
4342
List<Block> emptyBlocks = aggsFromEmpty(ctx.foldCtx(), agg.aggregates());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1515
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1616
import org.elasticsearch.xpack.esql.core.expression.Expressions;
17-
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
1817
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1918
import org.elasticsearch.xpack.esql.core.util.Holder;
2019
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
@@ -26,7 +25,6 @@
2625
import org.elasticsearch.xpack.esql.plan.logical.Project;
2726
import org.elasticsearch.xpack.esql.plan.logical.Sample;
2827
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
29-
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
3028
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
3129
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
3230
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
@@ -79,7 +77,7 @@ private static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder u
7977
recheck.set(false);
8078
p = switch (p) {
8179
case Aggregate agg -> pruneColumnsInAggregate(agg, used, inlineJoin);
82-
case InlineJoin inj -> pruneColumnsInInlineJoin(inj, used, recheck);
80+
case InlineJoin inj -> pruneColumnsInInlineJoinRight(inj, used, recheck);
8381
case Eval eval -> pruneColumnsInEval(eval, used, recheck);
8482
case Project project -> inlineJoin ? pruneColumnsInProject(project, used) : p;
8583
case EsRelation esr -> pruneColumnsInEsRelation(esr, used);
@@ -123,23 +121,9 @@ private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, Attribut
123121
} else {
124122
// not expecting high groups cardinality, nested loops in lists should be fine, no need for a HashSet
125123
if (inlineJoin && aggregate.groupings().containsAll(remaining)) {
126-
// It's an INLINEJOIN and all remaining attributes are groupings, which are already part of the IJ output (from the
127-
// left-hand side).
128-
// TODO: INLINESTATS: revisit condition when adding support for INLINESTATS filters
129-
if (aggregate.child() instanceof StubRelation stub) {
130-
var message = "Aggregate groups references ["
131-
+ remaining
132-
+ "] not in child's (StubRelation) output: ["
133-
+ stub.outputSet()
134-
+ "]";
135-
assert stub.outputSet().containsAll(Expressions.asAttributes(remaining)) : message;
136-
137-
p = emptyLocalRelation(aggregate);
138-
} else {
139-
// There are no aggregates to compute, just output the groupings; these are already in the IJ output, so only
140-
// restrict the output to what remained.
141-
p = new Project(aggregate.source(), aggregate.child(), remaining);
142-
}
124+
// An INLINEJOIN right-hand side aggregation output had everything pruned, except for (some of the) groupings, which are
125+
// already part of the IJ output (from the left-hand side): the agg can just be dropped entirely.
126+
p = emptyLocalRelation(aggregate);
143127
} else { // not an INLINEJOIN or there are actually aggregates to compute
144128
p = aggregate.with(aggregate.groupings(), remaining);
145129
}
@@ -148,12 +132,12 @@ private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, Attribut
148132
return p;
149133
}
150134

151-
private static LogicalPlan pruneColumnsInInlineJoin(InlineJoin ij, AttributeSet.Builder used, Holder<Boolean> recheck) {
135+
private static LogicalPlan pruneColumnsInInlineJoinRight(InlineJoin ij, AttributeSet.Builder used, Holder<Boolean> recheck) {
152136
LogicalPlan p = ij;
153137

154138
used.addAll(ij.references());
155139
var right = pruneColumns(ij.right(), used, true);
156-
if (right.output().isEmpty()) {
140+
if (right.output().isEmpty() || isLocalEmptyRelation(right)) {
157141
p = ij.left();
158142
recheck.set(true);
159143
} else if (right != ij.right()) {
@@ -181,18 +165,13 @@ private static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder us
181165
return p;
182166
}
183167

168+
// Note: only run when the Project is a descendent of an InlineJoin.
184169
private static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.Builder used) {
185170
LogicalPlan p = project;
186171

187172
var remaining = pruneUnusedAndAddReferences(project.projections(), used);
188173
if (remaining != null) {
189-
p = remaining.isEmpty() || remaining.stream().allMatch(FieldAttribute.class::isInstance)
190-
? emptyLocalRelation(project)
191-
: new Project(project.source(), project.child(), remaining);
192-
} else if (project.output().stream().allMatch(FieldAttribute.class::isInstance)) {
193-
// Use empty relation as a marker for a subsequent pass, in case the project is only outputting field attributes (which are
194-
// already part of the INLINEJOIN left-hand side output).
195-
p = emptyLocalRelation(project);
174+
p = remaining.isEmpty() ? emptyLocalRelation(project) : new Project(project.source(), project.child(), remaining);
196175
}
197176

198177
return p;
@@ -216,7 +195,11 @@ private static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet
216195

217196
private static LogicalPlan emptyLocalRelation(LogicalPlan plan) {
218197
// create an empty local relation with no attributes
219-
return new LocalRelation(plan.source(), List.of(), EmptyLocalSupplier.EMPTY);
198+
return new LocalRelation(plan.source(), plan.output(), EmptyLocalSupplier.EMPTY);
199+
}
200+
201+
private static boolean isLocalEmptyRelation(LogicalPlan plan) {
202+
return plan instanceof LocalRelation local && local.hasEmptySupplier();
220203
}
221204

222205
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalRelation.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void writeTo(StreamOutput out) throws IOException {
5555
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
5656
out.writeNamedWriteable(supplier);
5757
} else {
58-
if (supplier == EmptyLocalSupplier.EMPTY) {
58+
if (hasEmptySupplier()) {
5959
out.writeVInt(0);
6060
} else {// here we can only have an ImmediateLocalSupplier as this was the only implementation apart from EMPTY
6161
((ImmediateLocalSupplier) supplier).writeTo(out);
@@ -77,6 +77,10 @@ public LocalSupplier supplier() {
7777
return supplier;
7878
}
7979

80+
public boolean hasEmptySupplier() {
81+
return supplier == EmptyLocalSupplier.EMPTY;
82+
}
83+
8084
@Override
8185
public boolean expressionsResolved() {
8286
return true;

0 commit comments

Comments
 (0)