Skip to content

Commit d7dc024

Browse files
committed
Refactor to reuse code
1 parent 95037e9 commit d7dc024

File tree

4 files changed

+171
-81
lines changed

4 files changed

+171
-81
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateEvalFoldables.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,37 +26,41 @@ public final class PropagateEvalFoldables extends ParameterizedRule<LogicalPlan,
2626

2727
@Override
2828
public LogicalPlan apply(LogicalPlan plan, LogicalOptimizerContext ctx) {
29-
AttributeMap.Builder<Expression> collectRefsBuilder = AttributeMap.builder();
29+
AttributeMap<Expression> collectRefs = foldableReferences(plan, ctx);
30+
if (collectRefs.isEmpty()) {
31+
return plan;
32+
}
33+
34+
plan = plan.transformUp(p -> {
35+
// Apply the replacement inside Filter and Eval (which shouldn't make a difference)
36+
// TODO: also allow aggregates once aggs on constants are supported.
37+
// C.f. https://github.com/elastic/elasticsearch/issues/100634
38+
if (p instanceof Filter || p instanceof Eval) {
39+
p = p.transformExpressionsOnly(ReferenceAttribute.class, r -> collectRefs.resolve(r, r));
40+
}
41+
return p;
42+
});
43+
44+
return plan;
45+
}
3046

31-
java.util.function.Function<ReferenceAttribute, Expression> replaceReference = r -> collectRefsBuilder.build().resolve(r, r);
47+
public static AttributeMap<Expression> foldableReferences(LogicalPlan plan, LogicalOptimizerContext ctx) {
48+
AttributeMap.Builder<Expression> collectRefsBuilder = AttributeMap.builder();
3249

3350
// collect aliases bottom-up
3451
plan.forEachExpressionUp(Alias.class, a -> {
3552
var c = a.child();
3653
boolean shouldCollect = c.foldable();
3754
// try to resolve the expression based on an existing foldables
3855
if (shouldCollect == false) {
39-
c = c.transformUp(ReferenceAttribute.class, replaceReference);
56+
c = c.transformUp(ReferenceAttribute.class, r -> collectRefsBuilder.build().resolve(r, r));
4057
shouldCollect = c.foldable();
4158
}
4259
if (shouldCollect) {
4360
collectRefsBuilder.put(a.toAttribute(), Literal.of(ctx.foldCtx(), c));
4461
}
4562
});
46-
if (collectRefsBuilder.isEmpty()) {
47-
return plan;
48-
}
49-
50-
plan = plan.transformUp(p -> {
51-
// Apply the replacement inside Filter and Eval (which shouldn't make a difference)
52-
// TODO: also allow aggregates once aggs on constants are supported.
53-
// C.f. https://github.com/elastic/elasticsearch/issues/100634
54-
if (p instanceof Filter || p instanceof Eval) {
55-
p = p.transformExpressionsOnly(ReferenceAttribute.class, replaceReference);
56-
}
57-
return p;
58-
});
5963

60-
return plan;
64+
return collectRefsBuilder.build();
6165
}
6266
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PruneLeftJoinOnNullMatchingField.java

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,54 +7,41 @@
77

88
package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;
99

10-
import org.elasticsearch.xpack.esql.core.expression.Alias;
1110
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
1211
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1312
import org.elasticsearch.xpack.esql.core.expression.Expression;
1413
import org.elasticsearch.xpack.esql.core.expression.Expressions;
15-
import org.elasticsearch.xpack.esql.core.expression.Literal;
14+
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
1615
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
16+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEvalFoldables;
1717
import org.elasticsearch.xpack.esql.plan.logical.Eval;
1818
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1919
import org.elasticsearch.xpack.esql.plan.logical.Project;
20-
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
2120
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
2221

23-
import java.util.ArrayList;
24-
import java.util.List;
25-
2622
import static org.elasticsearch.xpack.esql.core.expression.Expressions.isGuaranteedNull;
2723
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
2824

