docs/changelog/131485.yaml (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
pr: 131485
summary: Run single phase aggregation when possible
area: ES|QL
type: enhancement
issues: []
@@ -6,7 +6,7 @@ TS k8s

rate_bytes_in:double | time_bucket:datetime
null | 2024-05-10T00:01:00.000Z
28.80297619047619 | 2024-05-10T00:15:00.000Z
28.802976190476194 | 2024-05-10T00:15:00.000Z
Member Author: side effect of running a single-phase aggregation for avg

25.422222222222224 | 2024-05-10T00:04:00.000Z
24.28946078431372 | 2024-05-10T00:19:00.000Z
24.05555555555555 | 2024-05-10T00:10:00.000Z
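
The small drift in the expected avg values above comes from double addition not being associative: a single-phase aggregation keeps one running sum over the raw values, while the two-phase plan combines per-node partial sums, so the low-order digits can differ. A minimal, self-contained sketch of the effect (illustrative values, not the test data):

// Regrouping the same double additions can change the last digits of the result.
// This mirrors single-phase aggregation (one running sum) versus two-phase
// aggregation (partial sums combined on the coordinator).
public class FloatingPointDriftSketch {
    public static void main(String[] args) {
        double a = 0.1, b = 0.2, c = 0.3;

        double singlePhase = (a + b) + c; // 0.6000000000000001
        double partial = b + c;           // 0.5 (a "partial" result)
        double twoPhase = a + partial;    // 0.6

        System.out.println(singlePhase == twoPhase); // false
    }
}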
@@ -136,7 +136,7 @@ TS k8s

rate_bytes_in:double | time_bucket:datetime
null | 2024-05-10T00:01:00.000Z
38.80297619047619 | 2024-05-10T00:15:00.000Z
38.802976190476194 | 2024-05-10T00:15:00.000Z
35.422222222222224 | 2024-05-10T00:04:00.000Z
34.28946078431372 | 2024-05-10T00:19:00.000Z
34.05555555555555 | 2024-05-10T00:10:00.000Z
@@ -11,6 +11,7 @@
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.SinglePhaseAggregate;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
@@ -25,7 +26,8 @@
public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor<PhysicalPlan, PhysicalOptimizerContext> {

private static final List<RuleExecutor.Batch<PhysicalPlan>> RULES = List.of(
new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns())
new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns()),
new Batch<>("Single aggregation", Limiter.ONCE, new SinglePhaseAggregate())
);

private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;
@@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer.rules.physical;

import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;

/**
* Collapses two-phase aggregation into a single phase when possible.
* For example, in FROM .. | STATS first | STATS second, the STATS second aggregation
* can be executed in a single phase on the coordinator instead of two phases.
*/
public class SinglePhaseAggregate extends PhysicalOptimizerRules.OptimizerRule<AggregateExec> {
Contributor: This should work, but it's actively working against code in the mapper that already distinguishes between "happens on the coordinator, only" vs. "happens partially on data nodes", see here.

@Override
protected PhysicalPlan rule(AggregateExec plan) {
if (plan instanceof AggregateExec parent
&& parent.getMode() == AggregatorMode.FINAL
&& parent.child() instanceof AggregateExec child
&& child.getMode() == AggregatorMode.INITIAL) {
if (parent.groupings()
.stream()
.noneMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction))) {
return child.withMode(AggregatorMode.SINGLE);
}
}
return plan;
}
}
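
In plan terms, the rewrite this rule performs looks roughly like the following (a simplified sketch with attributes elided; <child plan> is a placeholder, and the exact shapes appear in the optimizer test expectations further down):

Before:
  AggregateExec[..., FINAL, ...]
   \_AggregateExec[..., INITIAL, ...]
      \_<child plan>

After:
  AggregateExec[..., SINGLE, ...]
   \_<child plan>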
@@ -75,10 +75,10 @@ public final PhysicalOperation groupingPhysicalOperation(
List<Aggregator.Factory> aggregatorFactories = new ArrayList<>();

// append channels to the layout
if (aggregatorMode == AggregatorMode.FINAL) {
layout.append(aggregates);
} else {
if (aggregatorMode.isOutputPartial()) {
layout.append(aggregateMapper.mapNonGrouping(aggregates));
} else {
layout.append(aggregates);
}

// create the agg factories
@@ -147,14 +147,14 @@ else if (aggregatorMode.isOutputPartial()) {
groupSpecs.add(new GroupSpec(groupInput == null ? null : groupInput.channel(), sourceGroupAttribute, group));
}

if (aggregatorMode == AggregatorMode.FINAL) {
if (aggregatorMode.isOutputPartial()) {
layout.append(aggregateMapper.mapGrouping(aggregates));
} else {
for (var agg : aggregates) {
if (Alias.unwrap(agg) instanceof AggregateFunction) {
layout.append(agg);
}
}
} else {
layout.append(aggregateMapper.mapGrouping(aggregates));
}

// create the agg factories
@@ -266,7 +266,13 @@ private void aggregatesToFactory(
if (child instanceof AggregateFunction aggregateFunction) {
List<NamedExpression> sourceAttr = new ArrayList<>();

if (mode == AggregatorMode.INITIAL) {
if (mode.isInputPartial()) {
if (grouping) {
sourceAttr = aggregateMapper.mapGrouping(ne);
} else {
sourceAttr = aggregateMapper.mapNonGrouping(ne);
}
} else {
// TODO: this needs to be made more reliable - use casting to blow up when dealing with expressions (e+1)
Expression field = aggregateFunction.field();
// Only count can now support literals - all the other aggs should be optimized away
@@ -294,16 +300,6 @@ }
}
}
}
// coordinator/exchange phase
else if (mode == AggregatorMode.FINAL || mode == AggregatorMode.INTERMEDIATE) {
if (grouping) {
sourceAttr = aggregateMapper.mapGrouping(ne);
} else {
sourceAttr = aggregateMapper.mapNonGrouping(ne);
}
} else {
throw new EsqlIllegalArgumentException("illegal aggregation mode");
}

AggregatorFunctionSupplier aggSupplier = supplier(aggregateFunction);

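The branches above switch from comparing against FINAL or INITIAL to the AggregatorMode capability flags, so that the new SINGLE mode takes the raw-input branch on the way in and the final-output branch on the way out. As background, a sketch of the mode semantics as understood here (AggregatorModeSketch is a stand-in name, not code from this change):

// Stand-in for org.elasticsearch.compute.aggregation.AggregatorMode, summarizing the
// flag semantics the checks above rely on (as understood here, not part of this diff).
enum AggregatorModeSketch {
    INITIAL(false, true),      // raw input     -> partial output (data node)
    INTERMEDIATE(true, true),  // partial input -> partial output (node-level reduce)
    FINAL(true, false),        // partial input -> final output   (coordinator)
    SINGLE(false, false);      // raw input     -> final output   (one phase)

    private final boolean inputPartial;
    private final boolean outputPartial;

    AggregatorModeSketch(boolean inputPartial, boolean outputPartial) {
        this.inputPartial = inputPartial;
        this.outputPartial = outputPartial;
    }

    boolean isInputPartial() { return inputPartial; }

    boolean isOutputPartial() { return outputPartial; }
}

With these flags, isOutputPartial() selects the intermediate-attribute layout only for INITIAL and INTERMEDIATE, and isInputPartial() selects the partial-input source attributes only for INTERMEDIATE and FINAL, which is what the rewritten conditions express.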
@@ -14,7 +14,6 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.compute.Describable;
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
@@ -226,7 +225,7 @@ public LocalExecutionPlan plan(String description, FoldContext foldCtx, Physical
// workaround for https://github.com/elastic/elasticsearch/issues/99782
localPhysicalPlan = localPhysicalPlan.transformUp(
AggregateExec.class,
a -> a.getMode() == AggregatorMode.FINAL ? new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates())) : a
a -> a.getMode().isOutputPartial() ? a : new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates()))
);
PhysicalOperation physicalOperation = plan(localPhysicalPlan, context);

@@ -164,6 +164,7 @@
import static java.util.Arrays.asList;
import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL;
import static org.elasticsearch.compute.aggregation.AggregatorMode.INITIAL;
import static org.elasticsearch.compute.aggregation.AggregatorMode.SINGLE;
import static org.elasticsearch.core.Tuple.tuple;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
@@ -636,8 +637,7 @@ public void testTripleExtractorPerField() {

/**
*LimitExec[10000[INTEGER],8]
* \_AggregateExec[[],[SUM(salary{f}#13460,true[BOOLEAN]) AS x#13454],FINAL,[$$x$sum{r}#13466, $$x$seen{r}#13467],8]
* \_AggregateExec[[],[SUM(salary{f}#13460,true[BOOLEAN]) AS x#13454],INITIAL,[$$x$sum{r}#13466, $$x$seen{r}#13467],8]
* \_AggregateExec[[],[SUM(salary{f}#13460,true[BOOLEAN]) AS x#13454],SINGLE,[$$x$sum{r}#13466, $$x$seen{r}#13467],8]
* \_FilterExec[ROUND(emp_no{f}#13455) > 10[INTEGER]]
* \_TopNExec[[Order[last_name{f}#13459,ASC,LAST]],10[INTEGER],58]
* \_ExchangeExec[[emp_no{f}#13455, last_name{f}#13459, salary{f}#13460],false]
@@ -658,11 +658,10 @@ public void testExtractorForField() {

var optimized = optimizedPlan(plan);
var limit = as(optimized, LimitExec.class);
var aggregateFinal = as(limit.child(), AggregateExec.class);
assertThat(aggregateFinal.estimatedRowSize(), equalTo(Long.BYTES));
var agg = as(limit.child(), AggregateExec.class);
assertThat(agg.estimatedRowSize(), equalTo(Long.BYTES));

var aggregatePartial = as(aggregateFinal.child(), AggregateExec.class);
var filter = as(aggregatePartial.child(), FilterExec.class);
var filter = as(agg.child(), FilterExec.class);
var topN = as(filter.child(), TopNExec.class);

var exchange = asRemoteExchange(topN.child());
@@ -3142,8 +3141,7 @@ public void testProjectAwayAllColumnsWhenOnlyTheCountMatters() {
* Expects
*
* LimitExec[10000[INTEGER]]
* \_AggregateExec[[],[COUNT([2a][KEYWORD]) AS count(*)],FINAL,[count{r}#13, seen{r}#14],8]
* \_AggregateExec[[],[COUNT([2a][KEYWORD]) AS count(*)],INITIAL,[count{r}#13, seen{r}#14],8]
* \_AggregateExec[[],[COUNT([2a][KEYWORD]) AS count(*)],SINGLE,[count{r}#13, seen{r}#14],8]
* \_LimitExec[10[INTEGER]]
* \_ExchangeExec[[&lt;all-fields-projected&gt;{r:s}#28],false]
* \_ProjectExec[[&lt;all-fields-projected&gt;{r:s}#28]]
@@ -3158,9 +3156,9 @@ public void testProjectAwayAllColumnsWhenOnlyTheCountMattersInStats() {
"""));

var limit = as(plan, LimitExec.class);
var aggFinal = as(limit.child(), AggregateExec.class);
var aggInitial = as(aggFinal.child(), AggregateExec.class);
var limit10 = as(aggInitial.child(), LimitExec.class);
var agg = as(limit.child(), AggregateExec.class);
assertThat(agg.getMode(), equalTo(SINGLE));
var limit10 = as(agg.child(), LimitExec.class);

var exchange = as(limit10.child(), ExchangeExec.class);
var project = as(exchange.child(), ProjectExec.class);
@@ -3227,8 +3225,7 @@ public void testProjectAwayMvExpandColumnOrder() {
* ProjectExec[[a{r}#5]]
* \_EvalExec[[__a_SUM@81823521{r}#15 / __a_COUNT@31645621{r}#16 AS a]]
* \_LimitExec[10000[INTEGER]]
* \_AggregateExec[[],[SUM(salary{f}#11) AS __a_SUM@81823521, COUNT(salary{f}#11) AS __a_COUNT@31645621],FINAL,24]
* \_AggregateExec[[],[SUM(salary{f}#11) AS __a_SUM@81823521, COUNT(salary{f}#11) AS __a_COUNT@31645621],PARTIAL,16]
* \_AggregateExec[[],[SUM(salary{f}#11) AS __a_SUM@81823521, COUNT(salary{f}#11) AS __a_COUNT@31645621],SINGLE,24]
* \_LimitExec[10[INTEGER]]
* \_ExchangeExec[[],false]
* \_ProjectExec[[salary{f}#11]]
@@ -3248,11 +3245,9 @@ public void testAvgSurrogateFunctionAfterRenameAndLimit() {
var limit = as(eval.child(), LimitExec.class);
assertThat(limit.limit(), instanceOf(Literal.class));
assertThat(limit.limit().fold(FoldContext.small()), equalTo(10000));
var aggFinal = as(limit.child(), AggregateExec.class);
assertThat(aggFinal.getMode(), equalTo(FINAL));
var aggPartial = as(aggFinal.child(), AggregateExec.class);
assertThat(aggPartial.getMode(), equalTo(INITIAL));
limit = as(aggPartial.child(), LimitExec.class);
var agg = as(limit.child(), AggregateExec.class);
assertThat(agg.getMode(), equalTo(SINGLE));
limit = as(agg.child(), LimitExec.class);
assertThat(limit.limit(), instanceOf(Literal.class));
assertThat(limit.limit().fold(FoldContext.small()), equalTo(10));

@@ -3362,11 +3357,9 @@ public void testGlobalAggFoldingOutput() {
var optimized = optimizedPlan(plan, stats);

var limit = as(optimized, LimitExec.class);
var aggFinal = as(limit.child(), AggregateExec.class);
var aggPartial = as(aggFinal.child(), AggregateExec.class);
// The partial aggregation's output is determined via AbstractPhysicalOperationProviders.intermediateAttributes()
assertThat(Expressions.names(aggPartial.output()), contains("$$c$count", "$$c$seen"));
limit = as(aggPartial.child(), LimitExec.class);
var agg = as(limit.child(), AggregateExec.class);
assertThat(agg.getMode(), equalTo(SINGLE));
limit = as(agg.child(), LimitExec.class);
var exchange = as(limit.child(), ExchangeExec.class);
var project = as(exchange.child(), ProjectExec.class);
}
@@ -3793,8 +3786,7 @@ public void testMixedSpatialBoundsAndPointsExtracted() {
* After local optimizations we expect no changes because field is extracted:
* <code>
* LimitExec[1000[INTEGER]]
* \_AggregateExec[[],[SPATIALCENTROID(__centroid_SPATIALCENTROID@7ff910a{r}#7) AS centroid],FINAL,50]
* \_AggregateExec[[],[SPATIALCENTROID(__centroid_SPATIALCENTROID@7ff910a{r}#7) AS centroid],PARTIAL,50]
* \_AggregateExec[[],[SPATIALCENTROID(__centroid_SPATIALCENTROID@7ff910a{r}#7) AS centroid],SINGLE,50]
* \_EvalExec[[[1 1 0 0 0 0 0 30 e2 4c 7c 45 40 0 0 e0 92 b0 82 2d 40][GEO_POINT] AS __centroid_SPATIALCENTROID@7ff910a]]
* \_RowExec[[[50 4f 49 4e 54 28 34 32 2e 39 37 31 30 39 36 32 39 39 35 38 38 36 38 20 31 34 2e 37 35 35 32 35 33 34 30 30
* 36 35 33 36 29][KEYWORD] AS wkt]]
@@ -3822,11 +3814,7 @@ public void testSpatialTypesAndStatsUseDocValuesNestedLiteral() {
var optimized = optimizedPlan(plan);
limit = as(optimized, LimitExec.class);
agg = as(limit.child(), AggregateExec.class);
assertThat("Aggregation is FINAL", agg.getMode(), equalTo(FINAL));
assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE);
agg = as(agg.child(), AggregateExec.class);
assertThat("Aggregation is PARTIAL", agg.getMode(), equalTo(INITIAL));
assertThat("Aggregation is SINGLE", agg.getMode(), equalTo(SINGLE));
assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE);
eval = as(agg.child(), EvalExec.class);
@@ -4097,8 +4085,7 @@ public void testSpatialTypesAndStatsUseDocValuesMultiAggregationsGrouped() {
* After local optimizations:
* <code>
* LimitExec[1000[INTEGER]]
* \_AggregateExec[[],[SPATIALCENTROID(centroid{r}#4) AS centroid, SUM(count{r}#6) AS count],FINAL,58]
* \_AggregateExec[[],[SPATIALCENTROID(centroid{r}#4) AS centroid, SUM(count{r}#6) AS count],PARTIAL,58]
* \_AggregateExec[[],[SPATIALCENTROID(centroid{r}#4) AS centroid, SUM(count{r}#6) AS count],SINGLE,58]
* \_AggregateExec[[scalerank{f}#16],[SPATIALCENTROID(location{f}#18) AS centroid, COUNT([2a][KEYWORD]) AS count],FINAL,58]
* \_ExchangeExec[[scalerank{f}#16, xVal{r}#19, xDel{r}#20, yVal{r}#21, yDel{r}#22, count{r}#23, count{r}#24, seen{r}#25],true]
* \_AggregateExec[[scalerank{f}#16],[SPATIALCENTROID(location{f}#18) AS centroid, COUNT([2a][KEYWORD]) AS count],PARTIAL,58]
@@ -4142,12 +4129,7 @@ public void testSpatialTypesAndStatsUseDocValuesMultiAggregationsGroupedAggregat
var optimized = optimizedPlan(plan);
limit = as(optimized, LimitExec.class);
agg = as(limit.child(), AggregateExec.class);
assertThat("Aggregation is FINAL", agg.getMode(), equalTo(FINAL));
assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
assertAggregation(agg, "count", Sum.class);
assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE);
agg = as(agg.child(), AggregateExec.class);
assertThat("Aggregation is PARTIAL", agg.getMode(), equalTo(INITIAL));
assertThat("Aggregation is SINGLE", agg.getMode(), equalTo(SINGLE));
assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
assertAggregation(agg, "count", Sum.class);
assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE);
@@ -7815,6 +7797,33 @@ public void testLookupJoinFieldLoadingDropAllFields() throws Exception {
assertLookupJoinFieldNames(query, data, List.of(Set.of(), Set.of("foo", "bar", "baz")));
}

/**
* LimitExec[1000[INTEGER],null]
* \_AggregateExec[[last_name{r}#8],[COUNT(first_name{r}#5,true[BOOLEAN]) AS count(first_name)#11, last_name{r}#8],SINGLE,[last_name
* {r}#8, $$count(first_name)$count{r}#25, $$count(first_name)$seen{r}#26],null]
* \_AggregateExec[[emp_no{f}#12],[VALUES(first_name{f}#13,true[BOOLEAN]) AS first_name#5, VALUES(last_name{f}#16,true[BOOLEAN]) A
* S last_name#8],FINAL,[emp_no{f}#12, $$first_name$values{r}#23, $$last_name$values{r}#24],null]
* \_ExchangeExec[[emp_no{f}#12, $$first_name$values{r}#23, $$last_name$values{r}#24],true]
* \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[
* Aggregate[[emp_no{f}#12],[VALUES(first_name{f}#13,true[BOOLEAN]) AS first_name#5, VALUES(last_name{f}#16,true[BOOLEAN]) A
* S last_name#8]]
* \_EsRelation[test][_meta_field{f}#18, emp_no{f}#12, first_name{f}#13, ..]]]
*/
public void testSingleModeAggregate() {
String q = """
FROM test
| STATS first_name = VALUES(first_name), last_name = VALUES(last_name) BY emp_no
| STATS count(first_name) BY last_name""";
PhysicalPlan plan = physicalPlan(q);
PhysicalPlan optimized = physicalPlanOptimizer.optimize(plan);
LimitExec limit = as(optimized, LimitExec.class);
AggregateExec second = as(limit.child(), AggregateExec.class);
assertThat(second.getMode(), equalTo(SINGLE));
AggregateExec first = as(second.child(), AggregateExec.class);
assertThat(first.getMode(), equalTo(FINAL));
as(first.child(), ExchangeExec.class);
}

private void assertLookupJoinFieldNames(String query, TestDataSource data, List<Set<String>> expectedFieldNames) {
assertLookupJoinFieldNames(query, data, expectedFieldNames, false);
}