1515import org .elasticsearch .action .search .SearchPhaseController .TopDocsStats ;
1616import org .elasticsearch .common .breaker .CircuitBreaker ;
1717import org .elasticsearch .common .breaker .CircuitBreakingException ;
18+ import org .elasticsearch .common .collect .Iterators ;
1819import org .elasticsearch .common .io .stream .DelayableWriteable ;
1920import org .elasticsearch .common .lucene .search .TopDocsAndMaxScore ;
2021import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
22+ import org .elasticsearch .core .Releasable ;
23+ import org .elasticsearch .core .Releasables ;
2124import org .elasticsearch .search .SearchPhaseResult ;
2225import org .elasticsearch .search .SearchService ;
2326import org .elasticsearch .search .SearchShardTarget ;
3134import java .util .ArrayList ;
3235import java .util .Collections ;
3336import java .util .Comparator ;
37+ import java .util .Iterator ;
3438import java .util .List ;
3539import java .util .concurrent .Executor ;
3640import java .util .concurrent .atomic .AtomicReference ;
@@ -174,14 +178,10 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
174178 this .mergeResult = null ;
175179 final int resultSize = buffer .size () + (mergeResult == null ? 0 : 1 );
176180 final List <TopDocs > topDocsList = hasTopDocs ? new ArrayList <>(resultSize ) : null ;
177- final List <DelayableWriteable <InternalAggregations >> aggsList = hasAggs ? new ArrayList <>(resultSize ) : null ;
178181 if (mergeResult != null ) {
179182 if (topDocsList != null ) {
180183 topDocsList .add (mergeResult .reducedTopDocs );
181184 }
182- if (aggsList != null ) {
183- aggsList .add (DelayableWriteable .referencing (mergeResult .reducedAggs ));
184- }
185185 }
186186 for (QuerySearchResult result : buffer ) {
187187 topDocsStats .add (result .topDocs (), result .searchTimedOut (), result .terminatedEarly ());
@@ -190,34 +190,39 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
190190 setShardIndex (topDocs .topDocs , result .getShardIndex ());
191191 topDocsList .add (topDocs .topDocs );
192192 }
193- if (aggsList != null ) {
194- aggsList .add (result .getAggs ());
195- }
196193 }
197194 SearchPhaseController .ReducedQueryPhase reducePhase ;
198195 long breakerSize = circuitBreakerBytes ;
196+ final InternalAggregations aggs ;
199197 try {
200- if (aggsList != null ) {
198+ if (hasAggs ) {
201199 // Add an estimate of the final reduce size
202200 breakerSize = addEstimateAndMaybeBreak (estimateRamBytesUsedForReduce (breakerSize ));
201+ aggs = aggregate (
202+ buffer .iterator (),
203+ mergeResult ,
204+ resultSize ,
205+ performFinalReduce ? aggReduceContextBuilder .forFinalReduction () : aggReduceContextBuilder .forPartialReduction ()
206+ );
207+ } else {
208+ aggs = null ;
203209 }
204210 reducePhase = SearchPhaseController .reducedQueryPhase (
205211 results .asList (),
206- aggsList ,
212+ aggs ,
207213 topDocsList == null ? Collections .emptyList () : topDocsList ,
208214 topDocsStats ,
209215 numReducePhases ,
210216 false ,
211- aggReduceContextBuilder ,
212- queryPhaseRankCoordinatorContext ,
213- performFinalReduce
217+ queryPhaseRankCoordinatorContext
214218 );
219+ buffer = null ;
215220 } finally {
216221 releaseAggs (buffer );
217222 }
218223 if (hasAggs
219224 // reduced aggregations can be null if all shards failed
220- && reducePhase . aggregations () != null ) {
225+ && aggs != null ) {
221226
222227 // Update the circuit breaker to replace the estimation with the serialized size of the newly reduced result
223228 long finalSize = DelayableWriteable .getSerializedSize (reducePhase .aggregations ()) - breakerSize ;
@@ -249,17 +254,7 @@ private MergeResult partialReduce(
249254 toConsume .sort (RESULT_COMPARATOR );
250255
251256 final TopDocs newTopDocs ;
252- final InternalAggregations newAggs ;
253- final List <DelayableWriteable <InternalAggregations >> aggsList ;
254257 final int resultSetSize = toConsume .size () + (lastMerge != null ? 1 : 0 );
255- if (hasAggs ) {
256- aggsList = new ArrayList <>(resultSetSize );
257- if (lastMerge != null ) {
258- aggsList .add (DelayableWriteable .referencing (lastMerge .reducedAggs ));
259- }
260- } else {
261- aggsList = null ;
262- }
263258 List <TopDocs > topDocsList ;
264259 if (hasTopDocs ) {
265260 topDocsList = new ArrayList <>(resultSetSize );
@@ -269,14 +264,12 @@ private MergeResult partialReduce(
269264 } else {
270265 topDocsList = null ;
271266 }
267+ final InternalAggregations newAggs ;
272268 try {
273269 for (QuerySearchResult result : toConsume ) {
274270 topDocsStats .add (result .topDocs (), result .searchTimedOut (), result .terminatedEarly ());
275271 SearchShardTarget target = result .getSearchShardTarget ();
276272 processedShards .add (new SearchShard (target .getClusterAlias (), target .getShardId ()));
277- if (aggsList != null ) {
278- aggsList .add (result .getAggs ());
279- }
280273 if (topDocsList != null ) {
281274 TopDocsAndMaxScore topDocs = result .consumeTopDocs ();
282275 setShardIndex (topDocs .topDocs , result .getShardIndex ());
@@ -285,9 +278,10 @@ private MergeResult partialReduce(
285278 }
286279 // we have to merge here in the same way we collect on a shard
287280 newTopDocs = topDocsList == null ? null : mergeTopDocs (topDocsList , topNSize , 0 );
288- newAggs = aggsList == null
289- ? null
290- : InternalAggregations .topLevelReduceDelayable (aggsList , aggReduceContextBuilder .forPartialReduction ());
281+ newAggs = hasAggs
282+ ? aggregate (toConsume .iterator (), lastMerge , resultSetSize , aggReduceContextBuilder .forPartialReduction ())
283+ : null ;
284+ toConsume = null ;
291285 } finally {
292286 releaseAggs (toConsume );
293287 }
@@ -302,6 +296,45 @@ private MergeResult partialReduce(
302296 return new MergeResult (processedShards , newTopDocs , newAggs , newAggs != null ? DelayableWriteable .getSerializedSize (newAggs ) : 0 );
303297 }
304298
    /**
     * Reduces per-shard aggregations into a single {@link InternalAggregations}, releasing each shard's
     * serialized aggregation buffer as soon as it has been consumed by the reduction so that memory is
     * freed incrementally rather than held for the whole reduce.
     *
     * @param toConsume     iterator over the query results whose aggs will be consumed and reduced
     * @param lastMerge     a previous partial-merge result whose already-reduced aggs are prepended to the
     *                      reduction, or {@code null} if there is none
     * @param resultSetSize the expected number of aggregation instances being reduced (includes
     *                      {@code lastMerge} when present)
     * @param reduceContext the partial- or final-reduction context to reduce under
     * @return the reduced top-level aggregations
     */
    private static InternalAggregations aggregate(
        Iterator<QuerySearchResult> toConsume,
        MergeResult lastMerge,
        int resultSetSize,
        AggregationReduceContext reduceContext
    ) {
        // Local interface so the adapter below is both an Iterator (for topLevelReduce) and a Releasable
        // (so try-with-resources frees the last consumed buffer even if the reduction throws).
        interface ReleasableIterator extends Iterator<InternalAggregations>, Releasable {}
        try (var aggsIter = new ReleasableIterator() {

            // The most recently consumed per-shard aggs; kept alive until the next element is consumed
            // (or until close()), because the expanded view returned from next() may still reference it.
            private Releasable toRelease;

            @Override
            public void close() {
                Releasables.close(toRelease);
            }

            @Override
            public boolean hasNext() {
                return toConsume.hasNext();
            }

            @Override
            public InternalAggregations next() {
                var res = toConsume.next().consumeAggs();
                // The previous element has been fully folded into the reduction by now; free it eagerly.
                Releasables.close(toRelease);
                toRelease = res;
                return res.expand();
            }
        }) {
            return InternalAggregations.topLevelReduce(
                // Prepend the already-reduced aggs from the last merge, if any, ahead of the per-shard stream.
                lastMerge == null ? aggsIter : Iterators.concat(Iterators.single(lastMerge.reducedAggs), aggsIter),
                resultSetSize,
                reduceContext
            );
        } finally {
            // If topLevelReduce bailed out (e.g. threw) before draining the iterator, release the aggs of
            // every result it never consumed so nothing leaks.
            toConsume.forEachRemaining(QuerySearchResult::releaseAggs);
        }
    }
337+
305338 public int getNumReducePhases () {
306339 return numReducePhases ;
307340 }
@@ -517,8 +550,10 @@ public void onFailure(Exception exc) {
517550 }
518551
519552 private static void releaseAggs (List <QuerySearchResult > toConsume ) {
520- for (QuerySearchResult result : toConsume ) {
521- result .releaseAggs ();
553+ if (toConsume != null ) {
554+ for (QuerySearchResult result : toConsume ) {
555+ result .releaseAggs ();
556+ }
522557 }
523558 }
524559
0 commit comments