164164import static java .util .Arrays .asList ;
165165import static org .elasticsearch .compute .aggregation .AggregatorMode .FINAL ;
166166import static org .elasticsearch .compute .aggregation .AggregatorMode .INITIAL ;
167+ import static org .elasticsearch .compute .aggregation .AggregatorMode .SINGLE ;
167168import static org .elasticsearch .core .Tuple .tuple ;
168169import static org .elasticsearch .index .query .QueryBuilders .boolQuery ;
169170import static org .elasticsearch .index .query .QueryBuilders .existsQuery ;
@@ -636,8 +637,7 @@ public void testTripleExtractorPerField() {
636637
637638 /**
638639 *LimitExec[10000[INTEGER],8]
639- * \_AggregateExec[[],[SUM(salary{f}#13460,true[BOOLEAN]) AS x#13454],FINAL,[$$x$sum{r}#13466, $$x$seen{r}#13467],8]
640- * \_AggregateExec[[],[SUM(salary{f}#13460,true[BOOLEAN]) AS x#13454],INITIAL,[$$x$sum{r}#13466, $$x$seen{r}#13467],8]
640+ * \_AggregateExec[[],[SUM(salary{f}#13460,true[BOOLEAN]) AS x#13454],SINGLE,[$$x$sum{r}#13466, $$x$seen{r}#13467],8]
641641 * \_FilterExec[ROUND(emp_no{f}#13455) > 10[INTEGER]]
642642 * \_TopNExec[[Order[last_name{f}#13459,ASC,LAST]],10[INTEGER],58]
643643 * \_ExchangeExec[[emp_no{f}#13455, last_name{f}#13459, salary{f}#13460],false]
@@ -658,11 +658,10 @@ public void testExtractorForField() {
658658
659659 var optimized = optimizedPlan (plan );
660660 var limit = as (optimized , LimitExec .class );
661- var aggregateFinal = as (limit .child (), AggregateExec .class );
662- assertThat (aggregateFinal .estimatedRowSize (), equalTo (Long .BYTES ));
661+ var agg = as (limit .child (), AggregateExec .class );
662+ assertThat (agg .estimatedRowSize (), equalTo (Long .BYTES ));
663663
664- var aggregatePartial = as (aggregateFinal .child (), AggregateExec .class );
665- var filter = as (aggregatePartial .child (), FilterExec .class );
664+ var filter = as (agg .child (), FilterExec .class );
666665 var topN = as (filter .child (), TopNExec .class );
667666
668667 var exchange = asRemoteExchange (topN .child ());
@@ -3140,8 +3139,7 @@ public void testProjectAwayAllColumnsWhenOnlyTheCountMatters() {
31403139 * Expects
31413140 *
31423141 * LimitExec[10000[INTEGER]]
3143- * \_AggregateExec[[],[COUNT([2a][KEYWORD]) AS count(*)],FINAL,[count{r}#13, seen{r}#14],8]
3144- * \_AggregateExec[[],[COUNT([2a][KEYWORD]) AS count(*)],INITIAL,[count{r}#13, seen{r}#14],8]
3142+ * \_AggregateExec[[],[COUNT([2a][KEYWORD]) AS count(*)],SINGLE,[count{r}#13, seen{r}#14],8]
31453143 * \_LimitExec[10[INTEGER]]
31463144 * \_ExchangeExec[[<all-fields-projected>{r:s}#28],false]
31473145 * \_ProjectExec[[<all-fields-projected>{r:s}#28]]
@@ -3156,9 +3154,9 @@ public void testProjectAwayAllColumnsWhenOnlyTheCountMattersInStats() {
31563154 """ ));
31573155
31583156 var limit = as (plan , LimitExec .class );
3159- var aggFinal = as (limit .child (), AggregateExec .class );
3160- var aggInitial = as ( aggFinal . child (), AggregateExec . class );
3161- var limit10 = as (aggInitial .child (), LimitExec .class );
3157+ var agg = as (limit .child (), AggregateExec .class );
3158+ assertThat ( agg . getMode (), equalTo ( SINGLE ) );
3159+ var limit10 = as (agg .child (), LimitExec .class );
31623160
31633161 var exchange = as (limit10 .child (), ExchangeExec .class );
31643162 var project = as (exchange .child (), ProjectExec .class );
@@ -3225,8 +3223,7 @@ public void testProjectAwayMvExpandColumnOrder() {
32253223 * ProjectExec[[a{r}#5]]
32263224 * \_EvalExec[[__a_SUM@81823521{r}#15 / __a_COUNT@31645621{r}#16 AS a]]
32273225 * \_LimitExec[10000[INTEGER]]
3228- * \_AggregateExec[[],[SUM(salary{f}#11) AS __a_SUM@81823521, COUNT(salary{f}#11) AS __a_COUNT@31645621],FINAL,24]
3229- * \_AggregateExec[[],[SUM(salary{f}#11) AS __a_SUM@81823521, COUNT(salary{f}#11) AS __a_COUNT@31645621],PARTIAL,16]
3226+ * \_AggregateExec[[],[SUM(salary{f}#11) AS __a_SUM@81823521, COUNT(salary{f}#11) AS __a_COUNT@31645621],SINGLE,24]
32303227 * \_LimitExec[10[INTEGER]]
32313228 * \_ExchangeExec[[],false]
32323229 * \_ProjectExec[[salary{f}#11]]
@@ -3246,11 +3243,9 @@ public void testAvgSurrogateFunctionAfterRenameAndLimit() {
32463243 var limit = as (eval .child (), LimitExec .class );
32473244 assertThat (limit .limit (), instanceOf (Literal .class ));
32483245 assertThat (limit .limit ().fold (FoldContext .small ()), equalTo (10000 ));
3249- var aggFinal = as (limit .child (), AggregateExec .class );
3250- assertThat (aggFinal .getMode (), equalTo (FINAL ));
3251- var aggPartial = as (aggFinal .child (), AggregateExec .class );
3252- assertThat (aggPartial .getMode (), equalTo (INITIAL ));
3253- limit = as (aggPartial .child (), LimitExec .class );
3246+ var agg = as (limit .child (), AggregateExec .class );
3247+ assertThat (agg .getMode (), equalTo (SINGLE ));
3248+ limit = as (agg .child (), LimitExec .class );
32543249 assertThat (limit .limit (), instanceOf (Literal .class ));
32553250 assertThat (limit .limit ().fold (FoldContext .small ()), equalTo (10 ));
32563251
@@ -3360,11 +3355,9 @@ public void testGlobalAggFoldingOutput() {
33603355 var optimized = optimizedPlan (plan , stats );
33613356
33623357 var limit = as (optimized , LimitExec .class );
3363- var aggFinal = as (limit .child (), AggregateExec .class );
3364- var aggPartial = as (aggFinal .child (), AggregateExec .class );
3365- // The partial aggregation's output is determined via AbstractPhysicalOperationProviders.intermediateAttributes()
3366- assertThat (Expressions .names (aggPartial .output ()), contains ("$$c$count" , "$$c$seen" ));
3367- limit = as (aggPartial .child (), LimitExec .class );
3358+ var agg = as (limit .child (), AggregateExec .class );
3359+ assertThat (agg .getMode (), equalTo (SINGLE ));
3360+ limit = as (agg .child (), LimitExec .class );
33683361 var exchange = as (limit .child (), ExchangeExec .class );
33693362 var project = as (exchange .child (), ProjectExec .class );
33703363 }
@@ -3791,8 +3784,7 @@ public void testMixedSpatialBoundsAndPointsExtracted() {
37913784 * After local optimizations we expect no changes because field is extracted:
37923785 * <code>
37933786 * LimitExec[1000[INTEGER]]
3794- * \_AggregateExec[[],[SPATIALCENTROID(__centroid_SPATIALCENTROID@7ff910a{r}#7) AS centroid],FINAL,50]
3795- * \_AggregateExec[[],[SPATIALCENTROID(__centroid_SPATIALCENTROID@7ff910a{r}#7) AS centroid],PARTIAL,50]
3787+ * \_AggregateExec[[],[SPATIALCENTROID(__centroid_SPATIALCENTROID@7ff910a{r}#7) AS centroid],SINGLE,50]
37963788 * \_EvalExec[[[1 1 0 0 0 0 0 30 e2 4c 7c 45 40 0 0 e0 92 b0 82 2d 40][GEO_POINT] AS __centroid_SPATIALCENTROID@7ff910a]]
37973789 * \_RowExec[[[50 4f 49 4e 54 28 34 32 2e 39 37 31 30 39 36 32 39 39 35 38 38 36 38 20 31 34 2e 37 35 35 32 35 33 34 30 30
37983790 * 36 35 33 36 29][KEYWORD] AS wkt]]
@@ -3820,11 +3812,7 @@ public void testSpatialTypesAndStatsUseDocValuesNestedLiteral() {
38203812 var optimized = optimizedPlan (plan );
38213813 limit = as (optimized , LimitExec .class );
38223814 agg = as (limit .child (), AggregateExec .class );
3823- assertThat ("Aggregation is FINAL" , agg .getMode (), equalTo (FINAL ));
3824- assertThat ("No groupings in aggregation" , agg .groupings ().size (), equalTo (0 ));
3825- assertAggregation (agg , "centroid" , SpatialCentroid .class , GEO_POINT , FieldExtractPreference .NONE );
3826- agg = as (agg .child (), AggregateExec .class );
3827- assertThat ("Aggregation is PARTIAL" , agg .getMode (), equalTo (INITIAL ));
3815+ assertThat ("Aggregation is SINGLE" , agg .getMode (), equalTo (SINGLE ));
38283816 assertThat ("No groupings in aggregation" , agg .groupings ().size (), equalTo (0 ));
38293817 assertAggregation (agg , "centroid" , SpatialCentroid .class , GEO_POINT , FieldExtractPreference .NONE );
38303818 eval = as (agg .child (), EvalExec .class );
@@ -4095,8 +4083,7 @@ public void testSpatialTypesAndStatsUseDocValuesMultiAggregationsGrouped() {
40954083 * After local optimizations:
40964084 * <code>
40974085 * LimitExec[1000[INTEGER]]
4098- * \_AggregateExec[[],[SPATIALCENTROID(centroid{r}#4) AS centroid, SUM(count{r}#6) AS count],FINAL,58]
4099- * \_AggregateExec[[],[SPATIALCENTROID(centroid{r}#4) AS centroid, SUM(count{r}#6) AS count],PARTIAL,58]
4086+ * \_AggregateExec[[],[SPATIALCENTROID(centroid{r}#4) AS centroid, SUM(count{r}#6) AS count],SINGLE,58]
41004087 * \_AggregateExec[[scalerank{f}#16],[SPATIALCENTROID(location{f}#18) AS centroid, COUNT([2a][KEYWORD]) AS count],FINAL,58]
41014088 * \_ExchangeExec[[scalerank{f}#16, xVal{r}#19, xDel{r}#20, yVal{r}#21, yDel{r}#22, count{r}#23, count{r}#24, seen{r}#25],true]
41024089 * \_AggregateExec[[scalerank{f}#16],[SPATIALCENTROID(location{f}#18) AS centroid, COUNT([2a][KEYWORD]) AS count],PARTIAL,58]
@@ -4140,12 +4127,7 @@ public void testSpatialTypesAndStatsUseDocValuesMultiAggregationsGroupedAggregat
41404127 var optimized = optimizedPlan (plan );
41414128 limit = as (optimized , LimitExec .class );
41424129 agg = as (limit .child (), AggregateExec .class );
4143- assertThat ("Aggregation is FINAL" , agg .getMode (), equalTo (FINAL ));
4144- assertThat ("No groupings in aggregation" , agg .groupings ().size (), equalTo (0 ));
4145- assertAggregation (agg , "count" , Sum .class );
4146- assertAggregation (agg , "centroid" , SpatialCentroid .class , GEO_POINT , FieldExtractPreference .NONE );
4147- agg = as (agg .child (), AggregateExec .class );
4148- assertThat ("Aggregation is PARTIAL" , agg .getMode (), equalTo (INITIAL ));
4130+ assertThat ("Aggregation is SINGLE" , agg .getMode (), equalTo (SINGLE ));
41494131 assertThat ("No groupings in aggregation" , agg .groupings ().size (), equalTo (0 ));
41504132 assertAggregation (agg , "count" , Sum .class );
41514133 assertAggregation (agg , "centroid" , SpatialCentroid .class , GEO_POINT , FieldExtractPreference .NONE );
@@ -7805,6 +7787,33 @@ public void testLookupJoinFieldLoadingDropAllFields() throws Exception {
78057787 assertLookupJoinFieldNames (query , data , List .of (Set .of (), Set .of ("foo" , "bar" , "baz" )));
78067788 }
78077789
7790+ /**
7791+ * LimitExec[1000[INTEGER],null]
7792+ * \_AggregateExec[[last_name{r}#8],[COUNT(first_name{r}#5,true[BOOLEAN]) AS count(first_name)#11, last_name{r}#8],SINGLE,[last_name
7793+ * {r}#8, $$count(first_name)$count{r}#25, $$count(first_name)$seen{r}#26],null]
7794+ * \_AggregateExec[[emp_no{f}#12],[VALUES(first_name{f}#13,true[BOOLEAN]) AS first_name#5, VALUES(last_name{f}#16,true[BOOLEAN]) A
7795+ * S last_name#8],FINAL,[emp_no{f}#12, $$first_name$values{r}#23, $$last_name$values{r}#24],null]
7796+ * \_ExchangeExec[[emp_no{f}#12, $$first_name$values{r}#23, $$last_name$values{r}#24],true]
7797+ * \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[
7798+ * Aggregate[[emp_no{f}#12],[VALUES(first_name{f}#13,true[BOOLEAN]) AS first_name#5, VALUES(last_name{f}#16,true[BOOLEAN]) A
7799+ * S last_name#8]]
7800+ * \_EsRelation[test][_meta_field{f}#18, emp_no{f}#12, first_name{f}#13, ..]]]
7801+ */
7802+ public void testSingleModeAggregate () {
7803+ String q = """
7804+ FROM test
7805+ | STATS first_name = VALUES(first_name), last_name = VALUES(last_name) BY emp_no
7806+ | STATS count(first_name) BY last_name""" ;
7807+ PhysicalPlan plan = physicalPlan (q );
7808+ PhysicalPlan optimized = physicalPlanOptimizer .optimize (plan );
7809+ LimitExec limit = as (optimized , LimitExec .class );
7810+ AggregateExec second = as (limit .child (), AggregateExec .class );
7811+ assertThat (second .getMode (), equalTo (SINGLE ));
7812+ AggregateExec first = as (second .child (), AggregateExec .class );
7813+ assertThat (first .getMode (), equalTo (FINAL ));
7814+ as (first .child (), ExchangeExec .class );
7815+ }
7816+
78087817 private void assertLookupJoinFieldNames (String query , TestDataSource data , List <Set <String >> expectedFieldNames ) {
78097818 assertLookupJoinFieldNames (query , data , expectedFieldNames , false );
78107819 }
0 commit comments