Merged

31 commits
86f2d88
Fix wrong reduce memory estimate calculation
ivancea Aug 22, 2025
e798aa0
Add batched merge results size estimations to CB
ivancea Aug 22, 2025
7b3d88d
Merge branch 'main' into aggs-composite-memory
ivancea Aug 25, 2025
87ccf54
Merge branch 'main' into aggs-composite-memory
ivancea Aug 25, 2025
6fc8fa8
Merge branch 'main' into aggs-composite-memory
ivancea Aug 27, 2025
51fe8fc
Merge branch 'main' into aggs-composite-memory
ivancea Aug 27, 2025
41b86a0
Updated javadocs on estimates and replaced 1.5-size with 0.5
ivancea Aug 27, 2025
eb46c13
Merge branch 'main' into aggs-composite-memory
ivancea Aug 28, 2025
8270935
Initial CB tests for batched aggs
ivancea Aug 29, 2025
ba3748b
Merge branch 'main' into aggs-composite-memory
ivancea Aug 29, 2025
2140240
Merge branch 'main' into aggs-composite-memory
ivancea Sep 1, 2025
d2fed37
Randomize tests
ivancea Sep 1, 2025
449203f
Add estimate for field merge result
ivancea Sep 1, 2025
7c7d3eb
Merge branch 'main' into aggs-composite-memory
ivancea Sep 3, 2025
7402133
Merge branch 'main' into aggs-composite-memory
ivancea Sep 8, 2025
d344228
Added test cluster docs
ivancea Sep 9, 2025
f9b8379
Merge branch 'main' into aggs-composite-memory
ivancea Sep 9, 2025
0a89991
Added Integration test for the reduce phase CB
ivancea Sep 9, 2025
5f8a5cf
Add nullable
ivancea Sep 9, 2025
e8fda32
[CI] Auto commit changes from spotless
Sep 9, 2025
e173654
Fixed memory leak and added Repeat to test
ivancea Sep 10, 2025
3d0f864
Removed comment
ivancea Sep 10, 2025
c9101bb
Fixed comment
ivancea Sep 10, 2025
ac0929b
Decrement ref on search response in test
ivancea Sep 10, 2025
77b152b
Simplify test and add a random number of nodes to make it fail in bot…
ivancea Sep 10, 2025
e6d7beb
Suppress repeat forbidden api error
ivancea Sep 10, 2025
09aff51
Merge branch 'main' into aggs-composite-memory
ivancea Sep 10, 2025
8e4fd41
Update docs/changelog/133398.yaml
ivancea Sep 10, 2025
a24eba1
Integrate test repetition in test case and remove unused setting
ivancea Sep 11, 2025
1581fc4
Merge branch 'main' into aggs-composite-memory
ivancea Sep 12, 2025
f10f1a5
Merge branch 'main' into aggs-composite-memory
ivancea Sep 12, 2025
5 changes: 5 additions & 0 deletions docs/changelog/133398.yaml
@@ -0,0 +1,5 @@
pr: 133398
summary: "Aggs: Fix CB on reduction phase"
area: Aggregations
type: bug
issues: []
@@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.aggregations.bucket;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.aggregations.AggregationIntegTestCase;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING;
import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING;
import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING;
import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING;
import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
import static org.elasticsearch.search.aggregations.AggregationBuilders.composite;
import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;

@ESIntegTestCase.ClusterScope(minNumDataNodes = 1, maxNumDataNodes = 2, numClientNodes = 1)
public class AggregationReductionCircuitBreakingIT extends AggregationIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
// Most of the settings here exist to make the search as stable and deterministic as possible
var settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "memory")
// More threads may lead to more memory consumption and the test failing on the data nodes
.put("thread_pool.search.size", 1);
if (NODE_ROLES_SETTING.get(otherSettings).isEmpty()) {
// Coordinator
settings.put(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "7MB");
} else {
// Datanode
// To avoid OOMs
settings.put(USE_REAL_MEMORY_USAGE_SETTING.getKey(), true).put(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "80%");
}
return settings.build();
}

/**
* Expect the breaker to trip in `QueryPhaseResultConsumer#reduce()`, when reducing `MergeResult`s.
* <p>
* In testing, the serialized agg size comes out to around 5MB.
* The CB is set to 7MB, as the reduction is expected to add an extra 50% overhead
* (a worked sketch of this arithmetic follows the file).
* </p>
*/
public void testCBTrippingOnReduction() throws IOException {
createIndex();
addDocs(100, 100, 100);

// Some leaks (see ESTestCase#loggedLeaks) aren't logged unless the test runs more than once,
// so we run it multiple times to ensure everything gets collected before the final test checks.
for (int i = 0; i < 10; i++) {
assertCBTrip(
() -> internalCluster().coordOnlyNodeClient()
.prepareSearch("index")
.setSize(0)
.addAggregation(
composite(
"composite",
List.of(
new TermsValuesSourceBuilder("integer").field("integer"),
new TermsValuesSourceBuilder("long").field("long")
)
).size(5000).subAggregation(topHits("top_hits").size(10))
)
.setBatchedReduceSize(randomIntBetween(2, 5)),
e -> {
var completeException = ExceptionsHelper.stackTrace(e);
// If a shard fails, we can't check reduction
assumeTrue(completeException, e.shardFailures().length == 0);
assertThat(e.getCause(), instanceOf(CircuitBreakingException.class));
assertThat(completeException, containsString("QueryPhaseResultConsumer.reduce"));
}
);
}
}

public void assertCBTrip(Supplier<SearchRequestBuilder> requestSupplier, Consumer<SearchPhaseExecutionException> exceptionCallback) {
try {
requestSupplier.get().get().decRef();

fail("Expected the breaker to trip");
} catch (SearchPhaseExecutionException e) {
exceptionCallback.accept(e);
}
}

private void createIndex() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder();
mappingBuilder.startObject();
mappingBuilder.startObject("properties");
{
mappingBuilder.startObject("integer");
mappingBuilder.field("type", "integer");
mappingBuilder.endObject();
}
{
mappingBuilder.startObject("long");
mappingBuilder.field("type", "long");
mappingBuilder.endObject();
}

mappingBuilder.endObject(); // properties
mappingBuilder.endObject();

assertAcked(
prepareCreate("index").setSettings(Settings.builder().put("index.number_of_shards", randomIntBetween(1, 10)).build())
.setMapping(mappingBuilder)
);
}

private void addDocs(int docCount, int integerFieldMvCount, int longFieldMvCount) throws IOException {
List<IndexRequestBuilder> docs = new ArrayList<>();
for (int i = 0; i < docCount; i++) {
XContentBuilder docSource = XContentFactory.jsonBuilder();
docSource.startObject();
final int docNumber = i;
List<Integer> integerValues = IntStream.range(0, integerFieldMvCount).map(x -> docNumber + x * 100).boxed().toList();
List<Long> longValues = LongStream.range(0, longFieldMvCount).map(x -> docNumber + x * 100).boxed().toList();
docSource.field("integer", integerValues);
docSource.field("long", longValues);
docSource.endObject();

docs.add(prepareIndex("index").setOpType(DocWriteRequest.OpType.CREATE).setSource(docSource));
}
indexRandom(true, false, false, false, docs);
forceMerge(false);
flushAndRefresh("index");
}
}
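
As a back-of-the-envelope check of the numbers in the test javadoc, here is a minimal sketch. The ~5MB serialized size is the figure observed in testing (assumed here for illustration), and the final reduce estimate adds roughly 50% on top of the bytes already accounted, pushing the total past the coordinator's 7MB request-breaker limit.

// Minimal sketch, assuming ~5MB of serialized aggs already accounted on the
// request breaker (figure taken from the test javadoc above).
class BreakerTripSketch {
    public static void main(String[] args) {
        long accounted = 5L * 1024 * 1024;               // ~5MB already accounted on the breaker
        long reduceExtra = Math.round(0.5d * accounted); // extra ~50% reduce-overhead estimate
        long limit = 7L * 1024 * 1024;                   // coordinator request CB limit (7MB)
        // ~7.5MB total estimate exceeds the 7MB limit, so the breaker should trip
        System.out.println((accounted + reduceExtra) > limit); // true
    }
}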
@@ -222,31 +222,35 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size();
final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
final Deque<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null;
// consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
// execution for shards on the coordinating node itself
if (mergeResult != null) {
consumePartialMergeResult(mergeResult, topDocsList, aggsList);
}
Tuple<TopDocsStats, MergeResult> batchedResult;
while ((batchedResult = batchedResults.poll()) != null) {
topDocsStats.add(batchedResult.v1());
consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
}
for (QuerySearchResult result : buffer) {
topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
if (topDocsList != null) {
TopDocsAndMaxScore topDocs = result.consumeTopDocs();
setShardIndex(topDocs.topDocs, result.getShardIndex());
topDocsList.add(topDocs.topDocs);
}
}

SearchPhaseController.ReducedQueryPhase reducePhase;
long breakerSize = circuitBreakerBytes;
final InternalAggregations aggs;
try {
// consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
// execution for shards on the coordinating node itself
if (mergeResult != null) {
consumePartialMergeResult(mergeResult, topDocsList, aggsList);
breakerSize = addEstimateAndMaybeBreak(mergeResult.estimatedSize);
}
Tuple<TopDocsStats, MergeResult> batchedResult;
while ((batchedResult = batchedResults.poll()) != null) {
topDocsStats.add(batchedResult.v1());
consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
// Add the estimate of the agg size
breakerSize = addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize);
}
for (QuerySearchResult result : buffer) {
topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
if (topDocsList != null) {
TopDocsAndMaxScore topDocs = result.consumeTopDocs();
setShardIndex(topDocs.topDocs, result.getShardIndex());
topDocsList.add(topDocs.topDocs);
}
}
if (aggsList != null) {
// Add an estimate of the final reduce size
breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(circuitBreakerBytes));
aggs = aggregate(buffer.iterator(), new Iterator<>() {
@Override
public boolean hasNext() {
@@ -275,7 +279,13 @@ public DelayableWriteable<InternalAggregations> next() {
);
buffer = null;
} finally {
releaseAggs(buffer);
// Buffer is non-null on exception
if (buffer != null) {
releaseAggs(buffer);
if (aggsList != null) {
Releasables.close(aggsList);
}
}
}
if (hasAggs
// reduced aggregations can be null if all shards failed
@@ -335,7 +345,7 @@ private MergeResult partialReduce(
List<QuerySearchResult> toConsume,
List<SearchShard> processedShards,
TopDocsStats topDocsStats,
MergeResult lastMerge,
@Nullable MergeResult lastMerge,
int numReducePhases
) {
// ensure consistent ordering
@@ -392,7 +402,7 @@ private MergeResult partialReduce(
return new MergeResult(
processedShards,
newTopDocs,
newAggs == null ? null : DelayableWriteable.referencing(newAggs),
newAggs != null ? DelayableWriteable.referencing(newAggs) : null,
newAggs != null ? DelayableWriteable.getSerializedSize(newAggs) : 0
);
}
@@ -457,12 +467,13 @@ private long ramBytesUsedQueryResult(QuerySearchResult result) {
* Returns an estimation of the size that a reduce of the provided size
* would take on memory.
* This size is estimated as roughly 1.5 times the size of the serialized
* aggregations that need to be reduced. This estimation can be completely
* off for some aggregations but it is corrected with the real size after
* the reduce completes.
* aggregations that need to be reduced.
* This method expects an already accounted size, so only an extra 0.5x is returned.
* This estimation can be completely off for some aggregations
* but it is corrected with the real size after the reduce completes.
*/
private static long estimateRamBytesUsedForReduce(long size) {
return Math.round(1.5d * size - size);
return Math.round(0.5d * size);
}

private void consume(QuerySearchResult result, Runnable next) {
@@ -639,7 +650,7 @@ public void onFailure(Exception exc) {
});
}

private static void releaseAggs(List<QuerySearchResult> toConsume) {
private static void releaseAggs(@Nullable List<QuerySearchResult> toConsume) {
if (toConsume != null) {
for (QuerySearchResult result : toConsume) {
result.releaseAggs();
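
Taken together, the hunks above change reduce() to account for each batched merge result on the circuit breaker as it is consumed, and then to reserve an extra ~0.5x of the accounted bytes for the final reduce. The toy below only mirrors that shape: the breaker, the estimates, and the method names are simplified stand-ins for the real Elasticsearch types, and the exact argument passed to the final estimate is abstracted.

// Toy model of the accounting flow; all types and names here are
// illustrative stand-ins, not the real Elasticsearch implementation.
final class ReduceAccountingSketch {
    private long accounted;  // bytes currently accounted on the "breaker"
    private final long limit;

    ReduceAccountingSketch(long limit) {
        this.limit = limit;
    }

    // Mirrors addEstimateAndMaybeBreak: account the estimate or fail fast.
    long addEstimateAndMaybeBreak(long bytes) {
        accounted += bytes;
        if (accounted > limit) {
            // Stands in for CircuitBreakingException in the real code
            throw new IllegalStateException("breaker tripped at " + accounted + " bytes");
        }
        return accounted;
    }

    // Per-batch serialized-size estimates are accounted as each batched
    // MergeResult is consumed; then a ~0.5x final-reduce overhead is added,
    // since estimateRamBytesUsedForReduce returns only the extra 0.5x of a
    // size that is already accounted.
    long accountReduce(long[] batchedEstimatedSizes) {
        long size = 0;
        for (long estimatedSize : batchedEstimatedSizes) {
            size = addEstimateAndMaybeBreak(estimatedSize);
        }
        return addEstimateAndMaybeBreak(Math.round(0.5d * size));
    }
}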
@@ -75,7 +75,7 @@ public static <T extends Writeable> DelayableWriteable<T> referencing(Writeable.
}
}

private DelayableWriteable() {}
protected DelayableWriteable() {}

/**
* Returns a {@linkplain DelayableWriteable} that stores its contents