Skip to content

Commit ea73815

Browse files
authored
Remove reduce and reduceContext from DelayedBucket (#112547)
Makes DelayedBucket leaner.
1 parent 73e7b73 commit ea73815

File tree

8 files changed

+133
-77
lines changed

8 files changed

+133
-77
lines changed

docs/changelog/112547.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 112547
2+
summary: Remove reduce and `reduceContext` from `DelayedBucket`
3+
area: Aggregations
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Arrays;
1919
import java.util.Comparator;
2020
import java.util.List;
21+
import java.util.function.BiFunction;
2122
import java.util.function.ToLongFunction;
2223

2324
/**
@@ -128,8 +129,14 @@ public final void validate(Aggregator aggregator) throws AggregationExecutionExc
128129

129130
/**
130131
* Build a comparator for {@link DelayedBucket}, a wrapper that delays bucket reduction.
132+
*
133+
* The comparator might need to reduce the {@link DelayedBucket} and therefore we need to provide the
134+
* reducer and the reduce context. The context must be on the final reduce phase.
131135
*/
132-
abstract Comparator<DelayedBucket<? extends Bucket>> delayedBucketComparator();
136+
abstract <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
137+
BiFunction<List<B>, AggregationReduceContext, B> reduce,
138+
AggregationReduceContext reduceContext
139+
);
133140