2925
/**
30-
* The rule matches a plan pattern having a Join on top of a Project and/or Eval. It then checks if the join's performed on a field which
31-
* is aliased to null (in type or value); if that's the case, it prunes the join, replacing it with an Eval - returning aliases to null
32-
* for all the fields added in by the right side of the Join - plus a Project on top of it.
33-
* The rule can apply on the coordinator already, but it's more likely to be effective on the data nodes, where null aliasing is inserted
34-
* due to locally missing fields. This rule relies on that behavior -- see {@link ReplaceMissingFieldWithNull}.
26+
* The rule checks if the join's performed on a field which is aliased to null (in type or value); if that's the case, it prunes the join,
27+
* replacing it with an Eval - returning aliases to null for all the fields added in by the right side of the Join - plus a Project on top
28+
* of it. The rule can apply on the coordinator already, but it's more likely to be effective on the data nodes, where null aliasing is
29+
* inserted due to locally missing fields. This rule relies on that behavior -- see {@link ReplaceMissingFieldWithNull}.
3530
*/
36-
public class PruneLeftJoinOnNullMatchingField extends OptimizerRules.OptimizerRule<Join> {
31+
public class PruneLeftJoinOnNullMatchingField extends OptimizerRules.ParameterizedOptimizerRule<Join, LogicalOptimizerContext> {
32+
33+
public PruneLeftJoinOnNullMatchingField() {
34+
super(OptimizerRules.TransformDirection.DOWN);
35+
}
3736

3837
@Override
39-
protected LogicalPlan rule(Join join) {
38+
protected LogicalPlan rule(Join join, LogicalOptimizerContext ctx) {
4039
LogicalPlan plan = join;
4140
if (join.config().type() == LEFT) { // other types will have different replacement logic
42-
AttributeMap.Builder<Expression> attributeMapBuilder = AttributeMap.builder();
43-
loop: for (var child = join.left();; child = ((UnaryPlan) child).child()) { // cast is safe as both plans are UnaryPlans
44-
switch (child) {
45-
case Project project -> project.projections().forEach(projection -> {
46-
if (projection instanceof Alias alias) {
47-
attributeMapBuilder.put(alias.toAttribute(), alias.child());
48-
}
49-
});
50-
case Eval eval -> eval.fields().forEach(alias -> attributeMapBuilder.put(alias.toAttribute(), alias.child()));
51-
default -> {
52-
break loop;
53-
}
54-
}
55-
}
41+
AttributeMap<Expression> attributeMap = PropagateEvalFoldables.foldableReferences(join, ctx);
42+
5643
for (var attr : AttributeSet.of(join.config().matchFields())) {
57-
var resolved = attributeMapBuilder.build().resolve(attr);
44+
var resolved = attributeMap.resolve(attr);
5845
if (resolved != null && isGuaranteedNull(resolved)) {
5946
plan = replaceJoin(join);
6047
break;
@@ -70,10 +57,8 @@ private static LogicalPlan replaceJoin(Join join) {
7057
if (joinRightOutput.isEmpty()) {
7158
return join.left();
7259
}
73-
List<Alias> aliases = new ArrayList<>(joinRightOutput.size());
74-
// TODO: cache aliases by type, à la ReplaceMissingFieldWithNull#missingToNull (tho lookup indices won't have Ks of fields)
75-
joinRightOutput.forEach(a -> aliases.add(new Alias(a.source(), a.name(), Literal.of(a, null), a.id())));
76-
var eval = new Eval(join.source(), join.left(), aliases);
77-
return new Project(join.source(), eval, join.computeOutput(join.left().output(), Expressions.asAttributes(aliases)));
60+
var aliasedNulls = ReplaceMissingFieldWithNull.aliasedNulls(joinRightOutput, a -> true);
61+
var eval = new Eval(join.source(), join.left(), aliasedNulls.v1());
62+
return new Project(join.source(), eval, join.computeOutput(join.left().output(), Expressions.asAttributes(aliasedNulls.v2())));
7863
}
7964
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;
99

1010
import org.elasticsearch.common.util.Maps;
11+
import org.elasticsearch.core.Tuple;
1112
import org.elasticsearch.index.IndexMode;
1213
import org.elasticsearch.xpack.esql.core.expression.Alias;
1314
import org.elasticsearch.xpack.esql.core.expression.Attribute;
@@ -29,6 +30,7 @@
2930
import org.elasticsearch.xpack.esql.rule.ParameterizedRule;
3031

3132
import java.util.ArrayList;
33+
import java.util.Collection;
3234
import java.util.List;
3335
import java.util.Map;
3436
import java.util.function.Predicate;
@@ -73,42 +75,15 @@ private LogicalPlan missingToNull(LogicalPlan plan, Predicate<FieldAttribute> sh
7375
// \_Eval[field1 = null, field3 = null]
7476
// \_EsRelation[field1, field2, field3]
7577
List<Attribute> relationOutput = relation.output();
76-
Map<DataType, Alias> nullLiterals = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
77-
List<NamedExpression> newProjections = new ArrayList<>(relationOutput.size());
78-
for (int i = 0, size = relationOutput.size(); i < size; i++) {
79-
Attribute attr = relationOutput.get(i);
80-
NamedExpression projection;
81-
if (attr instanceof FieldAttribute f && shouldBeRetained.test(f) == false) {
82-
DataType dt = f.dataType();
83-
Alias nullAlias = nullLiterals.get(dt);
84-
// save the first field as null (per datatype)
85-
if (nullAlias == null) {
86-
// Keep the same id so downstream query plans don't need updating
87-
// NOTE: THIS IS BRITTLE AND CAN LEAD TO BUGS.
88-
// In case some optimizer rule or so inserts a plan node that requires the field BEFORE the Eval that we're adding
89-
// on top of the EsRelation, this can trigger a field extraction in the physical optimizer phase, causing wrong
90-
// layouts due to a duplicate name id.
91-
// If someone reaches here AGAIN when debugging e.g. ClassCastExceptions NPEs from wrong layouts, we should probably
92-
// give up on this approach and instead insert EvalExecs in InsertFieldExtraction.
93-
Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), f.id());
94-
nullLiterals.put(dt, alias);
95-
projection = alias.toAttribute();
96-
}
97-
// otherwise point to it since this avoids creating field copies
98-
else {
99-
projection = new Alias(f.source(), f.name(), nullAlias.toAttribute(), f.id());
100-
}
101-
} else {
102-
projection = attr;
103-
}
104-
newProjections.add(projection);
105-
}
78+
var aliasedNulls = aliasedNulls(relationOutput, attr -> attr instanceof FieldAttribute f && shouldBeRetained.test(f) == false);
79+
var nullLiterals = aliasedNulls.v1();
80+
var newProjections = aliasedNulls.v2();
10681

10782
if (nullLiterals.size() == 0) {
10883
return plan;
10984
}
11085

111-
Eval eval = new Eval(plan.source(), relation, new ArrayList<>(nullLiterals.values()));
86+
Eval eval = new Eval(plan.source(), relation, nullLiterals);
11287
// This projection is redundant if there's another projection downstream (and no commands depend on the order until we hit it).
11388
return new Project(plan.source(), eval, newProjections);
11489
}
@@ -123,4 +98,41 @@ private LogicalPlan missingToNull(LogicalPlan plan, Predicate<FieldAttribute> sh
12398

12499
return plan;
125100
}
101+
102+
public static Tuple<List<Alias>, List<NamedExpression>> aliasedNulls(
103+
List<Attribute> outputAttributes,
104+
Predicate<Attribute> shouldBeReplaced)
105+
{
106+
Map<DataType, Alias> nullLiterals = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
107+
List<NamedExpression> newProjections = new ArrayList<>(outputAttributes.size());
108+
for (Attribute attr : outputAttributes) {
109+
NamedExpression projection;
110+
if (shouldBeReplaced.test(attr)) {
111+
DataType dt = attr.dataType();
112+
Alias nullAlias = nullLiterals.get(dt);
113+
// save the first field as null (per datatype)
114+
if (nullAlias == null) {
115+
// Keep the same id so downstream query plans don't need updating
116+
// NOTE: THIS IS BRITTLE AND CAN LEAD TO BUGS.
117+
// In case some optimizer rule or so inserts a plan node that requires the field BEFORE the Eval that we're adding
118+
// on top of the EsRelation, this can trigger a field extraction in the physical optimizer phase, causing wrong
119+
// layouts due to a duplicate name id.
120+
// If someone reaches here AGAIN when debugging e.g. ClassCastExceptions NPEs from wrong layouts, we should probably
121+
// give up on this approach and instead insert EvalExecs in InsertFieldExtraction.
122+
Alias alias = new Alias(attr.source(), attr.name(), Literal.of(attr, null), attr.id());
123+
nullLiterals.put(dt, alias);
124+
projection = alias.toAttribute();
125+
}
126+
// otherwise point to it since this avoids creating field copies
127+
else {
128+
projection = new Alias(attr.source(), attr.name(), nullAlias.toAttribute(), attr.id());
129+
}
130+
} else {
131+
projection = attr;
132+
}
133+
newProjections.add(projection);
134+
}
135+
136+
return new Tuple<>(new ArrayList<>(nullLiterals.values()), newProjections);
137+
}
126138
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,11 @@
6969
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
7070
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
7171
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
72+
import org.elasticsearch.xpack.esql.plan.physical.GrokExec;
7273
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
7374
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
7475
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
76+
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
7577
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
7678
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
7779
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
@@ -1421,6 +1423,93 @@ public void testMissingFieldsPurgesTheJoinLocally() {
14211423
var source = as(eval.child(), EsQueryExec.class);
14221424
}
14231425

1426+
/*
1427+
* LimitExec[1000[INTEGER]]
1428+
* \_AggregateExec[[language_code{r}#7],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#7],FINAL,[language_code{r}#7, $
1429+
* $c$count{r}#32, $$c$seen{r}#33],12]
1430+
* \_ExchangeExec[[language_code{r}#7, $$c$count{r}#32, $$c$seen{r}#33],true]
1431+
* \_AggregateExec[[language_code{r}#7],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#7],INITIAL,[language_code{r}#7,
1432+
* $$c$count{r}#34, $$c$seen{r}#35],12]
1433+
* \_GrokExec[first_name{f}#19,Parser[pattern=%{WORD:foo}, grok=org.elasticsearch.grok.Grok@75389ac1],[foo{r}#12]]
1434+
* \_MvExpandExec[emp_no{f}#18,emp_no{r}#31]
1435+
* \_ProjectExec[[emp_no{f}#18, languages{r}#21 AS language_code#7, first_name{f}#19]]
1436+
* \_FieldExtractExec[emp_no{f}#18, first_name{f}#19]<[],[]>
1437+
* \_EvalExec[[null[INTEGER] AS languages#21]]
1438+
* \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#36], limit[], sort[] estimatedRowSize[112]
1439+
*/
1440+
public void testMissingFieldsPurgesTheJoinLocallyThroughCommands() {
1441+
var stats = EsqlTestUtils.statsForMissingField("languages");
1442+
1443+
var plan = plannerOptimizer.plan("""
1444+
from test
1445+
| keep emp_no, languages, first_name
1446+
| rename languages AS language_code
1447+
| mv_expand emp_no
1448+
| grok first_name "%{WORD:foo}"
1449+
| lookup join languages_lookup ON language_code
1450+
| stats c = count(emp_no) by language_code
1451+
""", stats);
1452+
1453+
var limit = as(plan, LimitExec.class);
1454+
var agg = as(limit.child(), AggregateExec.class);
1455+
assertThat(Expressions.names(agg.output()), contains("c", "language_code"));
1456+
1457+
var exchange = as(agg.child(), ExchangeExec.class);
1458+
agg = as(exchange.child(), AggregateExec.class);
1459+
var grok = as(agg.child(), GrokExec.class);
1460+
var mvexpand = as(grok.child(), MvExpandExec.class);
1461+
var project = as(mvexpand.child(), ProjectExec.class);
1462+
var extract = as(project.child(), FieldExtractExec.class);
1463+
var eval = as(extract.child(), EvalExec.class);
1464+
var source = as(eval.child(), EsQueryExec.class);
1465+
}
1466+
1467+
/*
1468+
* LimitExec[1000[INTEGER]]
1469+
* \_AggregateExec[[language_code{r}#12],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#12],FINAL,[language_code{r}#12
1470+
* , $$c$count{r}#32, $$c$seen{r}#33],12]
1471+
* \_ExchangeExec[[language_code{r}#12, $$c$count{r}#32, $$c$seen{r}#33],true]
1472+
* \_AggregateExec[[language_code{r}#12],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#12],INITIAL,[language_code{r}#
1473+
* 12, $$c$count{r}#34, $$c$seen{r}#35],12]
1474+
* \_LookupJoinExec[[language_code{r}#12],[language_code{f}#29],[]]
1475+
* |_GrokExec[first_name{f}#19,Parser[pattern=%{NUMBER:language_code:int}, grok=org.elasticsearch.grok.Grok@764e5109],[languag
1476+
* e_code{r}#12]]
1477+
* | \_MvExpandExec[emp_no{f}#18,emp_no{r}#31]
1478+
* | \_ProjectExec[[emp_no{f}#18, languages{r}#21 AS language_code#7, first_name{f}#19]]
1479+
* | \_FieldExtractExec[emp_no{f}#18, first_name{f}#19]<[],[]>
1480+
* | \_EvalExec[[null[INTEGER] AS languages#21]]
1481+
* | \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#36], limit[], sort[] estimatedRowSize[66]
1482+
* \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#37], limit[], sort[] estimatedRowSize[4]
1483+
*/
1484+
public void testMissingFieldsNotPurgingTheJoinLocally() {
1485+
var stats = EsqlTestUtils.statsForMissingField("languages");
1486+
1487+
var plan = plannerOptimizer.plan("""
1488+
from test
1489+
| keep emp_no, languages, first_name
1490+
| rename languages AS language_code
1491+
| mv_expand emp_no
1492+
| grok first_name "%{NUMBER:language_code:int}" // this reassigns language_code
1493+
| lookup join languages_lookup ON language_code
1494+
| stats c = count(emp_no) by language_code
1495+
""", stats);
1496+
1497+
var limit = as(plan, LimitExec.class);
1498+
var agg = as(limit.child(), AggregateExec.class);
1499+
assertThat(Expressions.names(agg.output()), contains("c", "language_code"));
1500+
1501+
var exchange = as(agg.child(), ExchangeExec.class);
1502+
agg = as(exchange.child(), AggregateExec.class);
1503+
var join = as(agg.child(), LookupJoinExec.class);
1504+
var grok = as(join.left(), GrokExec.class);
1505+
var mvexpand = as(grok.child(), MvExpandExec.class);
1506+
var project = as(mvexpand.child(), ProjectExec.class);
1507+
var extract = as(project.child(), FieldExtractExec.class);
1508+
var eval = as(extract.child(), EvalExec.class);
1509+
var source = as(eval.child(), EsQueryExec.class);
1510+
var right = as(join.right(), EsQueryExec.class);
1511+
}
1512+
14241513
/*
14251514
* LimitExec[1000[INTEGER]]
14261515
* \_LookupJoinExec[[language_code{r}#6],[language_code{f}#23],[language_name{f}#24]]

0 commit comments

Comments
 (0)