2222import org .elasticsearch .compute .data .IntBlock ;
2323import org .elasticsearch .compute .data .IntVector ;
2424import org .elasticsearch .compute .data .Page ;
25+ import org .elasticsearch .core .Releasable ;
2526import org .elasticsearch .core .Releasables ;
2627import org .elasticsearch .core .TimeValue ;
2728import org .elasticsearch .index .analysis .AnalysisRegistry ;
@@ -58,12 +59,14 @@ public Operator get(DriverContext driverContext) {
5859 analysisRegistry ,
5960 maxPageSize
6061 ),
62+ maxPageSize ,
6163 driverContext
6264 );
6365 }
6466 return new HashAggregationOperator (
6567 aggregators ,
6668 () -> BlockHash .build (groups , driverContext .blockFactory (), maxPageSize , false ),
69+ maxPageSize ,
6770 driverContext
6871 );
6972 }
@@ -78,9 +81,10 @@ public String describe() {
7881 }
7982 }
8083
81- private boolean finished ;
82- private Page output ;
84+ private final int maxPageSize ;
85+ private Emitter emitter ;
8386
87+ private boolean blockHashClosed = false ;
8488 private final BlockHash blockHash ;
8589
8690 private final List <GroupingAggregator > aggregators ;
@@ -96,16 +100,23 @@ public String describe() {
96100 */
97101 private long aggregationNanos ;
98102 /**
99- * Count of pages this operator has processed.
103+ * Count of input pages this operator has processed.
100104 */
101105 private int pagesProcessed ;
102106
107+ /**
108+ * Count of output pages this operator has emitted
109+ */
110+ private int pagesEmitted ;
111+
103112 @ SuppressWarnings ("this-escape" )
104113 public HashAggregationOperator (
105114 List <GroupingAggregator .Factory > aggregators ,
106115 Supplier <BlockHash > blockHash ,
116+ int maxPageSize ,
107117 DriverContext driverContext
108118 ) {
119+ this .maxPageSize = maxPageSize ;
109120 this .aggregators = new ArrayList <>(aggregators .size ());
110121 this .driverContext = driverContext ;
111122 boolean success = false ;
@@ -124,7 +135,7 @@ public HashAggregationOperator(
124135
125136 @ Override
126137 public boolean needsInput () {
127- return finished == false ;
138+ return emitter == null ;
128139 }
129140
130141 @ Override
@@ -192,61 +203,102 @@ public void close() {
192203
193204 @ Override
194205 public Page getOutput () {
195- Page p = output ;
196- output = null ;
197- return p ;
206+ if (emitter == null ) {
207+ return null ;
208+ }
209+ return emitter .nextPage ();
198210 }
199211
200- @ Override
201- public void finish () {
202- if (finished ) {
203- return ;
212+ private class Emitter implements Releasable {
213+ private final int [] aggBlockCounts ;
214+ private int position = -1 ;
215+ private IntVector allSelected = null ;
216+ private Block [] allKeys ;
217+
218+ Emitter (int [] aggBlockCounts ) {
219+ this .aggBlockCounts = aggBlockCounts ;
204220 }
205- finished = true ;
206- Block [] blocks = null ;
207- IntVector selected = null ;
208- boolean success = false ;
209- try {
210- selected = blockHash .nonEmpty ();
211- Block [] keys = blockHash .getKeys ();
212- int [] aggBlockCounts = aggregators .stream ().mapToInt (GroupingAggregator ::evaluateBlockCount ).toArray ();
213- blocks = new Block [keys .length + Arrays .stream (aggBlockCounts ).sum ()];
214- System .arraycopy (keys , 0 , blocks , 0 , keys .length );
215- int offset = keys .length ;
216- for (int i = 0 ; i < aggregators .size (); i ++) {
217- var aggregator = aggregators .get (i );
218- aggregator .evaluate (blocks , offset , selected , driverContext );
219- offset += aggBlockCounts [i ];
221+
222+ Page nextPage () {
223+ if (position == -1 ) {
224+ position = 0 ;
225+ // TODO: chunk selected and keys
226+ allKeys = blockHash .getKeys ();
227+ allSelected = blockHash .nonEmpty ();
228+ blockHashClosed = true ;
229+ blockHash .close ();
220230 }
221- output = new Page (blocks );
222- success = true ;
223- } finally {
224- // selected should always be closed
225- if (selected != null ) {
226- selected .close ();
231+ final int endPosition = Math .toIntExact (Math .min (position + (long ) maxPageSize , allSelected .getPositionCount ()));
232+ if (endPosition == position ) {
233+ return null ;
227234 }
228- if (success == false && blocks != null ) {
229- Releasables .closeExpectNoException (blocks );
235+ final boolean singlePage = position == 0 && endPosition == allSelected .getPositionCount ();
236+ final Block [] blocks = new Block [allKeys .length + Arrays .stream (aggBlockCounts ).sum ()];
237+ IntVector selected = null ;
238+ boolean success = false ;
239+ try {
240+ if (singlePage ) {
241+ this .allSelected .incRef ();
242+ selected = this .allSelected ;
243+ for (int i = 0 ; i < allKeys .length ; i ++) {
244+ allKeys [i ].incRef ();
245+ blocks [i ] = allKeys [i ];
246+ }
247+ } else {
248+ final int [] positions = new int [endPosition - position ];
249+ for (int i = 0 ; i < positions .length ; i ++) {
250+ positions [i ] = position + i ;
251+ }
252+ selected = allSelected .filter (positions );
253+ for (int keyIndex = 0 ; keyIndex < allKeys .length ; keyIndex ++) {
254+ blocks [keyIndex ] = allKeys [keyIndex ].filter (positions );
255+ }
256+ }
257+ int blockOffset = allKeys .length ;
258+ for (int i = 0 ; i < aggregators .size (); i ++) {
259+ aggregators .get (i ).evaluate (blocks , blockOffset , selected , driverContext );
260+ blockOffset += aggBlockCounts [i ];
261+ }
262+ var output = new Page (blocks );
263+ pagesEmitted ++;
264+ success = true ;
265+ return output ;
266+ } finally {
267+ position = endPosition ;
268+ Releasables .close (selected , success ? null : Releasables .wrap (blocks ));
230269 }
231270 }
271+
272+ @ Override
273+ public void close () {
274+ Releasables .close (allSelected , allKeys != null ? Releasables .wrap (allKeys ) : null );
275+ }
276+
277+ boolean doneEmitting () {
278+ return allSelected != null && position >= allSelected .getPositionCount ();
279+ }
280+ }
281+
282+ @ Override
283+ public void finish () {
284+ if (emitter == null ) {
285+ emitter = new Emitter (aggregators .stream ().mapToInt (GroupingAggregator ::evaluateBlockCount ).toArray ());
286+ }
232287 }
233288
234289 @ Override
235290 public boolean isFinished () {
236- return finished && output == null ;
291+ return emitter != null && emitter . doneEmitting () ;
237292 }
238293
239294 @ Override
240295 public void close () {
241- if (output != null ) {
242- output .releaseBlocks ();
243- }
244- Releasables .close (blockHash , () -> Releasables .close (aggregators ));
296+ Releasables .close (emitter , blockHashClosed ? null : blockHash , () -> Releasables .close (aggregators ));
245297 }
246298
247299 @ Override
248300 public Operator .Status status () {
249- return new Status (hashNanos , aggregationNanos , pagesProcessed );
301+ return new Status (hashNanos , aggregationNanos , pagesProcessed , pagesEmitted );
250302 }
251303
252304 protected static void checkState (boolean condition , String msg ) {
@@ -285,33 +337,43 @@ public static class Status implements Operator.Status {
285337 */
286338 private final long aggregationNanos ;
287339 /**
288- * Count of pages this operator has processed.
340+ * Count of input pages this operator has processed.
289341 */
290342 private final int pagesProcessed ;
291343
344+ /**
345+ * Count of output pages this operator has emitted
346+ */
347+ private final int pageEmitted ;
348+
292349 /**
293350 * Build.
294351 * @param hashNanos Nanoseconds this operator has spent hashing grouping keys.
295352 * @param aggregationNanos Nanoseconds this operator has spent running the aggregations.
296353 * @param pagesProcessed Count of pages this operator has processed.
297354 */
298- public Status (long hashNanos , long aggregationNanos , int pagesProcessed ) {
355+ public Status (long hashNanos , long aggregationNanos , int pagesProcessed , int pagesEmitted ) {
299356 this .hashNanos = hashNanos ;
300357 this .aggregationNanos = aggregationNanos ;
301358 this .pagesProcessed = pagesProcessed ;
359+ this .pageEmitted = pagesEmitted ;
302360 }
303361
304362 protected Status (StreamInput in ) throws IOException {
305363 hashNanos = in .readVLong ();
306364 aggregationNanos = in .readVLong ();
307365 pagesProcessed = in .readVInt ();
366+ pageEmitted = in .getTransportVersion ().onOrAfter (TransportVersions .ESQL_CHUNK_AGGREGATION_OUTPUT ) ? in .readVInt () : 0 ;
308367 }
309368
310369 @ Override
311370 public void writeTo (StreamOutput out ) throws IOException {
312371 out .writeVLong (hashNanos );
313372 out .writeVLong (aggregationNanos );
314373 out .writeVInt (pagesProcessed );
374+ if (out .getTransportVersion ().onOrAfter (TransportVersions .ESQL_CHUNK_AGGREGATION_OUTPUT )) {
375+ out .writeVInt (pageEmitted );
376+ }
315377 }
316378
317379 @ Override
@@ -334,12 +396,19 @@ public long aggregationNanos() {
334396 }
335397
336398 /**
337- * Count of pages this operator has processed.
399+ * Count of input pages this operator has processed.
338400 */
339401 public int pagesProcessed () {
340402 return pagesProcessed ;
341403 }
342404
405+ /**
406+ * Count of output pages this operator has emitted
407+ */
408+ public int pagesEmitted () {
409+ return pageEmitted ;
410+ }
411+
343412 @ Override
344413 public XContentBuilder toXContent (XContentBuilder builder , Params params ) throws IOException {
345414 builder .startObject ();
@@ -352,6 +421,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
352421 builder .field ("aggregation_time" , TimeValue .timeValueNanos (aggregationNanos ));
353422 }
354423 builder .field ("pages_processed" , pagesProcessed );
424+ builder .field ("pages_emitted" , pageEmitted );
355425 return builder .endObject ();
356426
357427 }
@@ -361,12 +431,15 @@ public boolean equals(Object o) {
361431 if (this == o ) return true ;
362432 if (o == null || getClass () != o .getClass ()) return false ;
363433 Status status = (Status ) o ;
364- return hashNanos == status .hashNanos && aggregationNanos == status .aggregationNanos && pagesProcessed == status .pagesProcessed ;
434+ return hashNanos == status .hashNanos
435+ && aggregationNanos == status .aggregationNanos
436+ && pagesProcessed == status .pagesProcessed
437+ && pageEmitted == status .pageEmitted ;
365438 }
366439
367440 @ Override
368441 public int hashCode () {
369- return Objects .hash (hashNanos , aggregationNanos , pagesProcessed );
442+ return Objects .hash (hashNanos , aggregationNanos , pagesProcessed , pageEmitted );
370443 }
371444
372445 @ Override
0 commit comments