134141
/**
135142
* @return unique internal ID used for reading/writing this order from/to a stream.

server/src/main/java/org/elasticsearch/search/aggregations/DelayedBucket.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
* as long as possible. It's stateful and not even close to thread safe.
1717
*/
1818
public final class DelayedBucket<B extends InternalMultiBucketAggregation.InternalBucket> {
19-
private final BiFunction<List<B>, AggregationReduceContext, B> reduce;
20-
private final AggregationReduceContext reduceContext;
2119
/**
2220
* The buckets to reduce or {@code null} if we've already reduced the buckets.
2321
*/
@@ -36,21 +34,15 @@ public final class DelayedBucket<B extends InternalMultiBucketAggregation.Intern
3634
/**
3735
* Build a delayed bucket.
3836
*/
39-
public DelayedBucket(
40-
BiFunction<List<B>, AggregationReduceContext, B> reduce,
41-
AggregationReduceContext reduceContext,
42-
List<B> toReduce
43-
) {
44-
this.reduce = reduce;
45-
this.reduceContext = reduceContext;
37+
public DelayedBucket(List<B> toReduce) {
4638
this.toReduce = toReduce;
4739
}
4840

4941
/**
5042
* The reduced bucket. If the bucket hasn't been reduced already this
5143
* will reduce the sub-aggs and throw out the list to reduce.
5244
*/
53-
public B reduced() {
45+
public B reduced(BiFunction<List<B>, AggregationReduceContext, B> reduce, AggregationReduceContext reduceContext) {
5446
if (reduced == null) {
5547
reduceContext.consumeBucketsAndMaybeBreak(1);
5648
reduced = reduce.apply(toReduce, reduceContext);
@@ -100,7 +92,7 @@ public String toString() {
10092
* Called to mark a bucket as non-competitive so it can release
10193
* any sub-buckets from the breaker.
10294
*/
103-
void nonCompetitive() {
95+
void nonCompetitive(AggregationReduceContext reduceContext) {
10496
if (reduced != null) {
10597
// -1 for itself, -countInnerBucket for all the sub-buckets.
10698
reduceContext.consumeBucketsAndMaybeBreak(-1 - InternalMultiBucketAggregation.countInnerBucket(reduced));

server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.LinkedList;
2929
import java.util.List;
3030
import java.util.Objects;
31+
import java.util.function.BiFunction;
3132
import java.util.function.ToLongFunction;
3233

3334
/**
@@ -80,14 +81,17 @@ public Comparator<Bucket> comparator() {
8081
}
8182

8283
@Override
83-
Comparator<DelayedBucket<? extends Bucket>> delayedBucketComparator() {
84+
<B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
85+
BiFunction<List<B>, AggregationReduceContext, B> reduce,
86+
AggregationReduceContext reduceContext
87+
) {
8488
Comparator<Bucket> comparator = comparator();
8589
/*
8690
* Reduce the buckets if we haven't already so we can get at the
8791
* sub-aggregations. With enough code we could avoid this but
8892
* we haven't written that code....
8993
*/
90-
return (lhs, rhs) -> comparator.compare(lhs.reduced(), rhs.reduced());
94+
return (lhs, rhs) -> comparator.compare(lhs.reduced(reduce, reduceContext), rhs.reduced(reduce, reduceContext));
9195
}
9296

9397
@Override
@@ -212,12 +216,15 @@ public Comparator<Bucket> comparator() {
212216
}
213217

214218
@Override
215-
Comparator<DelayedBucket<? extends Bucket>> delayedBucketComparator() {
216-
List<Comparator<DelayedBucket<? extends Bucket>>> comparators = orderElements.stream()
217-
.map(BucketOrder::delayedBucketComparator)
219+
<B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
220+
BiFunction<List<B>, AggregationReduceContext, B> reduce,
221+
AggregationReduceContext reduceContext
222+
) {
223+
List<Comparator<DelayedBucket<B>>> comparators = orderElements.stream()
224+
.map(b -> b.delayedBucketComparator(reduce, reduceContext))
218225
.toList();
219226
return (lhs, rhs) -> {
220-
for (Comparator<DelayedBucket<? extends Bucket>> c : comparators) {
227+
for (Comparator<DelayedBucket<B>> c : comparators) {
221228
int result = c.compare(lhs, rhs);
222229
if (result != 0) {
223230
return result;
@@ -277,8 +284,11 @@ public Comparator<Bucket> comparator() {
277284
}
278285

279286
@Override
280-
Comparator<DelayedBucket<? extends Bucket>> delayedBucketComparator() {
281-
return delayedBucketCompator;
287+
<B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
288+
BiFunction<List<B>, AggregationReduceContext, B> reduce,
289+
AggregationReduceContext reduceContext
290+
) {
291+
return delayedBucketCompator::compare;
282292
}
283293

284294
@Override

server/src/main/java/org/elasticsearch/search/aggregations/TopBucketBuilder.java

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010

1111
import org.apache.lucene.util.ArrayUtil;
1212
import org.apache.lucene.util.PriorityQueue;
13-
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
1413

1514
import java.util.ArrayList;
1615
import java.util.Collections;
1716
import java.util.Comparator;
1817
import java.util.List;
18+
import java.util.function.BiFunction;
1919
import java.util.function.Consumer;
2020

2121
/**
@@ -53,16 +53,20 @@ public abstract class TopBucketBuilder<B extends InternalMultiBucketAggregation.
5353
* @param size the requested size of the list
5454
* @param order the sort order of the buckets
5555
* @param nonCompetitive called with non-competitive buckets
56+
* @param reduce function to reduce a list of buckets
57+
* @param reduceContext the reduce context
5658
*/
5759
public static <B extends InternalMultiBucketAggregation.InternalBucket> TopBucketBuilder<B> build(
5860
int size,
5961
BucketOrder order,
60-
Consumer<DelayedBucket<B>> nonCompetitive
62+
Consumer<DelayedBucket<B>> nonCompetitive,
63+
BiFunction<List<B>, AggregationReduceContext, B> reduce,
64+
AggregationReduceContext reduceContext
6165
) {
6266
if (size < USE_BUFFERING_BUILDER) {
63-
return new PriorityQueueTopBucketBuilder<>(size, order, nonCompetitive);
67+
return new PriorityQueueTopBucketBuilder<>(size, order, nonCompetitive, reduce, reduceContext);
6468
}
65-
return new BufferingTopBucketBuilder<>(size, order, nonCompetitive);
69+
return new BufferingTopBucketBuilder<>(size, order, nonCompetitive, reduce, reduceContext);
6670
}
6771

6872
protected final Consumer<DelayedBucket<B>> nonCompetitive;
@@ -96,14 +100,24 @@ private TopBucketBuilder(Consumer<DelayedBucket<B>> nonCompetitive) {
96100
*/
97101
static class PriorityQueueTopBucketBuilder<B extends InternalMultiBucketAggregation.InternalBucket> extends TopBucketBuilder<B> {
98102
private final PriorityQueue<DelayedBucket<B>> queue;
99-
100-
PriorityQueueTopBucketBuilder(int size, BucketOrder order, Consumer<DelayedBucket<B>> nonCompetitive) {
103+
private final BiFunction<List<B>, AggregationReduceContext, B> reduce;
104+
private final AggregationReduceContext reduceContext;
105+
106+
PriorityQueueTopBucketBuilder(
107+
int size,
108+
BucketOrder order,
109+
Consumer<DelayedBucket<B>> nonCompetitive,
110+
BiFunction<List<B>, AggregationReduceContext, B> reduce,
111+
AggregationReduceContext reduceContext
112+
) {
101113
super(nonCompetitive);
102114
if (size >= ArrayUtil.MAX_ARRAY_LENGTH) {
103115
throw new IllegalArgumentException("can't reduce more than [" + ArrayUtil.MAX_ARRAY_LENGTH + "] buckets");
104116
}
117+
this.reduce = reduce;
118+
this.reduceContext = reduceContext;
105119
queue = new PriorityQueue<>(size) {
106-
private final Comparator<DelayedBucket<? extends Bucket>> comparator = order.delayedBucketComparator();
120+
private final Comparator<DelayedBucket<B>> comparator = order.delayedBucketComparator(reduce, reduceContext);
107121

108122
@Override
109123
protected boolean lessThan(DelayedBucket<B> a, DelayedBucket<B> b) {
@@ -117,15 +131,15 @@ public void add(DelayedBucket<B> bucket) {
117131
DelayedBucket<B> removed = queue.insertWithOverflow(bucket);
118132
if (removed != null) {
119133
nonCompetitive.accept(removed);
120-
removed.nonCompetitive();
134+
removed.nonCompetitive(reduceContext);
121135
}
122136
}
123137

124138
@Override
125139
public List<B> build() {
126140
List<B> result = new ArrayList<>(queue.size());
127141
for (int i = queue.size() - 1; i >= 0; i--) {
128-
result.add(queue.pop().reduced());
142+
result.add(queue.pop().reduced(reduce, reduceContext));
129143
}
130144
Collections.reverse(result);
131145
return result;
@@ -140,12 +154,22 @@ public List<B> build() {
140154
private static class BufferingTopBucketBuilder<B extends InternalMultiBucketAggregation.InternalBucket> extends TopBucketBuilder<B> {
141155
private final int size;
142156
private final BucketOrder order;
157+
private final BiFunction<List<B>, AggregationReduceContext, B> reduce;
158+
private final AggregationReduceContext reduceContext;
143159

144160
private List<DelayedBucket<B>> buffer;
145161
private PriorityQueueTopBucketBuilder<B> next;
146162

147-
BufferingTopBucketBuilder(int size, BucketOrder order, Consumer<DelayedBucket<B>> nonCompetitive) {
163+
BufferingTopBucketBuilder(
164+
int size,
165+
BucketOrder order,
166+
Consumer<DelayedBucket<B>> nonCompetitive,
167+
BiFunction<List<B>, AggregationReduceContext, B> reduce,
168+
AggregationReduceContext reduceContext
169+
) {
148170
super(nonCompetitive);
171+
this.reduce = reduce;
172+
this.reduceContext = reduceContext;
149173
this.size = size;
150174
this.order = order;
151175
buffer = new ArrayList<>();
@@ -162,7 +186,7 @@ public void add(DelayedBucket<B> bucket) {
162186
if (buffer.size() < size) {
163187
return;
164188
}
165-
next = new PriorityQueueTopBucketBuilder<>(size, order, nonCompetitive);
189+
next = new PriorityQueueTopBucketBuilder<>(size, order, nonCompetitive, reduce, reduceContext);
166190
for (DelayedBucket<B> b : buffer) {
167191
next.queue.add(b);
168192
}
@@ -177,7 +201,7 @@ public List<B> build() {
177201
}
178202
List<B> result = new ArrayList<>(buffer.size());
179203
for (DelayedBucket<B> b : buffer) {
180-
result.add(b.reduced());
204+
result.add(b.reduced(reduce, reduceContext));
181205
}
182206
result.sort(order.comparator());
183207
return result;

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -137,28 +137,18 @@ private long getDocCountError(A terms) {
137137
* @param sink Handle the buckets, each wrapped in a {@link DelayedBucket} so that its reduction can be delayed. The sink is a {@link Consumer}, so it returns nothing.
138138
* @return the order we used to reduce the buckets
139139
*/
140-
private BucketOrder reduceBuckets(
141-
List<List<B>> bucketsList,
142-
BucketOrder thisReduceOrder,
143-
AggregationReduceContext reduceContext,
144-
Consumer<DelayedBucket<B>> sink
145-
) {
140+
private BucketOrder reduceBuckets(List<List<B>> bucketsList, BucketOrder thisReduceOrder, Consumer<DelayedBucket<B>> sink) {
146141
if (isKeyOrder(thisReduceOrder)) {
147142
// extract the primary sort in case this is a compound order.
148143
thisReduceOrder = InternalOrder.key(isKeyAsc(thisReduceOrder));
149-
reduceMergeSort(bucketsList, thisReduceOrder, reduceContext, sink);
144+
reduceMergeSort(bucketsList, thisReduceOrder, sink);
150145
} else {
151-
reduceLegacy(bucketsList, reduceContext, sink);
146+
reduceLegacy(bucketsList, sink);
152147
}
153148
return thisReduceOrder;
154149
}
155150

156-
private void reduceMergeSort(
157-
List<List<B>> bucketsList,
158-
BucketOrder thisReduceOrder,
159-
AggregationReduceContext reduceContext,
160-
Consumer<DelayedBucket<B>> sink
161-
) {
151+
private void reduceMergeSort(List<List<B>> bucketsList, BucketOrder thisReduceOrder, Consumer<DelayedBucket<B>> sink) {
162152
assert isKeyOrder(thisReduceOrder);
163153
final Comparator<Bucket> cmp = thisReduceOrder.comparator();
164154
final PriorityQueue<IteratorAndCurrent<B>> pq = new PriorityQueue<>(bucketsList.size()) {
@@ -179,7 +169,7 @@ protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
179169
if (lastBucket != null && cmp.compare(top.current(), lastBucket) != 0) {
180170
// the key changed so bundle up the last key's worth of buckets
181171
sameTermBuckets.trimToSize();
182-
sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
172+
sink.accept(new DelayedBucket<>(sameTermBuckets));
183173
sameTermBuckets = new ArrayList<>();
184174
}
185175
lastBucket = top.current();
@@ -200,11 +190,11 @@ protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
200190

201191
if (sameTermBuckets.isEmpty() == false) {
202192
sameTermBuckets.trimToSize();
203-
sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
193+
sink.accept(new DelayedBucket<>(sameTermBuckets));
204194
}
205195
}
206196

207-
private void reduceLegacy(List<List<B>> bucketsList, AggregationReduceContext reduceContext, Consumer<DelayedBucket<B>> sink) {
197+
private void reduceLegacy(List<List<B>> bucketsList, Consumer<DelayedBucket<B>> sink) {
208198
final Map<Object, ArrayList<B>> bucketMap = new HashMap<>();
209199
for (List<B> buckets : bucketsList) {
210200
for (B bucket : buckets) {
@@ -213,7 +203,7 @@ private void reduceLegacy(List<List<B>> bucketsList, AggregationReduceContext re
213203
}
214204
for (ArrayList<B> sameTermBuckets : bucketMap.values()) {
215205
sameTermBuckets.trimToSize();
216-
sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
206+
sink.accept(new DelayedBucket<>(sameTermBuckets));
217207
}
218208
}
219209

@@ -297,9 +287,9 @@ public InternalAggregation get() {
297287
* so we can just have an optimized collection.
298288
*/
299289
result = new ArrayList<>();
300-
thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), reduceContext, bucket -> {
290+
thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), bucket -> {
301291
if (result.size() < getRequiredSize()) {
302-
result.add(bucket.reduced());
292+
result.add(bucket.reduced(AbstractInternalTerms.this::reduceBucket, reduceContext));
303293
} else {
304294
otherDocCount[0] += bucket.getDocCount();
305295
}
@@ -308,17 +298,23 @@ public InternalAggregation get() {
308298
TopBucketBuilder<B> top = TopBucketBuilder.build(
309299
getRequiredSize(),
310300
getOrder(),
311-
removed -> otherDocCount[0] += removed.getDocCount()
301+
removed -> otherDocCount[0] += removed.getDocCount(),
302+
AbstractInternalTerms.this::reduceBucket,
303+
reduceContext
312304
);
313-
thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), reduceContext, bucket -> {
305+
thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), bucket -> {
314306
if (bucket.getDocCount() >= getMinDocCount()) {
315307
top.add(bucket);
316308
}
317309
});
318310
result = top.build();
319311
} else {
320312
result = new ArrayList<>();
321-
thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), reduceContext, bucket -> result.add(bucket.reduced()));
313+
thisReduceOrder = reduceBuckets(
314+
bucketsList,
315+
getThisReduceOrder(),
316+
bucket -> result.add(bucket.reduced(AbstractInternalTerms.this::reduceBucket, reduceContext))
317+
);
322318
}
323319
for (B r : result) {
324320
if (sumDocCountError == -1) {

0 commit comments

Comments
 (0)