Skip to content

Commit dcbe09f

Browse files
committed
Run single phase aggregation when possible
1 parent d2da68e commit dcbe09f

File tree

5 files changed

+85
-20
lines changed

5 files changed

+85
-20
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushTopNToSource;
2020
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ReplaceRoundToWithQueryAndTags;
2121
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ReplaceSourceAttributes;
22+
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SinglePhaseAggregate;
2223
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SpatialDocValuesExtraction;
2324
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SpatialShapeBoundsExtraction;
2425
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@@ -78,7 +79,12 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
7879
// execute the SubstituteRoundToWithQueryAndTags rule once after all the other pushdown rules are applied, as this rule generate
7980
// multiple QueryBuilders according the number of RoundTo points, it should be applied after all the other eligible pushdowns are
8081
// done, and it should be executed only once.
81-
var substitutionRules = new Batch<>("Substitute RoundTo with QueryAndTags", Limiter.ONCE, new ReplaceRoundToWithQueryAndTags());
82+
var substitutionRules = new Batch<>(
83+
"Substitute RoundTo with QueryAndTags",
84+
Limiter.ONCE,
85+
new ReplaceRoundToWithQueryAndTags(),
86+
new SinglePhaseAggregate()
87+
);
8288

8389
// add the field extraction in just one pass
8490
// add it at the end after all the other rules have ran
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
9+
10+
import org.elasticsearch.compute.aggregation.AggregatorMode;
11+
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
12+
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
13+
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
14+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
15+
16+
/**
17+
* Collapses two-phase aggregation into a single phase when possible.
18+
* For example, in FROM .. | STATS first | STATS second, the STATS second aggregation
19+
* can be executed in a single phase on the coordinator instead of two phases.
20+
*/
21+
public class SinglePhaseAggregate extends PhysicalOptimizerRules.OptimizerRule<AggregateExec> {
22+
@Override
23+
protected PhysicalPlan rule(AggregateExec plan) {
24+
if (plan instanceof AggregateExec parent
25+
&& parent.getMode() == AggregatorMode.FINAL
26+
&& parent.child() instanceof AggregateExec child
27+
&& child.getMode() == AggregatorMode.INITIAL) {
28+
if (parent.groupings()
29+
.stream()
30+
.noneMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction))) {
31+
return child.withMode(AggregatorMode.SINGLE);
32+
}
33+
}
34+
return plan;
35+
}
36+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ public final PhysicalOperation groupingPhysicalOperation(
7575
List<Aggregator.Factory> aggregatorFactories = new ArrayList<>();
7676

7777
// append channels to the layout
78-
if (aggregatorMode == AggregatorMode.FINAL) {
79-
layout.append(aggregates);
80-
} else {
78+
if (aggregatorMode.isOutputPartial()) {
8179
layout.append(aggregateMapper.mapNonGrouping(aggregates));
80+
} else {
81+
layout.append(aggregates);
8282
}
8383

8484
// create the agg factories
@@ -147,14 +147,14 @@ else if (aggregatorMode.isOutputPartial()) {
147147
groupSpecs.add(new GroupSpec(groupInput == null ? null : groupInput.channel(), sourceGroupAttribute, group));
148148
}
149149

150-
if (aggregatorMode == AggregatorMode.FINAL) {
150+
if (aggregatorMode.isOutputPartial()) {
151+
layout.append(aggregateMapper.mapGrouping(aggregates));
152+
} else {
151153
for (var agg : aggregates) {
152154
if (Alias.unwrap(agg) instanceof AggregateFunction) {
153155
layout.append(agg);
154156
}
155157
}
156-
} else {
157-
layout.append(aggregateMapper.mapGrouping(aggregates));
158158
}
159159

160160
// create the agg factories
@@ -266,7 +266,13 @@ private void aggregatesToFactory(
266266
if (child instanceof AggregateFunction aggregateFunction) {
267267
List<NamedExpression> sourceAttr = new ArrayList<>();
268268

269-
if (mode == AggregatorMode.INITIAL) {
269+
if (mode.isInputPartial()) {
270+
if (grouping) {
271+
sourceAttr = aggregateMapper.mapGrouping(ne);
272+
} else {
273+
sourceAttr = aggregateMapper.mapNonGrouping(ne);
274+
}
275+
} else {
270276
// TODO: this needs to be made more reliable - use casting to blow up when dealing with expressions (e+1)
271277
Expression field = aggregateFunction.field();
272278
// Only count can now support literals - all the other aggs should be optimized away
@@ -294,16 +300,6 @@ private void aggregatesToFactory(
294300
}
295301
}
296302
}
297-
// coordinator/exchange phase
298-
else if (mode == AggregatorMode.FINAL || mode == AggregatorMode.INTERMEDIATE) {
299-
if (grouping) {
300-
sourceAttr = aggregateMapper.mapGrouping(ne);
301-
} else {
302-
sourceAttr = aggregateMapper.mapNonGrouping(ne);
303-
}
304-
} else {
305-
throw new EsqlIllegalArgumentException("illegal aggregation mode");
306-
}
307303

308304
AggregatorFunctionSupplier aggSupplier = supplier(aggregateFunction);
309305

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.common.util.BigArrays;
1515
import org.elasticsearch.common.util.Maps;
1616
import org.elasticsearch.compute.Describable;
17-
import org.elasticsearch.compute.aggregation.AggregatorMode;
1817
import org.elasticsearch.compute.data.Block;
1918
import org.elasticsearch.compute.data.BlockFactory;
2019
import org.elasticsearch.compute.data.ElementType;
@@ -226,7 +225,7 @@ public LocalExecutionPlan plan(String description, FoldContext foldCtx, Physical
226225
// workaround for https://github.com/elastic/elasticsearch/issues/99782
227226
localPhysicalPlan = localPhysicalPlan.transformUp(
228227
AggregateExec.class,
229-
a -> a.getMode() == AggregatorMode.FINAL ? new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates())) : a
228+
a -> a.getMode().isOutputPartial() ? a : new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates()))
230229
);
231230
PhysicalOperation physicalOperation = plan(localPhysicalPlan, context);
232231

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@
164164
import static java.util.Arrays.asList;
165165
import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL;
166166
import static org.elasticsearch.compute.aggregation.AggregatorMode.INITIAL;
167+
import static org.elasticsearch.compute.aggregation.AggregatorMode.SINGLE;
167168
import static org.elasticsearch.core.Tuple.tuple;
168169
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
169170
import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
@@ -7815,6 +7816,33 @@ public void testLookupJoinFieldLoadingDropAllFields() throws Exception {
78157816
assertLookupJoinFieldNames(query, data, List.of(Set.of(), Set.of("foo", "bar", "baz")));
78167817
}
78177818

7819+
/**
7820+
* LimitExec[1000[INTEGER],null]
7821+
* \_AggregateExec[[last_name{r}#8],[COUNT(first_name{r}#5,true[BOOLEAN]) AS count(first_name)#11, last_name{r}#8],SINGLE,[last_name
7822+
* {r}#8, $$count(first_name)$count{r}#25, $$count(first_name)$seen{r}#26],null]
7823+
* \_AggregateExec[[emp_no{f}#12],[VALUES(first_name{f}#13,true[BOOLEAN]) AS first_name#5, VALUES(last_name{f}#16,true[BOOLEAN]) A
7824+
* S last_name#8],FINAL,[emp_no{f}#12, $$first_name$values{r}#23, $$last_name$values{r}#24],null]
7825+
* \_ExchangeExec[[emp_no{f}#12, $$first_name$values{r}#23, $$last_name$values{r}#24],true]
7826+
* \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[
7827+
* Aggregate[[emp_no{f}#12],[VALUES(first_name{f}#13,true[BOOLEAN]) AS first_name#5, VALUES(last_name{f}#16,true[BOOLEAN]) A
7828+
* S last_name#8]]
7829+
* \_EsRelation[test][_meta_field{f}#18, emp_no{f}#12, first_name{f}#13, ..]]]
7830+
*/
7831+
public void testSingleModeAggregate() {
7832+
String q = """
7833+
FROM test
7834+
| STATS first_name = VALUES(first_name), last_name = VALUES(last_name) BY emp_no
7835+
| STATS count(first_name) BY last_name""";
7836+
PhysicalPlan plan = physicalPlan(q);
7837+
PhysicalPlan optimized = physicalPlanOptimizer.optimize(plan);
7838+
LimitExec limit = as(optimized, LimitExec.class);
7839+
AggregateExec second = as(limit.child(), AggregateExec.class);
7840+
assertThat(second.getMode(), equalTo(SINGLE));
7841+
AggregateExec first = as(second.child(), AggregateExec.class);
7842+
assertThat(first.getMode(), equalTo(FINAL));
7843+
as(first.child(), ExchangeExec.class);
7844+
}
7845+
78187846
private void assertLookupJoinFieldNames(String query, TestDataSource data, List<Set<String>> expectedFieldNames) {
78197847
assertLookupJoinFieldNames(query, data, expectedFieldNames, false);
78207848
}

0 commit comments

Comments
 (0)