From b0e7eb1cbda6620fca21a8b28f4c5c17e8a6dddf Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Wed, 1 Oct 2025 16:38:39 +0200 Subject: [PATCH] Move logic for SINGLE aggs into mapper, directly --- .../esql/optimizer/PhysicalPlanOptimizer.java | 4 +- .../rules/physical/SinglePhaseAggregate.java | 36 ---------------- .../xpack/esql/planner/mapper/Mapper.java | 15 ++++--- .../optimizer/PhysicalPlanOptimizerTests.java | 41 ++++++------------- 4 files changed, 24 insertions(+), 72 deletions(-) delete mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/SinglePhaseAggregate.java diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java index d1b642764b0e4..6d60c547f47d6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java @@ -11,7 +11,6 @@ import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns; -import org.elasticsearch.xpack.esql.optimizer.rules.physical.SinglePhaseAggregate; import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor; @@ -26,8 +25,7 @@ public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor { private static final List> RULES = List.of( - new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns()), - new Batch<>("Single aggregation", Limiter.ONCE, new SinglePhaseAggregate()) + new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns()) ); private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/SinglePhaseAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/SinglePhaseAggregate.java deleted file mode 100644 index 04242e3e18590..0000000000000 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/SinglePhaseAggregate.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.optimizer.rules.physical; - -import org.elasticsearch.compute.aggregation.AggregatorMode; -import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction; -import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules; -import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; -import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; - -/** - * Collapses two-phase aggregation into a single phase when possible. - * For example, in FROM .. | STATS first | STATS second, the STATS second aggregation - * can be executed in a single phase on the coordinator instead of two phases. - */ -public class SinglePhaseAggregate extends PhysicalOptimizerRules.OptimizerRule { - @Override - protected PhysicalPlan rule(AggregateExec plan) { - if (plan instanceof AggregateExec parent - && parent.getMode() == AggregatorMode.FINAL - && parent.child() instanceof AggregateExec child - && child.getMode() == AggregatorMode.INITIAL) { - if (parent.groupings() - .stream() - .noneMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction))) { - return child.withMode(AggregatorMode.SINGLE); - } - } - return plan; - } -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index dd9b733d97872..49192983b30e6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.util.Holder; +import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -160,12 +161,16 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { if (mappedChild instanceof ExchangeExec exchange) { mappedChild = new ExchangeExec(mappedChild.source(), intermediate, true, exchange.child()); } - // if no exchange was added (aggregation happening on the coordinator), create the initial agg - else { - mappedChild = MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate); - } + // if no exchange was added (aggregation happening on the coordinator), try to only create a single-pass agg + else if (aggregate.groupings() + .stream() + .noneMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction))) { + return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.SINGLE, intermediate); + } else { + mappedChild = MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate); + } - // always add the final/reduction agg + // The final/reduction agg return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.FINAL, intermediate); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 8ee56306c803f..281bd9dca9e35 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -3798,11 +3798,7 @@ public void testSpatialTypesAndStatsUseDocValuesNestedLiteral() { var limit = as(plan, LimitExec.class); var agg = as(limit.child(), AggregateExec.class); - assertThat("Aggregation is FINAL", agg.getMode(), equalTo(FINAL)); - assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0)); - assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE); - agg = as(agg.child(), AggregateExec.class); - assertThat("Aggregation is PARTIAL", agg.getMode(), equalTo(INITIAL)); + assertThat("Aggregation is SINGLE", agg.getMode(), equalTo(SINGLE)); assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0)); assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE); var eval = as(agg.child(), EvalExec.class); @@ -4101,12 +4097,7 @@ public void testSpatialTypesAndStatsUseDocValuesMultiAggregationsGroupedAggregat var limit = as(plan, LimitExec.class); var agg = as(limit.child(), AggregateExec.class); - assertThat("Aggregation is FINAL", agg.getMode(), equalTo(FINAL)); - assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0)); - assertAggregation(agg, "count", Sum.class); - assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE); - agg = as(agg.child(), AggregateExec.class); - assertThat("Aggregation is PARTIAL", agg.getMode(), equalTo(INITIAL)); + assertThat("Aggregation is SINGLE", agg.getMode(), equalTo(SINGLE)); assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0)); assertAggregation(agg, "count", Sum.class); assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE); @@ -6897,11 +6888,9 @@ public void testEnrichBeforeAggregation() { | ENRICH _coordinator:departments | STATS size=count(*) BY department"""); var limit = as(plan, LimitExec.class); - var finalAggs = as(limit.child(), AggregateExec.class); - assertThat(finalAggs.getMode(), equalTo(FINAL)); - var partialAggs = as(finalAggs.child(), AggregateExec.class); - assertThat(partialAggs.getMode(), equalTo(INITIAL)); - var enrich = as(partialAggs.child(), EnrichExec.class); + var aggs = as(limit.child(), AggregateExec.class); + assertThat(aggs.getMode(), equalTo(SINGLE)); + var enrich = as(aggs.child(), EnrichExec.class); assertThat(enrich.mode(), equalTo(Enrich.Mode.COORDINATOR)); assertThat(enrich.concreteIndices(), equalTo(Map.of("", ".enrich-departments-3"))); var exchange = as(enrich.child(), ExchangeExec.class); @@ -7228,9 +7217,8 @@ public void testManyEnrich() { | STATS teams=count(*) BY supervisor """); var limit = as(plan, LimitExec.class); - var finalAgg = as(limit.child(), AggregateExec.class); - var partialAgg = as(finalAgg.child(), AggregateExec.class); - var enrich1 = as(partialAgg.child(), EnrichExec.class); + var agg = as(limit.child(), AggregateExec.class); + var enrich1 = as(agg.child(), EnrichExec.class); assertThat(enrich1.policyName(), equalTo("supervisors")); assertThat(enrich1.mode(), equalTo(Enrich.Mode.ANY)); var finalTopN = as(enrich1.child(), TopNExec.class); @@ -7254,9 +7242,8 @@ public void testManyEnrich() { | STATS teams=count(*) BY supervisor """); var limit = as(plan, LimitExec.class); - var finalAgg = as(limit.child(), AggregateExec.class); - var partialAgg = as(finalAgg.child(), AggregateExec.class); - var enrich1 = as(partialAgg.child(), EnrichExec.class); + var agg = as(limit.child(), AggregateExec.class); + var enrich1 = as(agg.child(), EnrichExec.class); assertThat(enrich1.policyName(), equalTo("supervisors")); assertThat(enrich1.mode(), equalTo(Enrich.Mode.COORDINATOR)); var finalTopN = as(enrich1.child(), TopNExec.class); @@ -7280,9 +7267,8 @@ public void testManyEnrich() { | STATS teams=count(*) BY supervisor """); var limit = as(plan, LimitExec.class); - var finalAgg = as(limit.child(), AggregateExec.class); - var partialAgg = as(finalAgg.child(), AggregateExec.class); - var enrich1 = as(partialAgg.child(), EnrichExec.class); + var agg = as(limit.child(), AggregateExec.class); + var enrich1 = as(agg.child(), EnrichExec.class); assertThat(enrich1.policyName(), equalTo("supervisors")); assertThat(enrich1.mode(), equalTo(Enrich.Mode.ANY)); var topN = as(enrich1.child(), TopNExec.class); @@ -7305,9 +7291,8 @@ public void testManyEnrich() { | STATS teams=count(*) BY supervisor """); var limit = as(plan, LimitExec.class); - var finalAgg = as(limit.child(), AggregateExec.class); - var partialAgg = as(finalAgg.child(), AggregateExec.class); - var enrich1 = as(partialAgg.child(), EnrichExec.class); + var agg = as(limit.child(), AggregateExec.class); + var enrich1 = as(agg.child(), EnrichExec.class); assertThat(enrich1.policyName(), equalTo("supervisors")); assertThat(enrich1.mode(), equalTo(Enrich.Mode.ANY)); var topN = as(enrich1.child(), TopNExec.class);