From 0c3c78debc3970017611d931bd9bd04143825dbb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 17 Jul 2025 15:23:31 -0700 Subject: [PATCH 1/4] Run single phase aggregation when possible --- .../esql/optimizer/PhysicalPlanOptimizer.java | 4 +- .../rules/physical/SinglePhaseAggregate.java | 36 ++++++++++++++++++ .../AbstractPhysicalOperationProviders.java | 30 +++++++-------- .../esql/planner/LocalExecutionPlanner.java | 3 +- .../optimizer/PhysicalPlanOptimizerTests.java | 37 +++++++++++++++---- 5 files changed, 83 insertions(+), 27 deletions(-) create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/SinglePhaseAggregate.java diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java index 6d60c547f47d6..d1b642764b0e4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.SinglePhaseAggregate; import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor; @@ -25,7 +26,8 @@ public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor { private static final List> RULES = List.of( - new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns()) + new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns()), + new Batch<>("Single aggregation", Limiter.ONCE, new SinglePhaseAggregate()) ); private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/SinglePhaseAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/SinglePhaseAggregate.java new file mode 100644 index 0000000000000..04242e3e18590 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/SinglePhaseAggregate.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.physical; + +import org.elasticsearch.compute.aggregation.AggregatorMode; +import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction; +import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules; +import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; + +/** + * Collapses two-phase aggregation into a single phase when possible. + * For example, in FROM .. | STATS first | STATS second, the STATS second aggregation + * can be executed in a single phase on the coordinator instead of two phases. + */ +public class SinglePhaseAggregate extends PhysicalOptimizerRules.OptimizerRule { + @Override + protected PhysicalPlan rule(AggregateExec plan) { + if (plan instanceof AggregateExec parent + && parent.getMode() == AggregatorMode.FINAL + && parent.child() instanceof AggregateExec child + && child.getMode() == AggregatorMode.INITIAL) { + if (parent.groupings() + .stream() + .noneMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction))) { + return child.withMode(AggregatorMode.SINGLE); + } + } + return plan; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java index dd7b4c6bf2f49..50a8deb23130b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java @@ -75,10 +75,10 @@ public final PhysicalOperation groupingPhysicalOperation( List aggregatorFactories = new ArrayList<>(); // append channels to the layout - if (aggregatorMode == AggregatorMode.FINAL) { - layout.append(aggregates); - } else { + if (aggregatorMode.isOutputPartial()) { layout.append(aggregateMapper.mapNonGrouping(aggregates)); + } else { + layout.append(aggregates); } // create the agg factories @@ -147,14 +147,14 @@ else if (aggregatorMode.isOutputPartial()) { groupSpecs.add(new GroupSpec(groupInput == null ? null : groupInput.channel(), sourceGroupAttribute, group)); } - if (aggregatorMode == AggregatorMode.FINAL) { + if (aggregatorMode.isOutputPartial()) { + layout.append(aggregateMapper.mapGrouping(aggregates)); + } else { for (var agg : aggregates) { if (Alias.unwrap(agg) instanceof AggregateFunction) { layout.append(agg); } } - } else { - layout.append(aggregateMapper.mapGrouping(aggregates)); } // create the agg factories @@ -266,7 +266,13 @@ private void aggregatesToFactory( if (child instanceof AggregateFunction aggregateFunction) { List sourceAttr = new ArrayList<>(); - if (mode == AggregatorMode.INITIAL) { + if (mode.isInputPartial()) { + if (grouping) { + sourceAttr = aggregateMapper.mapGrouping(ne); + } else { + sourceAttr = aggregateMapper.mapNonGrouping(ne); + } + } else { // TODO: this needs to be made more reliable - use casting to blow up when dealing with expressions (e+1) Expression field = aggregateFunction.field(); // Only count can now support literals - all the other aggs should be optimized away @@ -294,16 +300,6 @@ private void aggregatesToFactory( } } } - // coordinator/exchange phase - else if (mode == AggregatorMode.FINAL || mode == AggregatorMode.INTERMEDIATE) { - if (grouping) { - sourceAttr = aggregateMapper.mapGrouping(ne); - } else { - sourceAttr = aggregateMapper.mapNonGrouping(ne); - } - } else { - throw new EsqlIllegalArgumentException("illegal aggregation mode"); - } AggregatorFunctionSupplier aggSupplier = supplier(aggregateFunction); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 532a5cbb5e64a..7778ef601eeb3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.Maps; import org.elasticsearch.compute.Describable; -import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.ElementType; @@ -226,7 +225,7 @@ public LocalExecutionPlan plan(String description, FoldContext foldCtx, Physical // workaround for https://github.com/elastic/elasticsearch/issues/99782 localPhysicalPlan = localPhysicalPlan.transformUp( AggregateExec.class, - a -> a.getMode() == AggregatorMode.FINAL ? new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates())) : a + a -> a.getMode().isOutputPartial() ? a : new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates())) ); PhysicalOperation physicalOperation = plan(localPhysicalPlan, context); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index a9c938e6b58d2..2a8eea9d0b666 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -164,6 +164,7 @@ import static java.util.Arrays.asList; import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL; import static org.elasticsearch.compute.aggregation.AggregatorMode.INITIAL; +import static org.elasticsearch.compute.aggregation.AggregatorMode.SINGLE; import static org.elasticsearch.core.Tuple.tuple; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.existsQuery; @@ -3793,8 +3794,7 @@ public void testMixedSpatialBoundsAndPointsExtracted() { * After local optimizations we expect no changes because field is extracted: * * LimitExec[1000[INTEGER]] - * \_AggregateExec[[],[SPATIALCENTROID(__centroid_SPATIALCENTROID@7ff910a{r}#7) AS centroid],FINAL,50] - * \_AggregateExec[[],[SPATIALCENTROID(__centroid_SPATIALCENTROID@7ff910a{r}#7) AS centroid],PARTIAL,50] + * \_AggregateExec[[],[SPATIALCENTROID(__centroid_SPATIALCENTROID@7ff910a{r}#7) AS centroid],SINGLE,50] * \_EvalExec[[[1 1 0 0 0 0 0 30 e2 4c 7c 45 40 0 0 e0 92 b0 82 2d 40][GEO_POINT] AS __centroid_SPATIALCENTROID@7ff910a]] * \_RowExec[[[50 4f 49 4e 54 28 34 32 2e 39 37 31 30 39 36 32 39 39 35 38 38 36 38 20 31 34 2e 37 35 35 32 35 33 34 30 30 * 36 35 33 36 29][KEYWORD] AS wkt]] @@ -3822,11 +3822,7 @@ public void testSpatialTypesAndStatsUseDocValuesNestedLiteral() { var optimized = optimizedPlan(plan); limit = as(optimized, LimitExec.class); agg = as(limit.child(), AggregateExec.class); - assertThat("Aggregation is FINAL", agg.getMode(), equalTo(FINAL)); - assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0)); - assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE); - agg = as(agg.child(), AggregateExec.class); - assertThat("Aggregation is PARTIAL", agg.getMode(), equalTo(INITIAL)); + assertThat("Aggregation is SINGLE", agg.getMode(), equalTo(SINGLE)); assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0)); assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE); eval = as(agg.child(), EvalExec.class); @@ -7815,6 +7811,33 @@ public void testLookupJoinFieldLoadingDropAllFields() throws Exception { assertLookupJoinFieldNames(query, data, List.of(Set.of(), Set.of("foo", "bar", "baz"))); } + /** + * LimitExec[1000[INTEGER],null] + * \_AggregateExec[[last_name{r}#8],[COUNT(first_name{r}#5,true[BOOLEAN]) AS count(first_name)#11, last_name{r}#8],SINGLE,[last_name + * {r}#8, $$count(first_name)$count{r}#25, $$count(first_name)$seen{r}#26],null] + * \_AggregateExec[[emp_no{f}#12],[VALUES(first_name{f}#13,true[BOOLEAN]) AS first_name#5, VALUES(last_name{f}#16,true[BOOLEAN]) A + * S last_name#8],FINAL,[emp_no{f}#12, $$first_name$values{r}#23, $$last_name$values{r}#24],null] + * \_ExchangeExec[[emp_no{f}#12, $$first_name$values{r}#23, $$last_name$values{r}#24],true] + * \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[ + * Aggregate[[emp_no{f}#12],[VALUES(first_name{f}#13,true[BOOLEAN]) AS first_name#5, VALUES(last_name{f}#16,true[BOOLEAN]) A + * S last_name#8]] + * \_EsRelation[test][_meta_field{f}#18, emp_no{f}#12, first_name{f}#13, ..]]] + */ + public void testSingleModeAggregate() { + String q = """ + FROM test + | STATS first_name = VALUES(first_name), last_name = VALUES(last_name) BY emp_no + | STATS count(first_name) BY last_name"""; + PhysicalPlan plan = physicalPlan(q); + PhysicalPlan optimized = physicalPlanOptimizer.optimize(plan); + LimitExec limit = as(optimized, LimitExec.class); + AggregateExec second = as(limit.child(), AggregateExec.class); + assertThat(second.getMode(), equalTo(SINGLE)); + AggregateExec first = as(second.child(), AggregateExec.class); + assertThat(first.getMode(), equalTo(FINAL)); + as(first.child(), ExchangeExec.class); + } + private void assertLookupJoinFieldNames(String query, TestDataSource data, List> expectedFieldNames) { assertLookupJoinFieldNames(query, data, expectedFieldNames, false); } From 105fe9d76e7efdc286280cbdf02477efe83c6ecb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 30 Sep 2025 12:31:35 -0700 Subject: [PATCH 2/4] more fix --- .../optimizer/PhysicalPlanOptimizerTests.java | 48 +++++++------------ .../ReplaceRoundToWithQueryAndTagsTests.java | 11 +---- 2 files changed, 19 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 2a8eea9d0b666..b4f66f44b86d0 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -637,8 +637,7 @@ public void testTripleExtractorPerField() { /** *LimitExec[10000[INTEGER],8] - * \_AggregateExec[[],[SUM(salary{f}#13460,true[BOOLEAN]) AS x#13454],FINAL,[$$x$sum{r}#13466, $$x$seen{r}#13467],8] - * \_AggregateExec[[],[SUM(salary{f}#13460,true[BOOLEAN]) AS x#13454],INITIAL,[$$x$sum{r}#13466, $$x$seen{r}#13467],8] + * \_AggregateExec[[],[SUM(salary{f}#13460,true[BOOLEAN]) AS x#13454],SINGLE,[$$x$sum{r}#13466, $$x$seen{r}#13467],8] * \_FilterExec[ROUND(emp_no{f}#13455) > 10[INTEGER]] * \_TopNExec[[Order[last_name{f}#13459,ASC,LAST]],10[INTEGER],58] * \_ExchangeExec[[emp_no{f}#13455, last_name{f}#13459, salary{f}#13460],false] @@ -659,11 +658,10 @@ public void testExtractorForField() { var optimized = optimizedPlan(plan); var limit = as(optimized, LimitExec.class); - var aggregateFinal = as(limit.child(), AggregateExec.class); - assertThat(aggregateFinal.estimatedRowSize(), equalTo(Long.BYTES)); + var agg = as(limit.child(), AggregateExec.class); + assertThat(agg.estimatedRowSize(), equalTo(Long.BYTES)); - var aggregatePartial = as(aggregateFinal.child(), AggregateExec.class); - var filter = as(aggregatePartial.child(), FilterExec.class); + var filter = as(agg.child(), FilterExec.class); var topN = as(filter.child(), TopNExec.class); var exchange = asRemoteExchange(topN.child()); @@ -3143,8 +3141,7 @@ public void testProjectAwayAllColumnsWhenOnlyTheCountMatters() { * Expects * * LimitExec[10000[INTEGER]] - * \_AggregateExec[[],[COUNT([2a][KEYWORD]) AS count(*)],FINAL,[count{r}#13, seen{r}#14],8] - * \_AggregateExec[[],[COUNT([2a][KEYWORD]) AS count(*)],INITIAL,[count{r}#13, seen{r}#14],8] + * \_AggregateExec[[],[COUNT([2a][KEYWORD]) AS count(*)],SINGLE,[count{r}#13, seen{r}#14],8] * \_LimitExec[10[INTEGER]] * \_ExchangeExec[[<all-fields-projected>{r:s}#28],false] * \_ProjectExec[[<all-fields-projected>{r:s}#28]] @@ -3159,9 +3156,9 @@ public void testProjectAwayAllColumnsWhenOnlyTheCountMattersInStats() { """)); var limit = as(plan, LimitExec.class); - var aggFinal = as(limit.child(), AggregateExec.class); - var aggInitial = as(aggFinal.child(), AggregateExec.class); - var limit10 = as(aggInitial.child(), LimitExec.class); + var agg = as(limit.child(), AggregateExec.class); + assertThat(agg.getMode(), equalTo(SINGLE)); + var limit10 = as(agg.child(), LimitExec.class); var exchange = as(limit10.child(), ExchangeExec.class); var project = as(exchange.child(), ProjectExec.class); @@ -3228,8 +3225,7 @@ public void testProjectAwayMvExpandColumnOrder() { * ProjectExec[[a{r}#5]] * \_EvalExec[[__a_SUM@81823521{r}#15 / __a_COUNT@31645621{r}#16 AS a]] * \_LimitExec[10000[INTEGER]] - * \_AggregateExec[[],[SUM(salary{f}#11) AS __a_SUM@81823521, COUNT(salary{f}#11) AS __a_COUNT@31645621],FINAL,24] - * \_AggregateExec[[],[SUM(salary{f}#11) AS __a_SUM@81823521, COUNT(salary{f}#11) AS __a_COUNT@31645621],PARTIAL,16] + * \_AggregateExec[[],[SUM(salary{f}#11) AS __a_SUM@81823521, COUNT(salary{f}#11) AS __a_COUNT@31645621],SINGLE,24] * \_LimitExec[10[INTEGER]] * \_ExchangeExec[[],false] * \_ProjectExec[[salary{f}#11]] @@ -3249,11 +3245,9 @@ public void testAvgSurrogateFunctionAfterRenameAndLimit() { var limit = as(eval.child(), LimitExec.class); assertThat(limit.limit(), instanceOf(Literal.class)); assertThat(limit.limit().fold(FoldContext.small()), equalTo(10000)); - var aggFinal = as(limit.child(), AggregateExec.class); - assertThat(aggFinal.getMode(), equalTo(FINAL)); - var aggPartial = as(aggFinal.child(), AggregateExec.class); - assertThat(aggPartial.getMode(), equalTo(INITIAL)); - limit = as(aggPartial.child(), LimitExec.class); + var agg = as(limit.child(), AggregateExec.class); + assertThat(agg.getMode(), equalTo(SINGLE)); + limit = as(agg.child(), LimitExec.class); assertThat(limit.limit(), instanceOf(Literal.class)); assertThat(limit.limit().fold(FoldContext.small()), equalTo(10)); @@ -3363,11 +3357,9 @@ public void testGlobalAggFoldingOutput() { var optimized = optimizedPlan(plan, stats); var limit = as(optimized, LimitExec.class); - var aggFinal = as(limit.child(), AggregateExec.class); - var aggPartial = as(aggFinal.child(), AggregateExec.class); - // The partial aggregation's output is determined via AbstractPhysicalOperationProviders.intermediateAttributes() - assertThat(Expressions.names(aggPartial.output()), contains("$$c$count", "$$c$seen")); - limit = as(aggPartial.child(), LimitExec.class); + var agg = as(limit.child(), AggregateExec.class); + assertThat(agg.getMode(), equalTo(SINGLE)); + limit = as(agg.child(), LimitExec.class); var exchange = as(limit.child(), ExchangeExec.class); var project = as(exchange.child(), ProjectExec.class); } @@ -4093,8 +4085,7 @@ public void testSpatialTypesAndStatsUseDocValuesMultiAggregationsGrouped() { * After local optimizations: * * LimitExec[1000[INTEGER]] - * \_AggregateExec[[],[SPATIALCENTROID(centroid{r}#4) AS centroid, SUM(count{r}#6) AS count],FINAL,58] - * \_AggregateExec[[],[SPATIALCENTROID(centroid{r}#4) AS centroid, SUM(count{r}#6) AS count],PARTIAL,58] + * \_AggregateExec[[],[SPATIALCENTROID(centroid{r}#4) AS centroid, SUM(count{r}#6) AS count],SINGLE,58] * \_AggregateExec[[scalerank{f}#16],[SPATIALCENTROID(location{f}#18) AS centroid, COUNT([2a][KEYWORD]) AS count],FINAL,58] * \_ExchangeExec[[scalerank{f}#16, xVal{r}#19, xDel{r}#20, yVal{r}#21, yDel{r}#22, count{r}#23, count{r}#24, seen{r}#25],true] * \_AggregateExec[[scalerank{f}#16],[SPATIALCENTROID(location{f}#18) AS centroid, COUNT([2a][KEYWORD]) AS count],PARTIAL,58] @@ -4138,12 +4129,7 @@ public void testSpatialTypesAndStatsUseDocValuesMultiAggregationsGroupedAggregat var optimized = optimizedPlan(plan); limit = as(optimized, LimitExec.class); agg = as(limit.child(), AggregateExec.class); - assertThat("Aggregation is FINAL", agg.getMode(), equalTo(FINAL)); - assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0)); - assertAggregation(agg, "count", Sum.class); - assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE); - agg = as(agg.child(), AggregateExec.class); - assertThat("Aggregation is PARTIAL", agg.getMode(), equalTo(INITIAL)); + assertThat("Aggregation is SINGLE", agg.getMode(), equalTo(SINGLE)); assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0)); assertAggregation(agg, "count", Sum.class); assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java index beef7ee4857a3..14ddb647946bb 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java @@ -55,7 +55,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL; -import static org.elasticsearch.compute.aggregation.AggregatorMode.INITIAL; +import static org.elasticsearch.compute.aggregation.AggregatorMode.SINGLE; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.existsQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; @@ -379,19 +379,12 @@ public void testDateTruncBucketNotTransformToQueryAndTagsWithFork() { LimitExec limit = as(plan, LimitExec.class); AggregateExec agg = as(limit.child(), AggregateExec.class); - assertThat(agg.getMode(), is(FINAL)); + assertThat(agg.getMode(), is(SINGLE)); List groupings = agg.groupings(); NamedExpression grouping = as(groupings.get(0), NamedExpression.class); assertEquals("x", grouping.name()); assertEquals(DataType.DATETIME, grouping.dataType()); assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); - agg = as(agg.child(), AggregateExec.class); - assertThat(agg.getMode(), is(INITIAL)); - groupings = agg.groupings(); - grouping = as(groupings.get(0), NamedExpression.class); - assertEquals("x", grouping.name()); - assertEquals(DataType.DATETIME, grouping.dataType()); - assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); EvalExec eval = as(agg.child(), EvalExec.class); List aliases = eval.fields(); assertEquals(1, aliases.size()); From 5fa734cb9f333b07987a2b019472d58a3ce55950 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 30 Sep 2025 12:57:28 -0700 Subject: [PATCH 3/4] Update docs/changelog/131485.yaml --- docs/changelog/131485.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/131485.yaml diff --git a/docs/changelog/131485.yaml b/docs/changelog/131485.yaml new file mode 100644 index 0000000000000..686c97d40d078 --- /dev/null +++ b/docs/changelog/131485.yaml @@ -0,0 +1,5 @@ +pr: 131485 +summary: Run single phase aggregation when possible +area: ES|QL +type: enhancement +issues: [] From f5428d8eccef0db1933a38e6cb5d78d85378bd98 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 30 Sep 2025 15:45:45 -0700 Subject: [PATCH 4/4] fix tests --- .../src/main/resources/k8s-timeseries-rate.csv-spec | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-rate.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-rate.csv-spec index c1f8302c28fbf..53df5f83bc97d 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-rate.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-rate.csv-spec @@ -6,7 +6,7 @@ TS k8s rate_bytes_in:double | time_bucket:datetime null | 2024-05-10T00:01:00.000Z -28.80297619047619 | 2024-05-10T00:15:00.000Z +28.802976190476194 | 2024-05-10T00:15:00.000Z 25.422222222222224 | 2024-05-10T00:04:00.000Z 24.28946078431372 | 2024-05-10T00:19:00.000Z 24.05555555555555 | 2024-05-10T00:10:00.000Z @@ -136,7 +136,7 @@ TS k8s rate_bytes_in:double | time_bucket:datetime null | 2024-05-10T00:01:00.000Z -38.80297619047619 | 2024-05-10T00:15:00.000Z +38.802976190476194 | 2024-05-10T00:15:00.000Z 35.422222222222224 | 2024-05-10T00:04:00.000Z 34.28946078431372 | 2024-05-10T00:19:00.000Z 34.05555555555555 | 2024-05-10T00:10:00.000Z