2222import org .elasticsearch .compute .data .IntBlock ;
2323import org .elasticsearch .compute .data .IntVector ;
2424import org .elasticsearch .compute .data .Page ;
25+ import org .elasticsearch .core .Releasable ;
2526import org .elasticsearch .core .Releasables ;
2627import org .elasticsearch .core .TimeValue ;
2728import org .elasticsearch .index .analysis .AnalysisRegistry ;
@@ -58,12 +59,14 @@ public Operator get(DriverContext driverContext) {
5859 analysisRegistry ,
5960 maxPageSize
6061 ),
62+ maxPageSize ,
6163 driverContext
6264 );
6365 }
6466 return new HashAggregationOperator (
6567 aggregators ,
6668 () -> BlockHash .build (groups , driverContext .blockFactory (), maxPageSize , false ),
69+ maxPageSize ,
6770 driverContext
6871 );
6972 }
@@ -78,9 +81,10 @@ public String describe() {
7881 }
7982 }
8083
81- private boolean finished ;
82- private Page output ;
84+ private final int maxPageSize ;
85+ private Emitter emitter ;
8386
87+ private boolean blockHashClosed = false ;
8488 private final BlockHash blockHash ;
8589
8690 private final List <GroupingAggregator > aggregators ;
@@ -96,7 +100,7 @@ public String describe() {
96100 */
97101 private long aggregationNanos ;
98102 /**
99- * Count of pages this operator has processed.
103+ * Count of input pages this operator has processed.
100104 */
101105 private int pagesProcessed ;
102106 /**
@@ -112,8 +116,10 @@ public String describe() {
112116 public HashAggregationOperator (
113117 List <GroupingAggregator .Factory > aggregators ,
114118 Supplier <BlockHash > blockHash ,
119+ int maxPageSize ,
115120 DriverContext driverContext
116121 ) {
122+ this .maxPageSize = maxPageSize ;
117123 this .aggregators = new ArrayList <>(aggregators .size ());
118124 this .driverContext = driverContext ;
119125 boolean success = false ;
@@ -132,7 +138,7 @@ public HashAggregationOperator(
132138
133139 @ Override
134140 public boolean needsInput () {
135- return finished == false ;
141+ return emitter == null ;
136142 }
137143
138144 @ Override
@@ -201,59 +207,97 @@ public void close() {
201207
202208 @ Override
203209 public Page getOutput () {
204- Page p = output ;
205- if (p != null ) {
206- rowsEmitted += p .getPositionCount ();
210+ if (emitter == null ) {
211+ return null ;
207212 }
208- output = null ;
209- return p ;
213+ return emitter .nextPage ();
210214 }
211215
212- @ Override
213- public void finish () {
214- if (finished ) {
215- return ;
216+ private class Emitter implements Releasable {
217+ private final int [] aggBlockCounts ;
218+ private int position = -1 ;
219+ private IntVector allSelected = null ;
220+ private Block [] allKeys ;
221+
222+ Emitter (int [] aggBlockCounts ) {
223+ this .aggBlockCounts = aggBlockCounts ;
216224 }
217- finished = true ;
218- Block [] blocks = null ;
219- IntVector selected = null ;
220- boolean success = false ;
221- try {
222- selected = blockHash .nonEmpty ();
223- Block [] keys = blockHash .getKeys ();
224- int [] aggBlockCounts = aggregators .stream ().mapToInt (GroupingAggregator ::evaluateBlockCount ).toArray ();
225- blocks = new Block [keys .length + Arrays .stream (aggBlockCounts ).sum ()];
226- System .arraycopy (keys , 0 , blocks , 0 , keys .length );
227- int offset = keys .length ;
228- for (int i = 0 ; i < aggregators .size (); i ++) {
229- var aggregator = aggregators .get (i );
230- aggregator .evaluate (blocks , offset , selected , driverContext );
231- offset += aggBlockCounts [i ];
225+
226+ Page nextPage () {
227+ if (position == -1 ) {
228+ position = 0 ;
229+ // TODO: chunk selected and keys
230+ allKeys = blockHash .getKeys ();
231+ allSelected = blockHash .nonEmpty ();
232+ blockHashClosed = true ;
233+ blockHash .close ();
232234 }
233- output = new Page (blocks );
234- success = true ;
235- } finally {
236- // selected should always be closed
237- if (selected != null ) {
238- selected .close ();
235+ final int endPosition = Math .toIntExact (Math .min (position + (long ) maxPageSize , allSelected .getPositionCount ()));
236+ if (endPosition == position ) {
237+ return null ;
239238 }
240- if (success == false && blocks != null ) {
241- Releasables .closeExpectNoException (blocks );
239+ final boolean singlePage = position == 0 && endPosition == allSelected .getPositionCount ();
240+ final Block [] blocks = new Block [allKeys .length + Arrays .stream (aggBlockCounts ).sum ()];
241+ IntVector selected = null ;
242+ boolean success = false ;
243+ try {
244+ if (singlePage ) {
245+ this .allSelected .incRef ();
246+ selected = this .allSelected ;
247+ for (int i = 0 ; i < allKeys .length ; i ++) {
248+ allKeys [i ].incRef ();
249+ blocks [i ] = allKeys [i ];
250+ }
251+ } else {
252+ final int [] positions = new int [endPosition - position ];
253+ for (int i = 0 ; i < positions .length ; i ++) {
254+ positions [i ] = position + i ;
255+ }
256+ selected = allSelected .filter (positions );
257+ for (int keyIndex = 0 ; keyIndex < allKeys .length ; keyIndex ++) {
258+ blocks [keyIndex ] = allKeys [keyIndex ].filter (positions );
259+ }
260+ }
261+ int blockOffset = allKeys .length ;
262+ for (int i = 0 ; i < aggregators .size (); i ++) {
263+ aggregators .get (i ).evaluate (blocks , blockOffset , selected , driverContext );
264+ blockOffset += aggBlockCounts [i ];
265+ }
266+ var output = new Page (blocks );
267+ rowsEmitted += output .getPositionCount ();
268+ success = true ;
269+ return output ;
270+ } finally {
271+ position = endPosition ;
272+ Releasables .close (selected , success ? null : Releasables .wrap (blocks ));
242273 }
243274 }
275+
276+ @ Override
277+ public void close () {
278+ Releasables .close (allSelected , allKeys != null ? Releasables .wrap (allKeys ) : null );
279+ }
280+
281+ boolean doneEmitting () {
282+ return allSelected != null && position >= allSelected .getPositionCount ();
283+ }
284+ }
285+
286+ @ Override
287+ public void finish () {
288+ if (emitter == null ) {
289+ emitter = new Emitter (aggregators .stream ().mapToInt (GroupingAggregator ::evaluateBlockCount ).toArray ());
290+ }
244291 }
245292
246293 @ Override
247294 public boolean isFinished () {
248- return finished && output == null ;
295+ return emitter != null && emitter . doneEmitting () ;
249296 }
250297
251298 @ Override
252299 public void close () {
253- if (output != null ) {
254- output .releaseBlocks ();
255- }
256- Releasables .close (blockHash , () -> Releasables .close (aggregators ));
300+ Releasables .close (emitter , blockHashClosed ? null : blockHash , () -> Releasables .close (aggregators ));
257301 }
258302
259303 @ Override
@@ -297,7 +341,7 @@ public static class Status implements Operator.Status {
297341 */
298342 private final long aggregationNanos ;
299343 /**
300- * Count of pages this operator has processed.
344+ * Count of input pages this operator has processed.
301345 */
302346 private final int pagesProcessed ;
303347 /**
@@ -371,7 +415,7 @@ public long aggregationNanos() {
371415 }
372416
373417 /**
374- * Count of pages this operator has processed.
418+ * Count of input pages this operator has processed.
375419 */
376420 public int pagesProcessed () {
377421 return pagesProcessed ;
0 commit comments