Skip to content

Commit b2e79a8

Browse files
authored
Avoid dropping aggregate groupings in local plans (#129370) (#130055)
The local plan optimizer should not change the layout, as it has already been agreed upon. However, CombineProjections can violate this when some grouping elements refer to the same attribute. This occurs when ReplaceFieldWithConstantOrNull replaces missing fields with the same reference for a given data type. Closes #128054 Closes #129811 (cherry picked from commit 2bc6284)
1 parent 5ebf9da commit b2e79a8

File tree

6 files changed

+170
-62
lines changed

6 files changed

+170
-62
lines changed

docs/changelog/129370.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 129370
2+
summary: Avoid dropping aggregate groupings in local plans
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 129811
7+
- 128054

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1672,6 +1672,39 @@ public void testQueryOnEmptyDataIndex() {
16721672
}
16731673
}
16741674

1675+
public void testGroupingStatsOnMissingFields() {
1676+
assertAcked(client().admin().indices().prepareCreate("missing_field_index").setMapping("data", "type=long"));
1677+
long oneValue = between(1, 1000);
1678+
indexDoc("missing_field_index", "1", "data", oneValue);
1679+
refresh("missing_field_index");
1680+
QueryPragmas pragmas = randomPragmas();
1681+
pragmas = new QueryPragmas(
1682+
Settings.builder().put(pragmas.getSettings()).put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build()
1683+
);
1684+
EsqlQueryRequest request = new EsqlQueryRequest();
1685+
request.query("FROM missing_field_index,test | STATS s = sum(data) BY color, tag | SORT color");
1686+
request.pragmas(pragmas);
1687+
try (var r = run(request)) {
1688+
var rows = getValuesList(r);
1689+
assertThat(rows, hasSize(4));
1690+
for (List<Object> row : rows) {
1691+
assertThat(row, hasSize(3));
1692+
}
1693+
assertThat(rows.get(0).get(0), equalTo(20L));
1694+
assertThat(rows.get(0).get(1), equalTo("blue"));
1695+
assertNull(rows.get(0).get(2));
1696+
assertThat(rows.get(1).get(0), equalTo(10L));
1697+
assertThat(rows.get(1).get(1), equalTo("green"));
1698+
assertNull(rows.get(1).get(2));
1699+
assertThat(rows.get(2).get(0), equalTo(30L));
1700+
assertThat(rows.get(2).get(1), equalTo("red"));
1701+
assertNull(rows.get(2).get(2));
1702+
assertThat(rows.get(3).get(0), equalTo(oneValue));
1703+
assertNull(rows.get(3).get(1));
1704+
assertNull(rows.get(3).get(2));
1705+
}
1706+
}
1707+
16751708
private void assertEmptyIndexQueries(String from) {
16761709
try (EsqlQueryResponse resp = run(from + "METADATA _source | KEEP _source | LIMIT 1")) {
16771710
assertFalse(resp.values().hasNext());
@@ -1809,6 +1842,8 @@ private void createAndPopulateIndex(String indexName, Settings additionalSetting
18091842
"time",
18101843
"type=long",
18111844
"color",
1845+
"type=keyword",
1846+
"tag",
18121847
"type=keyword"
18131848
)
18141849
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
* This class is part of the planner. Data node level logical optimizations. At this point we have access to
3131
* {@link org.elasticsearch.xpack.esql.stats.SearchStats} which provides access to metadata about the index.
3232
*
33-
* <p>NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()}
33+
* <p>NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators(boolean)}
34+
* and {@link LogicalPlanOptimizer#cleanup()}
3435
*/
3536
public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LocalLogicalOptimizerContext> {
3637

@@ -58,8 +59,8 @@ protected List<Batch<LogicalPlan>> batches() {
5859

5960
@SuppressWarnings("unchecked")
6061
private static Batch<LogicalPlan> localOperators() {
61-
var operators = operators();
62-
var rules = operators().rules();
62+
var operators = operators(true);
63+
var rules = operators.rules();
6364
List<Rule<?, LogicalPlan>> newRules = new ArrayList<>(rules.length);
6465

6566
// apply updates to existing rules that have different applicability locally

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,22 +80,22 @@
8080
* <li>The {@link LogicalPlanOptimizer#substitutions()} phase rewrites things to expand out shorthand in the syntax. For example,
8181
* a nested expression embedded in a stats gets replaced with an eval followed by a stats, followed by another eval. This phase
8282
* also applies surrogates, such as replacing an average with a sum divided by a count.</li>
83-
* <li>{@link LogicalPlanOptimizer#operators()} (NB: The word "operator" is extremely overloaded and referrers to many different
83+
* <li>{@link LogicalPlanOptimizer#operators(boolean)} (NB: The word "operator" is extremely overloaded and referrers to many different
8484
* things.) transform the tree in various different ways. This includes folding (i.e. computing constant expressions at parse
8585
* time), combining expressions, dropping redundant clauses, and some normalization such as putting literals on the right whenever
8686
* possible. These rules are run in a loop until none of the rules make any changes to the plan (there is also a safety shut off
8787
* after many iterations, although hitting that is considered a bug)</li>
8888
* <li>{@link LogicalPlanOptimizer#cleanup()} Which can replace sorts+limit with a TopN</li>
8989
* </ul>
9090
*
91-
* <p>Note that the {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the
91+
* <p>Note that the {@link LogicalPlanOptimizer#operators(boolean)} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the
9292
* {@link LocalLogicalPlanOptimizer} layer.</p>
9393
*/
9494
public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LogicalOptimizerContext> {
9595

9696
private static final List<RuleExecutor.Batch<LogicalPlan>> RULES = List.of(
9797
substitutions(),
98-
operators(),
98+
operators(false),
9999
new Batch<>("Skip Compute", new SkipQueryOnLimitZero()),
100100
cleanup(),
101101
new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized())
@@ -155,10 +155,10 @@ protected static Batch<LogicalPlan> substitutions() {
155155
);
156156
}
157157

158-
protected static Batch<LogicalPlan> operators() {
158+
protected static Batch<LogicalPlan> operators(boolean local) {
159159
return new Batch<>(
160160
"Operator Optimization",
161-
new CombineProjections(),
161+
new CombineProjections(local),
162162
new CombineEvals(),
163163
new PruneEmptyPlans(),
164164
new PropagateEmptyRelation(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java

Lines changed: 89 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,24 @@
1818
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
1919
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
2020
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
21+
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2122
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2223
import org.elasticsearch.xpack.esql.plan.logical.Project;
2324
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
2425

2526
import java.util.ArrayList;
27+
import java.util.HashSet;
2628
import java.util.LinkedHashSet;
2729
import java.util.List;
30+
import java.util.Set;
2831

2932
public final class CombineProjections extends OptimizerRules.OptimizerRule<UnaryPlan> {
33+
// don't drop groupings from a local plan, as the layout has already been agreed upon
34+
private final boolean local;
3035

31-
public CombineProjections() {
36+
public CombineProjections(boolean local) {
3237
super(OptimizerRules.TransformDirection.UP);
38+
this.local = local;
3339
}
3440

3541
@Override
@@ -60,29 +66,91 @@ protected LogicalPlan rule(UnaryPlan plan) {
6066
return plan;
6167
}
6268

63-
// Agg with underlying Project (group by on sub-queries)
64-
if (plan instanceof Aggregate a) {
65-
if (child instanceof Project p) {
66-
var groupings = a.groupings();
67-
List<NamedExpression> groupingAttrs = new ArrayList<>(a.groupings().size());
68-
for (Expression grouping : groupings) {
69-
if (grouping instanceof Attribute attribute) {
70-
groupingAttrs.add(attribute);
71-
} else if (grouping instanceof Alias as && as.child() instanceof GroupingFunction.NonEvaluatableGroupingFunction) {
72-
groupingAttrs.add(as);
69+
if (plan instanceof Aggregate a && child instanceof Project p) {
70+
var groupings = a.groupings();
71+
72+
// sanity checks
73+
for (Expression grouping : groupings) {
74+
if ((grouping instanceof Attribute
75+
|| grouping instanceof Alias as && as.child() instanceof GroupingFunction.NonEvaluatableGroupingFunction) == false) {
76+
// After applying ReplaceAggregateNestedExpressionWithEval,
77+
// evaluatable groupings can only contain attributes.
78+
throw new EsqlIllegalArgumentException("Expected an attribute or grouping function, got {}", grouping);
79+
}
80+
}
81+
assert groupings.size() <= 1
82+
|| groupings.stream()
83+
.anyMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction)) == false
84+
: "CombineProjections only tested with a single CATEGORIZE with no additional groups";
85+
86+
// Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..)
87+
AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder();
88+
for (NamedExpression ne : p.projections()) {
89+
// Record the aliases.
90+
// Projections are just aliases for attributes, so casting is safe.
91+
aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne));
92+
}
93+
var aliases = aliasesBuilder.build();
94+
95+
// Propagate any renames from the lower projection into the upper groupings.
96+
List<Expression> resolvedGroupings = new ArrayList<>();
97+
for (Expression grouping : groupings) {
98+
Expression transformed = grouping.transformUp(Attribute.class, as -> aliases.resolve(as, as));
99+
resolvedGroupings.add(transformed);
100+
}
101+
102+
// This can lead to duplicates in the groupings: e.g.
103+
// | EVAL x = y | STATS ... BY x, y
104+
if (local) {
105+
// On the data node, the groupings must be preserved because they affect the physical output (see
106+
// AbstractPhysicalOperationProviders#intermediateAttributes).
107+
// In case that propagating the lower projection leads to duplicates in the resolved groupings, we'll leave an Eval in place
108+
// of the original projection to create new attributes for the duplicate groups.
109+
Set<Expression> seenResolvedGroupings = new HashSet<>(resolvedGroupings.size());
110+
List<Expression> newGroupings = new ArrayList<>();
111+
List<Alias> aliasesAgainstDuplication = new ArrayList<>();
112+
113+
for (int i = 0; i < groupings.size(); i++) {
114+
Expression resolvedGrouping = resolvedGroupings.get(i);
115+
if (seenResolvedGroupings.add(resolvedGrouping)) {
116+
newGroupings.add(resolvedGrouping);
73117
} else {
74-
// After applying ReplaceAggregateNestedExpressionWithEval,
75-
// evaluatable groupings can only contain attributes.
76-
throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping);
118+
// resolving the renames leads to a duplicate here - we need to alias the underlying attribute this refers to.
119+
// should really only be 1 attribute, anyway, but going via .references() includes the case of a
120+
// GroupingFunction.NonEvaluatableGroupingFunction.
121+
Attribute coreAttribute = resolvedGrouping.references().iterator().next();
122+
123+
Alias renameAgainstDuplication = new Alias(
124+
coreAttribute.source(),
125+
TemporaryNameUtils.locallyUniqueTemporaryName(coreAttribute.name()),
126+
coreAttribute
127+
);
128+
aliasesAgainstDuplication.add(renameAgainstDuplication);
129+
130+
// propagate the new alias into the new grouping
131+
AttributeMap.Builder<Attribute> resolverBuilder = AttributeMap.builder();
132+
resolverBuilder.put(coreAttribute, renameAgainstDuplication.toAttribute());
133+
AttributeMap<Attribute> resolver = resolverBuilder.build();
134+
135+
newGroupings.add(resolvedGrouping.transformUp(Attribute.class, attr -> resolver.resolve(attr, attr)));
77136
}
78137
}
79-
plan = new Aggregate(
80-
a.source(),
81-
p.child(),
82-
a.aggregateType(),
83-
combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()),
84-
combineProjections(a.aggregates(), p.projections())
85-
);
138+
139+
LogicalPlan newChild = aliasesAgainstDuplication.isEmpty()
140+
? p.child()
141+
: new Eval(p.source(), p.child(), aliasesAgainstDuplication);
142+
plan = a.with(newChild, newGroupings, combineProjections(a.aggregates(), p.projections()));
143+
} else {
144+
// On the coordinator, we can just discard the duplicates.
145+
// All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which
146+
// will be an alias like `c = CATEGORIZE(attribute)`.
147+
// Due to such aliases, we can't use an AttributeSet to deduplicate. But we can use a regular set to deduplicate based on
148+
// regular equality (i.e. based on names) instead of name ids.
149+
// TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g.
150+
// for `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. Also
151+
// applies in the local case below.
152+
List<Expression> newGroupings = new ArrayList<>(new LinkedHashSet<>(resolvedGroupings));
153+
plan = a.with(p.child(), newGroupings, combineProjections(a.aggregates(), p.projections()));
86154
}
87155
}
88156

@@ -145,39 +213,6 @@ private static List<NamedExpression> combineProjections(List<? extends NamedExpr
145213
return replaced;
146214
}
147215

148-
private static List<Expression> combineUpperGroupingsAndLowerProjections(
149-
List<? extends NamedExpression> upperGroupings,
150-
List<? extends NamedExpression> lowerProjections
151-
) {
152-
assert upperGroupings.size() <= 1
153-
|| upperGroupings.stream()
154-
.anyMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction)) == false
155-
: "CombineProjections only tested with a single CATEGORIZE with no additional groups";
156-
// Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..)
157-
AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder();
158-
for (NamedExpression ne : lowerProjections) {
159-
// Record the aliases.
160-
// Projections are just aliases for attributes, so casting is safe.
161-
aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne));
162-
}
163-
var aliases = aliasesBuilder.build();
164-
165-
// Propagate any renames from the lower projection into the upper groupings.
166-
// This can lead to duplicates: e.g.
167-
// | EVAL x = y | STATS ... BY x, y
168-
// All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which will be
169-
// an alias like `c = CATEGORIZE(attribute)`.
170-
// Therefore, it is correct to deduplicate based on simple equality (based on names) instead of name ids (Set vs. AttributeSet).
171-
// TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g. for
172-
// `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead.
173-
LinkedHashSet<NamedExpression> resolvedGroupings = new LinkedHashSet<>();
174-
for (NamedExpression ne : upperGroupings) {
175-
NamedExpression transformed = (NamedExpression) ne.transformUp(Attribute.class, a -> aliases.resolve(a, a));
176-
resolvedGroupings.add(transformed);
177-
}
178-
return new ArrayList<>(resolvedGroupings);
179-
}
180-
181216
/**
182217
* Replace grouping alias previously contained in the aggregations that might have been projected away.
183218
*/

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,36 @@ public void testUnionTypesInferNonNullAggConstraint() {
744744
assertEquals("integer_long_field", unionTypeField.fieldName().string());
745745
}
746746

747+
/**
748+
* \_Aggregate[[first_name{r}#7, $$first_name$temp_name$17{r}#18],[SUM(salary{f}#11,true[BOOLEAN]) AS SUM(salary)#5, first_nam
749+
* e{r}#7, first_name{r}#7 AS last_name#10]]
750+
* \_Eval[[null[KEYWORD] AS first_name#7, null[KEYWORD] AS $$first_name$temp_name$17#18]]
751+
* \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
752+
*/
753+
public void testGroupingByMissingFields() {
754+
var plan = plan("FROM test | STATS SUM(salary) BY first_name, last_name");
755+
var testStats = statsForMissingField("first_name", "last_name");
756+
var localPlan = localPlan(plan, testStats);
757+
Limit limit = as(localPlan, Limit.class);
758+
Aggregate aggregate = as(limit.child(), Aggregate.class);
759+
assertThat(aggregate.groupings(), hasSize(2));
760+
ReferenceAttribute grouping1 = as(aggregate.groupings().get(0), ReferenceAttribute.class);
761+
ReferenceAttribute grouping2 = as(aggregate.groupings().get(1), ReferenceAttribute.class);
762+
Eval eval = as(aggregate.child(), Eval.class);
763+
assertThat(eval.fields(), hasSize(2));
764+
Alias eval1 = eval.fields().get(0);
765+
Literal literal1 = as(eval1.child(), Literal.class);
766+
assertNull(literal1.value());
767+
assertThat(literal1.dataType(), is(DataType.KEYWORD));
768+
Alias eval2 = eval.fields().get(1);
769+
Literal literal2 = as(eval2.child(), Literal.class);
770+
assertNull(literal2.value());
771+
assertThat(literal2.dataType(), is(DataType.KEYWORD));
772+
assertThat(grouping1.id(), equalTo(eval1.id()));
773+
assertThat(grouping2.id(), equalTo(eval2.id()));
774+
as(eval.child(), EsRelation.class);
775+
}
776+
747777
private IsNotNull isNotNull(Expression field) {
748778
return new IsNotNull(EMPTY, field);
749779
}

0 commit comments

Comments
 (0)