Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -550,13 +550,24 @@ protected B reduceBucket(List<B> buckets, ReduceContext context) {
}
}

InternalAggregations subAggs;
InternalAggregations reducedSubAggs;
if (aggregationsList.isEmpty()) {
subAggs = InternalAggregations.EMPTY;
reducedSubAggs = InternalAggregations.EMPTY;
} else {
subAggs = InternalAggregations.reduce(aggregationsList, context);
reducedSubAggs = InternalAggregations.reduce(aggregationsList, context);
}
return createBucket(docCount, subAggs, docCountError, buckets.get(0));

// In-place mutation: reuse first bucket instead of allocating new one
// Only applies to InternalTerms.Bucket subclasses (LongTerms, StringTerms, etc.)
// InternalMultiTerms.Bucket has a different hierarchy and uses createBucket()
B firstBucket = buckets.getFirst();
if (firstBucket instanceof Bucket<?> mutableBucket) {
mutableBucket.docCount = docCount;
mutableBucket.aggregations = reducedSubAggs;
mutableBucket.docCountError = docCountError;
return firstBucket;
}
return createBucket(docCount, reducedSubAggs, docCountError, firstBucket);
}

protected abstract void setDocCountError(long docCountError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1295,21 +1295,20 @@ public void testMixLongAndDouble() throws Exception {
b -> {},
PipelineTree.EMPTY
);
for (InternalAggregation internalAgg : aggs) {
InternalAggregation mergedAggs = internalAgg.reduce(aggs, ctx);
assertTrue(mergedAggs instanceof DoubleTerms);
long expected = numLongs + numDoubles;
List<? extends Terms.Bucket> buckets = ((DoubleTerms) mergedAggs).getBuckets();
assertEquals(4, buckets.size());
assertEquals("1.0", buckets.get(0).getKeyAsString());
assertEquals(expected, buckets.get(0).getDocCount());
assertEquals("10.0", buckets.get(1).getKeyAsString());
assertEquals(expected * 2, buckets.get(1).getDocCount());
assertEquals("100.0", buckets.get(2).getKeyAsString());
assertEquals(expected * 2, buckets.get(2).getDocCount());
assertEquals("1000.0", buckets.get(3).getKeyAsString());
assertEquals(expected, buckets.get(3).getDocCount());
}
// Call reduce once (as in production) - the first aggregation leads the reduce
InternalAggregation mergedAggs = aggs.get(0).reduce(aggs, ctx);
assertTrue(mergedAggs instanceof DoubleTerms);
long expected = numLongs + numDoubles;
List<? extends Terms.Bucket> buckets = ((DoubleTerms) mergedAggs).getBuckets();
assertEquals(4, buckets.size());
assertEquals("1.0", buckets.get(0).getKeyAsString());
assertEquals(expected, buckets.get(0).getDocCount());
assertEquals("10.0", buckets.get(1).getKeyAsString());
assertEquals(expected * 2, buckets.get(1).getDocCount());
assertEquals("100.0", buckets.get(2).getKeyAsString());
assertEquals(expected * 2, buckets.get(2).getDocCount());
assertEquals("1000.0", buckets.get(3).getKeyAsString());
assertEquals(expected, buckets.get(3).getDocCount());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,16 @@ public void testReduceRandom() throws IOException {
int size = between(1, 200);
List<T> inputs = randomResultsToReduce(name, size);
assertThat(inputs, hasSize(size));
// Create a deep copy of inputs for verification, since reduce may mutate buckets in-place
List<T> inputsForVerification = new ArrayList<>();
for (T input : inputs) {
inputsForVerification.add(copyNamedWriteable(input, getNamedWriteableRegistry(), categoryClass()));
}
List<InternalAggregation> toReduce = new ArrayList<>();
toReduce.addAll(inputs);
// Sort aggs so that unmapped come last. This mimicks the behavior of InternalAggregations.reduce()
inputs.sort(INTERNAL_AGG_COMPARATOR);
inputsForVerification.sort(INTERNAL_AGG_COMPARATOR);
ScriptService mockScriptService = mockScriptService();
MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
if (randomBoolean() && toReduce.size() > 1) {
Expand Down Expand Up @@ -418,7 +424,7 @@ public void testReduceRandom() throws IOException {
@SuppressWarnings("unchecked")
T reduced = (T) inputs.get(0).reduce(toReduce, context);
doAssertReducedMultiBucketConsumer(reduced, bucketConsumer);
assertReduced(reduced, inputs);
assertReduced(reduced, inputsForVerification);
}

protected void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
Expand Down
Loading