Skip to content

Commit 73ee949

Browse files
Inline aggregation logic into QueryPhaseResultConsumer
This refactoring removes some needless indirection but more importantly, it sets up a significant improvement to aggs: passing them to the reducer as they become available and without retaining them in a list (serialized or materialized) any longer than necessary. I chose to do this step first to make that change shorter and more focussed as it will involve a bit of non-trivial concurrency.
1 parent 3c29593 commit 73ee949

File tree

2 files changed

+52
-53
lines changed

2 files changed

+52
-53
lines changed

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,12 @@
2424
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
2525
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2626
import org.elasticsearch.core.Nullable;
27-
import org.elasticsearch.core.Releasable;
28-
import org.elasticsearch.core.Releasables;
2927
import org.elasticsearch.core.Tuple;
3028
import org.elasticsearch.search.SearchPhaseResult;
3129
import org.elasticsearch.search.SearchService;
3230
import org.elasticsearch.search.SearchShardTarget;
3331
import org.elasticsearch.search.aggregations.AggregationReduceContext;
32+
import org.elasticsearch.search.aggregations.AggregatorsReducer;
3433
import org.elasticsearch.search.aggregations.InternalAggregations;
3534
import org.elasticsearch.search.builder.SearchSourceBuilder;
3635
import org.elasticsearch.search.query.QuerySearchResult;
@@ -246,7 +245,10 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
246245
if (aggsList != null) {
247246
// Add an estimate of the final reduce size
248247
breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
249-
aggs = aggregate(buffer.iterator(), new Iterator<>() {
248+
var reduceContext = performFinalReduce
249+
? aggReduceContextBuilder.forFinalReduction()
250+
: aggReduceContextBuilder.forPartialReduction();
251+
aggs = InternalAggregations.maybeExecuteFinalReduce(reduceContext, aggregate(buffer.iterator(), new Iterator<>() {
250252
@Override
251253
public boolean hasNext() {
252254
return aggsList.isEmpty() == false;
@@ -256,10 +258,7 @@ public boolean hasNext() {
256258
public InternalAggregations next() {
257259
return aggsList.pollFirst();
258260
}
259-
},
260-
resultSize,
261-
performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction()
262-
);
261+
}, resultSize, reduceContext));
263262
} else {
264263
aggs = null;
265264
}
@@ -391,39 +390,59 @@ private static InternalAggregations aggregate(
391390
int resultSetSize,
392391
AggregationReduceContext reduceContext
393392
) {
394-
interface ReleasableIterator extends Iterator<InternalAggregations>, Releasable {}
395-
try (var aggsIter = new ReleasableIterator() {
396-
397-
private Releasable toRelease;
398-
399-
@Override
400-
public void close() {
401-
Releasables.close(toRelease);
393+
if (resultSetSize == 1) {
394+
if (partialResults.hasNext()) {
395+
return InternalAggregations.reduce(partialResults.next(), reduceContext);
402396
}
403-
404-
@Override
405-
public boolean hasNext() {
406-
return toConsume.hasNext();
397+
try (var delayable = toConsume.next().consumeAggs()) {
398+
return InternalAggregations.reduce(delayable.expand(), reduceContext);
407399
}
408-
409-
@Override
410-
public InternalAggregations next() {
411-
var res = toConsume.next().consumeAggs();
412-
Releasables.close(toRelease);
413-
toRelease = res;
414-
return res.expand();
400+
}
401+
try {
402+
// general case
403+
if (partialResults.hasNext()) {
404+
return consumeAggResults(partialResults, toConsume, createReducer(resultSetSize, reduceContext, partialResults.next()));
415405
}
416-
}) {
417-
return InternalAggregations.topLevelReduce(
418-
partialResults.hasNext() ? Iterators.concat(partialResults, aggsIter) : aggsIter,
419-
resultSetSize,
420-
reduceContext
421-
);
406+
AggregatorsReducer reducer;
407+
try (var delayable = toConsume.next().consumeAggs()) {
408+
reducer = createReducer(resultSetSize, reduceContext, delayable.expand());
409+
}
410+
return consumeAggResults(partialResults, toConsume, reducer);
422411
} finally {
423412
toConsume.forEachRemaining(QuerySearchResult::releaseAggs);
424413
}
425414
}
426415

416+
private static AggregatorsReducer createReducer(int resultSetSize, AggregationReduceContext reduceContext, InternalAggregations first) {
417+
boolean success = false;
418+
var reducer = new AggregatorsReducer(first, reduceContext, resultSetSize);
419+
try {
420+
reducer.accept(first);
421+
success = true;
422+
return reducer;
423+
} finally {
424+
if (success == false) {
425+
reducer.close();
426+
}
427+
}
428+
}
429+
430+
private static InternalAggregations consumeAggResults(
431+
Iterator<InternalAggregations> partialResults,
432+
Iterator<QuerySearchResult> toConsume,
433+
AggregatorsReducer reducer
434+
) {
435+
try (reducer) {
436+
partialResults.forEachRemaining(reducer::accept);
437+
while (toConsume.hasNext()) {
438+
try (var delayable = toConsume.next().consumeAggs()) {
439+
reducer.accept(delayable.expand());
440+
}
441+
}
442+
return reducer.get();
443+
}
444+
}
445+
427446
public int getNumReducePhases() {
428447
return numReducePhases;
429448
}

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -177,17 +177,7 @@ public SortValue sortValue(AggregationPath.PathElement head, Iterator<Aggregatio
177177
return aggregation.sortValue(Optional.ofNullable(head.key()).orElse(head.metric()));
178178
}
179179

180-
/**
181-
* Equivalent to {@link #topLevelReduce(List, AggregationReduceContext)} but it takes an iterator and a count.
182-
*/
183-
public static InternalAggregations topLevelReduce(Iterator<InternalAggregations> aggs, int count, AggregationReduceContext context) {
184-
if (count == 0) {
185-
return null;
186-
}
187-
return maybeExecuteFinalReduce(context, count == 1 ? reduce(aggs.next(), context) : reduce(aggs, count, context));
188-
}
189-
190-
private static InternalAggregations maybeExecuteFinalReduce(AggregationReduceContext context, InternalAggregations reduced) {
180+
public static InternalAggregations maybeExecuteFinalReduce(AggregationReduceContext context, InternalAggregations reduced) {
191181
if (reduced == null) {
192182
return null;
193183
}
@@ -242,16 +232,6 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
242232
}
243233
}
244234

245-
private static InternalAggregations reduce(Iterator<InternalAggregations> aggsIterator, int count, AggregationReduceContext context) {
246-
// general case
247-
var first = aggsIterator.next();
248-
try (AggregatorsReducer reducer = new AggregatorsReducer(first, context, count)) {
249-
reducer.accept(first);
250-
aggsIterator.forEachRemaining(reducer::accept);
251-
return reducer.get();
252-
}
253-
}
254-
255235
public static InternalAggregations reduce(InternalAggregations aggregations, AggregationReduceContext context) {
256236
final List<InternalAggregation> internalAggregations = aggregations.asList();
257237
int size = internalAggregations.size();

0 commit comments

Comments
 (0)