Skip to content

Commit 88d346f

Browse files
authored
Remove bucketOrd from InternalGeoGridBucket (#117615) (#117823)
This commit removes the need for a bucketOrd field in InternalGeoGridBucket, which was only used to build the InternalAggregation from the aggregator.
1 parent 2eb635b commit 88d346f

File tree

6 files changed

+112
-42
lines changed

6 files changed

+112
-42
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ protected void prepareSubAggs(LongArray ordsToCollect) throws IOException {}
160160
* the provided ordinals.
161161
* <p>
162162
* Most aggregations should probably use something like
163-
* {@link #buildSubAggsForAllBuckets(ObjectArray, ToLongFunction, BiConsumer)}
163+
* {@link #buildSubAggsForAllBuckets(ObjectArray, LongArray, BiConsumer)}
164+
* or {@link #buildSubAggsForAllBuckets(ObjectArray, ToLongFunction, BiConsumer)}
164165
* or {@link #buildAggregationsForVariableBuckets(LongArray, LongKeyedBucketOrds, BucketBuilderForVariable, ResultBuilderForVariable)}
165166
* or {@link #buildAggregationsForFixedBucketCount(LongArray, int, BucketBuilderForFixedCount, Function)}
166167
* or {@link #buildAggregationsForSingleBucket(LongArray, SingleBucketResultBuilder)}
@@ -193,10 +194,9 @@ public int size() {
193194
}
194195

195196
/**
196-
* Build the sub aggregation results for a list of buckets and set them on
197-
* the buckets. This is usually used by aggregations that are selective
198-
* in which bucket they build. They use some mechanism of selecting a list
199-
* of buckets to build use this method to "finish" building the results.
197+
* Similar to {@link #buildSubAggsForAllBuckets(ObjectArray, LongArray, BiConsumer)},
198+
* but it first builds the bucket ordinals from the buckets. This method usually requires
199+
* the buckets to contain their bucket ordinal.
200200
* @param buckets the buckets to finish building
201201
* @param bucketToOrd how to convert a bucket into an ordinal
202202
* @param setAggs how to set the sub-aggregation results on a bucket
@@ -218,12 +218,29 @@ protected final <B> void buildSubAggsForAllBuckets(
218218
bucketOrdsToCollect.set(s++, bucketToOrd.applyAsLong(bucket));
219219
}
220220
}
221-
var results = buildSubAggsForBuckets(bucketOrdsToCollect);
222-
s = 0;
223-
for (long ord = 0; ord < buckets.size(); ord++) {
224-
for (B value : buckets.get(ord)) {
225-
setAggs.accept(value, results.apply(s++));
226-
}
221+
buildSubAggsForAllBuckets(buckets, bucketOrdsToCollect, setAggs);
222+
}
223+
}
224+
225+
/**
226+
* Build the sub aggregation results for a list of buckets and set them on
227+
* the buckets. This is usually used by aggregations that are selective
228+
* in which bucket they build. They use some mechanism of selecting a list
229+
* of buckets to build and then use this method to "finish" building the results.
230+
* @param buckets the buckets to finish building
231+
* @param bucketOrdsToCollect the bucket ordinals to collect, one per bucket, in the same iteration order as {@code buckets}
232+
* @param setAggs how to set the sub-aggregation results on a bucket
233+
*/
234+
protected final <B> void buildSubAggsForAllBuckets(
235+
ObjectArray<B[]> buckets,
236+
LongArray bucketOrdsToCollect,
237+
BiConsumer<B, InternalAggregations> setAggs
238+
) throws IOException {
239+
var results = buildSubAggsForBuckets(bucketOrdsToCollect);
240+
int s = 0;
241+
for (long ord = 0; ord < buckets.size(); ord++) {
242+
for (B value : buckets.get(ord)) {
243+
setAggs.accept(value, results.apply(s++));
227244
}
228245
}
229246
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/BucketPriorityQueue.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,24 @@
1111
import org.elasticsearch.common.util.BigArrays;
1212
import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
1313

14-
class BucketPriorityQueue<B extends InternalGeoGridBucket> extends ObjectArrayPriorityQueue<B> {
14+
import java.util.function.Function;
1515

16-
BucketPriorityQueue(int size, BigArrays bigArrays) {
16+
class BucketPriorityQueue<A, B extends InternalGeoGridBucket> extends ObjectArrayPriorityQueue<A> {
17+
18+
private final Function<A, B> bucketSupplier;
19+
20+
BucketPriorityQueue(int size, BigArrays bigArrays, Function<A, B> bucketSupplier) {
1721
super(size, bigArrays);
22+
this.bucketSupplier = bucketSupplier;
1823
}
1924

2025
@Override
21-
protected boolean lessThan(InternalGeoGridBucket o1, InternalGeoGridBucket o2) {
22-
int cmp = Long.compare(o2.getDocCount(), o1.getDocCount());
26+
protected boolean lessThan(A o1, A o2) {
27+
final B b1 = bucketSupplier.apply(o1);
28+
final B b2 = bucketSupplier.apply(o2);
29+
int cmp = Long.compare(b2.getDocCount(), b1.getDocCount());
2330
if (cmp == 0) {
24-
cmp = o2.compareTo(o1);
31+
cmp = b2.compareTo(b1);
2532
if (cmp == 0) {
2633
cmp = System.identityHashCode(o2) - System.identityHashCode(o1);
2734
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.lucene.index.NumericDocValues;
1313
import org.apache.lucene.index.SortedNumericDocValues;
1414
import org.apache.lucene.search.ScoreMode;
15+
import org.elasticsearch.common.util.IntArray;
1516
import org.elasticsearch.common.util.LongArray;
1617
import org.elasticsearch.common.util.ObjectArray;
1718
import org.elasticsearch.core.Releasables;
@@ -23,6 +24,7 @@
2324
import org.elasticsearch.search.aggregations.LeafBucketCollector;
2425
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
2526
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
27+
import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd;
2628
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
2729
import org.elasticsearch.search.aggregations.support.AggregationContext;
2830
import org.elasticsearch.search.aggregations.support.ValuesSource;
@@ -135,34 +137,52 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
135137

136138
@Override
137139
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
140+
138141
try (ObjectArray<InternalGeoGridBucket[]> topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())) {
139-
for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) {
140-
int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), shardSize);
141-
142-
try (BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size, bigArrays())) {
143-
InternalGeoGridBucket spare = null;
144-
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
145-
while (ordsEnum.next()) {
146-
if (spare == null) {
147-
checkRealMemoryCBForInternalBucket();
148-
spare = newEmptyBucket();
142+
try (IntArray bucketsSizePerOrd = bigArrays().newIntArray(owningBucketOrds.size())) {
143+
long ordsToCollect = 0;
144+
for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
145+
int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), shardSize);
146+
ordsToCollect += size;
147+
bucketsSizePerOrd.set(ordIdx, size);
148+
}
149+
try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) {
150+
long ordsCollected = 0;
151+
for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) {
152+
try (
153+
BucketPriorityQueue<BucketAndOrd<InternalGeoGridBucket>, InternalGeoGridBucket> ordered =
154+
new BucketPriorityQueue<>(bucketsSizePerOrd.get(ordIdx), bigArrays(), b -> b.bucket)
155+
) {
156+
BucketAndOrd<InternalGeoGridBucket> spare = null;
157+
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
158+
while (ordsEnum.next()) {
159+
if (spare == null) {
160+
checkRealMemoryCBForInternalBucket();
161+
spare = new BucketAndOrd<>(newEmptyBucket());
162+
}
163+
164+
// need a special function to keep the source bucket
165+
// up-to-date so it can get the appropriate key
166+
spare.bucket.hashAsLong = ordsEnum.value();
167+
spare.bucket.docCount = bucketDocCount(ordsEnum.ord());
168+
spare.ord = ordsEnum.ord();
169+
spare = ordered.insertWithOverflow(spare);
170+
}
171+
final int orderedSize = (int) ordered.size();
172+
final InternalGeoGridBucket[] buckets = new InternalGeoGridBucket[orderedSize];
173+
for (int i = orderedSize - 1; i >= 0; --i) {
174+
BucketAndOrd<InternalGeoGridBucket> bucketBucketAndOrd = ordered.pop();
175+
buckets[i] = bucketBucketAndOrd.bucket;
176+
ordsArray.set(ordsCollected + i, bucketBucketAndOrd.ord);
177+
}
178+
topBucketsPerOrd.set(ordIdx, buckets);
179+
ordsCollected += orderedSize;
149180
}
150-
151-
// need a special function to keep the source bucket
152-
// up-to-date so it can get the appropriate key
153-
spare.hashAsLong = ordsEnum.value();
154-
spare.docCount = bucketDocCount(ordsEnum.ord());
155-
spare.bucketOrd = ordsEnum.ord();
156-
spare = ordered.insertWithOverflow(spare);
157-
}
158-
159-
topBucketsPerOrd.set(ordIdx, new InternalGeoGridBucket[(int) ordered.size()]);
160-
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
161-
topBucketsPerOrd.get(ordIdx)[i] = ordered.pop();
162181
}
182+
assert ordsCollected == ordsArray.size();
183+
buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, (b, aggs) -> b.aggregations = aggs);
163184
}
164185
}
165-
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
166186
return buildAggregations(
167187
Math.toIntExact(owningBucketOrds.size()),
168188
ordIdx -> buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd.get(ordIdx)), metadata())

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.List;
2828
import java.util.Map;
2929
import java.util.Objects;
30+
import java.util.function.Function;
3031

3132
import static java.util.Collections.unmodifiableList;
3233

@@ -106,7 +107,13 @@ public InternalAggregation get() {
106107
final int size = Math.toIntExact(
107108
context.isFinalReduce() == false ? bucketsReducer.size() : Math.min(requiredSize, bucketsReducer.size())
108109
);
109-
try (BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size, context.bigArrays())) {
110+
try (
111+
BucketPriorityQueue<InternalGeoGridBucket, InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(
112+
size,
113+
context.bigArrays(),
114+
Function.identity()
115+
)
116+
) {
110117
bucketsReducer.forEach(entry -> {
111118
InternalGeoGridBucket bucket = createBucket(entry.key, entry.value.getDocCount(), entry.value.getAggregations());
112119
ordered.insertWithOverflow(bucket);

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ public abstract class InternalGeoGridBucket extends InternalMultiBucketAggregati
2828
protected long docCount;
2929
protected InternalAggregations aggregations;
3030

31-
long bucketOrd;
32-
3331
public InternalGeoGridBucket(long hashAsLong, long docCount, InternalAggregations aggregations) {
3432
this.docCount = docCount;
3533
this.aggregations = aggregations;
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.search.aggregations.bucket.terms;
11+
12+
/** Represents a bucket and its bucket ordinal */
13+
public final class BucketAndOrd<B> {
14+
15+
public final B bucket; // the bucket
16+
public long ord; // mutable ordinal of the bucket
17+
18+
public BucketAndOrd(B bucket) {
19+
this.bucket = bucket;
20+
}
21+
}

0 commit comments

Comments
 (0)