Skip to content

Commit 457aec7

Browse files
authored
Avoid dropping aggregate groupings in local plans (#129370) (#130060)
The local plan optimizer should not change the layout, as it has already been agreed upon. However, CombineProjections can violate this when some grouping elements refer to the same attribute. This occurs when ReplaceFieldWithConstantOrNull replaces missing fields with the same reference for a given data type. Closes #128054 Closes #129811 (cherry picked from commit 2bc6284)
1 parent 87d3b17 commit 457aec7

File tree

6 files changed

+169
-59
lines changed

6 files changed

+169
-59
lines changed

docs/changelog/129370.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 129370
2+
summary: Avoid dropping aggregate groupings in local plans
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 129811
7+
- 128054

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1476,6 +1476,39 @@ public void testQueryOnEmptyDataIndex() {
14761476
}
14771477
}
14781478

1479+
public void testGroupingStatsOnMissingFields() {
1480+
assertAcked(client().admin().indices().prepareCreate("missing_field_index").setMapping("data", "type=long"));
1481+
long oneValue = between(1, 1000);
1482+
indexDoc("missing_field_index", "1", "data", oneValue);
1483+
refresh("missing_field_index");
1484+
QueryPragmas pragmas = randomPragmas();
1485+
pragmas = new QueryPragmas(
1486+
Settings.builder().put(pragmas.getSettings()).put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build()
1487+
);
1488+
EsqlQueryRequest request = new EsqlQueryRequest();
1489+
request.query("FROM missing_field_index,test | STATS s = sum(data) BY color, tag | SORT color");
1490+
request.pragmas(pragmas);
1491+
try (var r = run(request)) {
1492+
var rows = getValuesList(r);
1493+
assertThat(rows, hasSize(4));
1494+
for (List<Object> row : rows) {
1495+
assertThat(row, hasSize(3));
1496+
}
1497+
assertThat(rows.get(0).get(0), equalTo(20L));
1498+
assertThat(rows.get(0).get(1), equalTo("blue"));
1499+
assertNull(rows.get(0).get(2));
1500+
assertThat(rows.get(1).get(0), equalTo(10L));
1501+
assertThat(rows.get(1).get(1), equalTo("green"));
1502+
assertNull(rows.get(1).get(2));
1503+
assertThat(rows.get(2).get(0), equalTo(30L));
1504+
assertThat(rows.get(2).get(1), equalTo("red"));
1505+
assertNull(rows.get(2).get(2));
1506+
assertThat(rows.get(3).get(0), equalTo(oneValue));
1507+
assertNull(rows.get(3).get(1));
1508+
assertNull(rows.get(3).get(2));
1509+
}
1510+
}
1511+
14791512
private void assertEmptyIndexQueries(String from) {
14801513
try (EsqlQueryResponse resp = run(from + "METADATA _source | KEEP _source | LIMIT 1")) {
14811514
assertFalse(resp.values().hasNext());
@@ -1610,6 +1643,8 @@ private void createAndPopulateIndex(String indexName, Settings additionalSetting
16101643
"time",
16111644
"type=long",
16121645
"color",
1646+
"type=keyword",
1647+
"tag",
16131648
"type=keyword"
16141649
)
16151650
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
* This class is part of the planner. Data node level logical optimizations. At this point we have access to
3030
* {@link org.elasticsearch.xpack.esql.stats.SearchStats} which provides access to metadata about the index.
3131
*
32-
* <p>NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()}
32+
* <p>NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators(boolean)}
33+
* and {@link LogicalPlanOptimizer#cleanup()}
3334
*/
3435
public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LocalLogicalOptimizerContext> {
3536

@@ -51,7 +52,7 @@ protected List<Batch<LogicalPlan>> batches() {
5152
var rules = new ArrayList<Batch<LogicalPlan>>();
5253
rules.add(local);
5354
// TODO: if the local rules haven't touched the tree, the rest of the rules can be skipped
54-
rules.addAll(asList(operators(), cleanup()));
55+
rules.addAll(asList(operators(true), cleanup()));
5556
return replaceRules(rules);
5657
}
5758

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,15 @@
7878
* <li>The {@link LogicalPlanOptimizer#substitutions()} phase rewrites things to expand out shorthand in the syntax. For example,
7979
* a nested expression embedded in a stats gets replaced with an eval followed by a stats, followed by another eval. This phase
8080
* also applies surrogates, such as replacing an average with a sum divided by a count.</li>
81-
* <li>{@link LogicalPlanOptimizer#operators()} (NB: The word "operator" is extremely overloaded and referrers to many different
81+
* <li>{@link LogicalPlanOptimizer#operators(boolean)} (NB: The word "operator" is extremely overloaded and referrers to many different
8282
* things.) transform the tree in various different ways. This includes folding (i.e. computing constant expressions at parse
8383
* time), combining expressions, dropping redundant clauses, and some normalization such as putting literals on the right whenever
8484
* possible. These rules are run in a loop until none of the rules make any changes to the plan (there is also a safety shut off
8585
* after many iterations, although hitting that is considered a bug)</li>
8686
* <li>{@link LogicalPlanOptimizer#cleanup()} Which can replace sorts+limit with a TopN</li>
8787
* </ul>
8888
*
89-
* <p>Note that the {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the
89+
* <p>Note that the {@link LogicalPlanOptimizer#operators(boolean)} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the
9090
* {@link LocalLogicalPlanOptimizer} layer.</p>
9191
*/
9292
public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LogicalOptimizerContext> {
@@ -117,7 +117,7 @@ protected static List<Batch<LogicalPlan>> rules() {
117117
var skip = new Batch<>("Skip Compute", new SkipQueryOnLimitZero());
118118
var label = new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized());
119119

120-
return asList(substitutions(), operators(), skip, cleanup(), label);
120+
return asList(substitutions(), operators(false), skip, cleanup(), label);
121121
}
122122

123123
protected static Batch<LogicalPlan> substitutions() {
@@ -150,10 +150,10 @@ protected static Batch<LogicalPlan> substitutions() {
150150
);
151151
}
152152

153-
protected static Batch<LogicalPlan> operators() {
153+
protected static Batch<LogicalPlan> operators(boolean local) {
154154
return new Batch<>(
155155
"Operator Optimization",
156-
new CombineProjections(),
156+
new CombineProjections(local),
157157
new CombineEvals(),
158158
new PruneEmptyPlans(),
159159
new PropagateEmptyRelation(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java

Lines changed: 87 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,24 @@
1717
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1818
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
1919
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
20+
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2021
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2122
import org.elasticsearch.xpack.esql.plan.logical.Project;
2223
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
2324

2425
import java.util.ArrayList;
26+
import java.util.HashSet;
2527
import java.util.LinkedHashSet;
2628
import java.util.List;
29+
import java.util.Set;
2730

2831
public final class CombineProjections extends OptimizerRules.OptimizerRule<UnaryPlan> {
32+
// don't drop groupings from a local plan, as the layout has already been agreed upon
33+
private final boolean local;
2934

30-
public CombineProjections() {
35+
public CombineProjections(boolean local) {
3136
super(OptimizerRules.TransformDirection.UP);
37+
this.local = local;
3238
}
3339

3440
@Override
@@ -59,29 +65,89 @@ protected LogicalPlan rule(UnaryPlan plan) {
5965
return plan;
6066
}
6167

62-
// Agg with underlying Project (group by on sub-queries)
63-
if (plan instanceof Aggregate a) {
64-
if (child instanceof Project p) {
65-
var groupings = a.groupings();
66-
List<NamedExpression> groupingAttrs = new ArrayList<>(a.groupings().size());
67-
for (Expression grouping : groupings) {
68-
if (grouping instanceof Attribute attribute) {
69-
groupingAttrs.add(attribute);
70-
} else if (grouping instanceof Alias as && as.child() instanceof Categorize) {
71-
groupingAttrs.add(as);
68+
if (plan instanceof Aggregate a && child instanceof Project p) {
69+
var groupings = a.groupings();
70+
71+
// sanity checks
72+
for (Expression grouping : groupings) {
73+
if ((grouping instanceof Attribute || grouping instanceof Alias as && as.child() instanceof Categorize) == false) {
74+
// After applying ReplaceAggregateNestedExpressionWithEval,
75+
// evaluatable groupings can only contain attributes.
76+
throw new EsqlIllegalArgumentException("Expected an attribute or grouping function, got {}", grouping);
77+
}
78+
}
79+
assert groupings.size() <= 1
80+
|| groupings.stream().anyMatch(group -> group.anyMatch(expr -> expr instanceof Categorize)) == false
81+
: "CombineProjections only tested with a single CATEGORIZE with no additional groups";
82+
83+
// Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..)
84+
AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder();
85+
for (NamedExpression ne : p.projections()) {
86+
// Record the aliases.
87+
// Projections are just aliases for attributes, so casting is safe.
88+
aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne));
89+
}
90+
var aliases = aliasesBuilder.build();
91+
92+
// Propagate any renames from the lower projection into the upper groupings.
93+
List<Expression> resolvedGroupings = new ArrayList<>();
94+
for (Expression grouping : groupings) {
95+
Expression transformed = grouping.transformUp(Attribute.class, as -> aliases.resolve(as, as));
96+
resolvedGroupings.add(transformed);
97+
}
98+
99+
// This can lead to duplicates in the groupings: e.g.
100+
// | EVAL x = y | STATS ... BY x, y
101+
if (local) {
102+
// On the data node, the groupings must be preserved because they affect the physical output (see
103+
// AbstractPhysicalOperationProviders#intermediateAttributes).
104+
// In case that propagating the lower projection leads to duplicates in the resolved groupings, we'll leave an Eval in place
105+
// of the original projection to create new attributes for the duplicate groups.
106+
Set<Expression> seenResolvedGroupings = new HashSet<>(resolvedGroupings.size());
107+
List<Expression> newGroupings = new ArrayList<>();
108+
List<Alias> aliasesAgainstDuplication = new ArrayList<>();
109+
110+
for (int i = 0; i < groupings.size(); i++) {
111+
Expression resolvedGrouping = resolvedGroupings.get(i);
112+
if (seenResolvedGroupings.add(resolvedGrouping)) {
113+
newGroupings.add(resolvedGrouping);
72114
} else {
73-
// After applying ReplaceAggregateNestedExpressionWithEval,
74-
// groupings (except Categorize) can only contain attributes.
75-
throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping);
115+
// resolving the renames leads to a duplicate here - we need to alias the underlying attribute this refers to.
116+
// should really only be 1 attribute, anyway, but going via .references() includes the case of a
117+
// GroupingFunction.NonEvaluatableGroupingFunction.
118+
Attribute coreAttribute = resolvedGrouping.references().iterator().next();
119+
120+
Alias renameAgainstDuplication = new Alias(
121+
coreAttribute.source(),
122+
TemporaryNameUtils.locallyUniqueTemporaryName(coreAttribute.name(), "temp_name"),
123+
coreAttribute
124+
);
125+
aliasesAgainstDuplication.add(renameAgainstDuplication);
126+
127+
// propagate the new alias into the new grouping
128+
AttributeMap.Builder<Attribute> resolverBuilder = AttributeMap.builder();
129+
resolverBuilder.put(coreAttribute, renameAgainstDuplication.toAttribute());
130+
AttributeMap<Attribute> resolver = resolverBuilder.build();
131+
132+
newGroupings.add(resolvedGrouping.transformUp(Attribute.class, attr -> resolver.resolve(attr, attr)));
76133
}
77134
}
78-
plan = new Aggregate(
79-
a.source(),
80-
p.child(),
81-
a.aggregateType(),
82-
combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()),
83-
combineProjections(a.aggregates(), p.projections())
84-
);
135+
136+
LogicalPlan newChild = aliasesAgainstDuplication.isEmpty()
137+
? p.child()
138+
: new Eval(p.source(), p.child(), aliasesAgainstDuplication);
139+
plan = a.with(newChild, newGroupings, combineProjections(a.aggregates(), p.projections()));
140+
} else {
141+
// On the coordinator, we can just discard the duplicates.
142+
// All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which
143+
// will be an alias like `c = CATEGORIZE(attribute)`.
144+
// Due to such aliases, we can't use an AttributeSet to deduplicate. But we can use a regular set to deduplicate based on
145+
// regular equality (i.e. based on names) instead of name ids.
146+
// TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g.
147+
// for `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. Also
148+
// applies in the local case below.
149+
List<Expression> newGroupings = new ArrayList<>(new LinkedHashSet<>(resolvedGroupings));
150+
plan = a.with(p.child(), newGroupings, combineProjections(a.aggregates(), p.projections()));
85151
}
86152
}
87153

@@ -141,37 +207,6 @@ private static List<NamedExpression> combineProjections(List<? extends NamedExpr
141207
return replaced;
142208
}
143209

144-
private static List<Expression> combineUpperGroupingsAndLowerProjections(
145-
List<? extends NamedExpression> upperGroupings,
146-
List<? extends NamedExpression> lowerProjections
147-
) {
148-
assert upperGroupings.size() <= 1
149-
|| upperGroupings.stream().anyMatch(group -> group.anyMatch(expr -> expr instanceof Categorize)) == false
150-
: "CombineProjections only tested with a single CATEGORIZE with no additional groups";
151-
// Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..)
152-
AttributeMap<Attribute> aliases = new AttributeMap<>();
153-
for (NamedExpression ne : lowerProjections) {
154-
// Record the aliases.
155-
// Projections are just aliases for attributes, so casting is safe.
156-
aliases.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne));
157-
}
158-
159-
// Propagate any renames from the lower projection into the upper groupings.
160-
// This can lead to duplicates: e.g.
161-
// | EVAL x = y | STATS ... BY x, y
162-
// All substitutions happen before; groupings must be attributes at this point except for CATEGORIZE which will be an alias like
163-
// `c = CATEGORIZE(attribute)`.
164-
// Therefore, it is correct to deduplicate based on simple equality (based on names) instead of name ids (Set vs. AttributeSet).
165-
// TODO: The deduplication based on simple equality will be insufficient in case of multiple CATEGORIZEs, e.g. for
166-
// `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead.
167-
LinkedHashSet<NamedExpression> resolvedGroupings = new LinkedHashSet<>();
168-
for (NamedExpression ne : upperGroupings) {
169-
NamedExpression transformed = (NamedExpression) ne.transformUp(Attribute.class, a -> aliases.resolve(a, a));
170-
resolvedGroupings.add(transformed);
171-
}
172-
return new ArrayList<>(resolvedGroupings);
173-
}
174-
175210
/**
176211
* Replace grouping alias previously contained in the aggregations that might have been projected away.
177212
*/

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
2323
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
2424
import org.elasticsearch.xpack.esql.core.expression.Literal;
25+
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
2526
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
2627
import org.elasticsearch.xpack.esql.core.tree.Source;
2728
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -78,6 +79,7 @@
7879
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
7980
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
8081
import static org.hamcrest.Matchers.contains;
82+
import static org.hamcrest.Matchers.equalTo;
8183
import static org.hamcrest.Matchers.hasSize;
8284
import static org.hamcrest.Matchers.is;
8385
import static org.hamcrest.Matchers.not;
@@ -610,6 +612,36 @@ public void testUnionTypesInferNonNullAggConstraint() {
610612
assertEquals("integer_long_field", unionTypeField.fieldName().string());
611613
}
612614

615+
/**
616+
* \_Aggregate[[first_name{r}#7, $$first_name$temp_name$17{r}#18],[SUM(salary{f}#11,true[BOOLEAN]) AS SUM(salary)#5, first_nam
617+
* e{r}#7, first_name{r}#7 AS last_name#10]]
618+
* \_Eval[[null[KEYWORD] AS first_name#7, null[KEYWORD] AS $$first_name$temp_name$17#18]]
619+
* \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
620+
*/
621+
public void testGroupingByMissingFields() {
622+
var plan = plan("FROM test | STATS SUM(salary) BY first_name, last_name");
623+
var testStats = statsForMissingField("first_name", "last_name");
624+
var localPlan = localPlan(plan, testStats);
625+
Limit limit = as(localPlan, Limit.class);
626+
Aggregate aggregate = as(limit.child(), Aggregate.class);
627+
assertThat(aggregate.groupings(), hasSize(2));
628+
ReferenceAttribute grouping1 = as(aggregate.groupings().get(0), ReferenceAttribute.class);
629+
ReferenceAttribute grouping2 = as(aggregate.groupings().get(1), ReferenceAttribute.class);
630+
Eval eval = as(aggregate.child(), Eval.class);
631+
assertThat(eval.fields(), hasSize(2));
632+
Alias eval1 = eval.fields().get(0);
633+
Literal literal1 = as(eval1.child(), Literal.class);
634+
assertNull(literal1.value());
635+
assertThat(literal1.dataType(), is(DataType.KEYWORD));
636+
Alias eval2 = eval.fields().get(1);
637+
Literal literal2 = as(eval2.child(), Literal.class);
638+
assertNull(literal2.value());
639+
assertThat(literal2.dataType(), is(DataType.KEYWORD));
640+
assertThat(grouping1.id(), equalTo(eval1.id()));
641+
assertThat(grouping2.id(), equalTo(eval2.id()));
642+
as(eval.child(), EsRelation.class);
643+
}
644+
613645
private IsNotNull isNotNull(Expression field) {
614646
return new IsNotNull(EMPTY, field);
615647
}

0 commit comments

Comments
 (0)