Skip to content

Commit b633fe1

Browse files
authored
Replace TopBucketBuilder with a BucketPriorityQueue (#112602)
BucketPriorityQueue is a much better option nowadays
1 parent f367d27 commit b633fe1

File tree

6 files changed

+30
-389
lines changed

6 files changed

+30
-389
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public final void validate(Aggregator aggregator) throws AggregationExecutionExc
133133
* The comparator might need to reduce the {@link DelayedBucket} and therefore we need to provide the
134134
* reducer and the reduce context.The context must be on the final reduce phase.
135135
*/
136-
abstract <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
136+
public abstract <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
137137
BiFunction<List<B>, AggregationReduceContext, B> reduce,
138138
AggregationReduceContext reduceContext
139139
);

server/src/main/java/org/elasticsearch/search/aggregations/DelayedBucket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public String toString() {
9292
* Called to mark a bucket as non-competitive so it can release it can release
9393
* any sub-buckets from the breaker.
9494
*/
95-
void nonCompetitive(AggregationReduceContext reduceContext) {
95+
public void nonCompetitive(AggregationReduceContext reduceContext) {
9696
if (reduced != null) {
9797
// -1 for itself, -countInnerBucket for all the sub-buckets.
9898
reduceContext.consumeBucketsAndMaybeBreak(-1 - InternalMultiBucketAggregation.countInnerBucket(reduced));

server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public Comparator<Bucket> comparator() {
8181
}
8282

8383
@Override
84-
<B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
84+
public <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
8585
BiFunction<List<B>, AggregationReduceContext, B> reduce,
8686
AggregationReduceContext reduceContext
8787
) {
@@ -216,7 +216,7 @@ public Comparator<Bucket> comparator() {
216216
}
217217

218218
@Override
219-
<B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
219+
public <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
220220
BiFunction<List<B>, AggregationReduceContext, B> reduce,
221221
AggregationReduceContext reduceContext
222222
) {
@@ -284,7 +284,7 @@ public Comparator<Bucket> comparator() {
284284
}
285285

286286
@Override
287-
<B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
287+
public <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
288288
BiFunction<List<B>, AggregationReduceContext, B> reduce,
289289
AggregationReduceContext reduceContext
290290
) {

server/src/main/java/org/elasticsearch/search/aggregations/TopBucketBuilder.java

Lines changed: 0 additions & 210 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
2121
import org.elasticsearch.search.aggregations.InternalOrder;
2222
import org.elasticsearch.search.aggregations.KeyComparable;
23-
import org.elasticsearch.search.aggregations.TopBucketBuilder;
2423
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
2524
import org.elasticsearch.search.aggregations.support.SamplingContext;
2625
import org.elasticsearch.xcontent.XContentBuilder;
2726

2827
import java.io.IOException;
2928
import java.util.ArrayList;
29+
import java.util.Collections;
3030
import java.util.Comparator;
3131
import java.util.HashMap;
3232
import java.util.List;
@@ -295,19 +295,34 @@ public InternalAggregation get() {
295295
}
296296
});
297297
} else if (reduceContext.isFinalReduce()) {
298-
TopBucketBuilder<B> top = TopBucketBuilder.build(
299-
getRequiredSize(),
300-
getOrder(),
301-
removed -> otherDocCount[0] += removed.getDocCount(),
298+
final Comparator<DelayedBucket<B>> comparator = getOrder().delayedBucketComparator(
302299
AbstractInternalTerms.this::reduceBucket,
303300
reduceContext
304301
);
305-
thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), bucket -> {
306-
if (bucket.getDocCount() >= getMinDocCount()) {
307-
top.add(bucket);
302+
try (
303+
BucketPriorityQueue<DelayedBucket<B>> top = new BucketPriorityQueue<>(
304+
getRequiredSize(),
305+
reduceContext.bigArrays(),
306+
comparator
307+
)
308+
) {
309+
thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), bucket -> {
310+
if (bucket.getDocCount() >= getMinDocCount()) {
311+
final DelayedBucket<B> removed = top.insertWithOverflow(bucket);
312+
if (removed != null) {
313+
otherDocCount[0] += removed.getDocCount();
314+
removed.nonCompetitive(reduceContext);
315+
}
316+
}
317+
});
318+
// size is an integer as it should be <= getRequiredSize()
319+
final int size = (int) top.size();
320+
result = new ArrayList<>(size);
321+
for (int i = 0; i < size; i++) {
322+
result.add(top.pop().reduced(AbstractInternalTerms.this::reduceBucket, reduceContext));
308323
}
309-
});
310-
result = top.build();
324+
Collections.reverse(result);
325+
}
311326
} else {
312327
result = new ArrayList<>();
313328
thisReduceOrder = reduceBuckets(

0 commit comments

Comments
 (0)