Skip to content

Commit bdf32e3

Browse files
authored
Reduce InternalSignificantTerms in a streaming fashion (#105481)
1 parent a103e3c commit bdf32e3

File tree

1 file changed

+39
-44
lines changed

1 file changed

+39
-44
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java

Lines changed: 39 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,20 @@
99

1010
import org.elasticsearch.common.io.stream.StreamInput;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.core.Releasables;
1213
import org.elasticsearch.search.DocValueFormat;
1314
import org.elasticsearch.search.aggregations.AggregationReduceContext;
1415
import org.elasticsearch.search.aggregations.Aggregator;
1516
import org.elasticsearch.search.aggregations.AggregatorReducer;
1617
import org.elasticsearch.search.aggregations.InternalAggregation;
1718
import org.elasticsearch.search.aggregations.InternalAggregations;
1819
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
20+
import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregatorsReducer;
1921
import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
2022
import org.elasticsearch.search.aggregations.support.SamplingContext;
2123
import org.elasticsearch.xcontent.XContentBuilder;
2224

2325
import java.io.IOException;
24-
import java.util.ArrayList;
2526
import java.util.Arrays;
2627
import java.util.HashMap;
2728
import java.util.List;
@@ -201,50 +202,44 @@ protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceCont
201202
return new AggregatorReducer() {
202203
long globalSubsetSize = 0;
203204
long globalSupersetSize = 0;
205+
final Map<String, ReducerAndProto<B>> buckets = new HashMap<>();
204206

205-
final List<InternalSignificantTerms<A, B>> aggregations = new ArrayList<>(size);
206-
207-
// Compute the overall result set size and the corpus size using the
208-
// top-level Aggregations from each shard
209207
@Override
210208
public void accept(InternalAggregation aggregation) {
211209
@SuppressWarnings("unchecked")
212-
InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
210+
final InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
211+
// Compute the overall result set size and the corpus size using the
212+
// top-level Aggregations from each shard
213213
globalSubsetSize += terms.getSubsetSize();
214214
globalSupersetSize += terms.getSupersetSize();
215-
aggregations.add(terms);
215+
for (B bucket : terms.getBuckets()) {
216+
final ReducerAndProto<B> reducerAndProto = buckets.computeIfAbsent(
217+
bucket.getKeyAsString(),
218+
k -> new ReducerAndProto<>(new MultiBucketAggregatorsReducer(reduceContext, size), bucket)
219+
);
220+
reducerAndProto.reducer.accept(bucket);
221+
reducerAndProto.subsetDf[0] += bucket.subsetDf;
222+
reducerAndProto.supersetDf[0] += bucket.supersetDf;
223+
}
216224
}
217225

218226
@Override
219227
public InternalAggregation get() {
220-
final Map<String, List<B>> buckets = new HashMap<>();
221-
for (InternalSignificantTerms<A, B> terms : aggregations) {
222-
for (B bucket : terms.getBuckets()) {
223-
List<B> existingBuckets = buckets.computeIfAbsent(bucket.getKeyAsString(), k -> new ArrayList<>(size));
224-
// Adjust the buckets with the global stats representing the
225-
// total size of the pots from which the stats are drawn
226-
existingBuckets.add(
227-
createBucket(
228-
bucket.getSubsetDf(),
229-
globalSubsetSize,
230-
bucket.getSupersetDf(),
231-
globalSupersetSize,
232-
bucket.aggregations,
233-
bucket
234-
)
235-
);
236-
}
237-
}
238-
239228
final SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext);
240229
final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size());
241230
final BucketSignificancePriorityQueue<B> ordered = new BucketSignificancePriorityQueue<>(size);
242-
for (Map.Entry<String, List<B>> entry : buckets.entrySet()) {
243-
List<B> sameTermBuckets = entry.getValue();
244-
final B b = reduceBucket(sameTermBuckets, reduceContext);
231+
for (ReducerAndProto<B> reducerAndProto : buckets.values()) {
232+
final B b = createBucket(
233+
reducerAndProto.subsetDf[0],
234+
globalSubsetSize,
235+
reducerAndProto.supersetDf[0],
236+
globalSupersetSize,
237+
reducerAndProto.reducer.get(),
238+
reducerAndProto.proto
239+
);
245240
b.updateScore(heuristic);
246241
if (((b.score > 0) && (b.subsetDf >= minDocCount)) || reduceContext.isFinalReduce() == false) {
247-
B removed = ordered.insertWithOverflow(b);
242+
final B removed = ordered.insertWithOverflow(b);
248243
if (removed == null) {
249244
reduceContext.consumeBucketsAndMaybeBreak(1);
250245
} else {
@@ -254,15 +249,28 @@ public InternalAggregation get() {
254249
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
255250
}
256251
}
257-
B[] list = createBucketsArray(ordered.size());
252+
final B[] list = createBucketsArray(ordered.size());
258253
for (int i = ordered.size() - 1; i >= 0; i--) {
259254
list[i] = ordered.pop();
260255
}
261256
return create(globalSubsetSize, globalSupersetSize, Arrays.asList(list));
262257
}
258+
259+
@Override
260+
public void close() {
261+
for (ReducerAndProto<B> reducerAndProto : buckets.values()) {
262+
Releasables.close(reducerAndProto.reducer);
263+
}
264+
}
263265
};
264266
}
265267

268+
private record ReducerAndProto<B>(MultiBucketAggregatorsReducer reducer, B proto, long[] subsetDf, long[] supersetDf) {
269+
private ReducerAndProto(MultiBucketAggregatorsReducer reducer, B proto) {
270+
this(reducer, proto, new long[] { 0 }, new long[] { 0 });
271+
}
272+
}
273+
266274
@Override
267275
public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
268276
long supersetSize = samplingContext.scaleUp(getSupersetSize());
@@ -285,19 +293,6 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
285293
);
286294
}
287295

288-
private B reduceBucket(List<B> buckets, AggregationReduceContext context) {
289-
assert buckets.isEmpty() == false;
290-
long subsetDf = 0;
291-
long supersetDf = 0;
292-
for (B bucket : buckets) {
293-
subsetDf += bucket.subsetDf;
294-
supersetDf += bucket.supersetDf;
295-
}
296-
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
297-
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
298-
return createBucket(subsetDf, buckets.get(0).subsetSize, supersetDf, buckets.get(0).supersetSize, aggs, buckets.get(0));
299-
}
300-
301296
abstract B createBucket(
302297
long subsetDf,
303298
long subsetSize,

0 commit comments

Comments (0)