Skip to content

Commit 22e6ba1

Browse files
authored
ESQL: Simplify agg construction when reducing on node-level (elastic#128980) (elastic#129050)
(cherry picked from commit 110bbe8) # Conflicts: # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java
1 parent e698178 commit 22e6ba1

File tree

2 files changed

+1
-10
lines changed

2 files changed

+1
-10
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
3434
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
3535
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
36-
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
3736
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
3837
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
3938

@@ -70,14 +69,6 @@ public final PhysicalOperation groupingPhysicalOperation(
7069

7170
var sourceLayout = source.layout;
7271

73-
if (aggregatorMode != AggregatorMode.INITIAL && aggregatorMode != AggregatorMode.FINAL) {
74-
assert false : "Invalid aggregator mode [" + aggregatorMode + "]";
75-
}
76-
if (aggregatorMode == AggregatorMode.INITIAL && aggregateExec.child() instanceof ExchangeSourceExec) {
77-
// the reducer step at data node (local) level
78-
aggregatorMode = AggregatorMode.INTERMEDIATE;
79-
}
80-
8172
if (aggregateExec.groupings().isEmpty()) {
8273
// not grouping
8374
List<Aggregator.Factory> aggregatorFactories = new ArrayList<>();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public static PhysicalPlan reductionPlan(PhysicalPlan plan) {
9999
final LocalMapper mapper = new LocalMapper();
100100
PhysicalPlan reducePlan = mapper.map(pipelineBreaker);
101101
if (reducePlan instanceof AggregateExec agg) {
102-
reducePlan = agg.withMode(AggregatorMode.INITIAL); // force to emit intermediate outputs
102+
reducePlan = agg.withMode(AggregatorMode.INTERMEDIATE);
103103
}
104104
return EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), reducePlan);
105105
}

0 commit comments

Comments
 (0)