From 86f2d888a2207021c96772dd29ec6dc0b960fb7f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Fri, 22 Aug 2025 13:07:56 +0200
Subject: [PATCH 01/18] Fix wrong reduce memory estimate calculation
---
.../elasticsearch/action/search/QueryPhaseResultConsumer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
index ec63d38616153..47dd3b28cc1de 100644
--- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
+++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
@@ -462,7 +462,7 @@ private long ramBytesUsedQueryResult(QuerySearchResult result) {
* the reduce completes.
*/
private static long estimateRamBytesUsedForReduce(long size) {
- return Math.round(1.5d * size - size);
+ return Math.round(1.5d * size);
}
private void consume(QuerySearchResult result, Runnable next) {
From e798aa034a39d1d5a8e7bad5b14b158123311d3e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Fri, 22 Aug 2025 15:59:19 +0200
Subject: [PATCH 02/18] Add batched merge results size estimations to CB
---
.../elasticsearch/action/search/QueryPhaseResultConsumer.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
index 47dd3b28cc1de..2d579e1f6e898 100644
--- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
+++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
@@ -231,6 +231,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
while ((batchedResult = batchedResults.poll()) != null) {
topDocsStats.add(batchedResult.v1());
consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
+ addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize);
}
for (QuerySearchResult result : buffer) {
topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
@@ -392,7 +393,7 @@ private MergeResult partialReduce(
return new MergeResult(
processedShards,
newTopDocs,
- newAggs == null ? null : DelayableWriteable.referencing(newAggs),
+ newAggs != null ? DelayableWriteable.referencing(newAggs) : null,
newAggs != null ? DelayableWriteable.getSerializedSize(newAggs) : 0
);
}
From 41b86a0558170883cfc7da0a32c795b37528aa91 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Wed, 27 Aug 2025 14:31:53 +0200
Subject: [PATCH 03/18] Updated javadocs on estimates and replaced 1.5-size
with 0.5
---
.../action/search/QueryPhaseResultConsumer.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
index 2d579e1f6e898..a219e81b9b1cf 100644
--- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
+++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
@@ -231,6 +231,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
while ((batchedResult = batchedResults.poll()) != null) {
topDocsStats.add(batchedResult.v1());
consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
+ // Add the estimate of the agg size
addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize);
}
for (QuerySearchResult result : buffer) {
@@ -458,12 +459,13 @@ private long ramBytesUsedQueryResult(QuerySearchResult result) {
* Returns an estimation of the size that a reduce of the provided size
* would take on memory.
* This size is estimated as roughly 1.5 times the size of the serialized
- * aggregations that need to be reduced. This estimation can be completely
- * off for some aggregations but it is corrected with the real size after
- * the reduce completes.
+ * aggregations that need to be reduced.
+ * This method expects an already accounted size, so only an extra 0.5x is returned.
+ * This estimation can be completely off for some aggregations
+ * but it is corrected with the real size after the reduce completes.
*/
private static long estimateRamBytesUsedForReduce(long size) {
- return Math.round(1.5d * size);
+ return Math.round(0.5d * size);
}
private void consume(QuerySearchResult result, Runnable next) {
From 82709350e6e05a8de6d45572e23468941d90081d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Fri, 29 Aug 2025 19:03:41 +0200
Subject: [PATCH 04/18] Initial CB tests for batched aggs
---
.../common/io/stream/DelayableWriteable.java | 2 +-
.../search/QueryPhaseResultConsumerTests.java | 127 ++++++++++++++++++
2 files changed, 128 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java
index a2b6be6ab0363..ba66c2e9ae898 100644
--- a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java
+++ b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java
@@ -75,7 +75,7 @@ public static DelayableWriteable referencing(Writeable.
}
}
- private DelayableWriteable() {}
+ protected DelayableWriteable() {}
/**
* Returns a {@linkplain DelayableWriteable} that stores its contents
diff --git a/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java b/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java
index e0b68647289b2..4d635d2cc582e 100644
--- a/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java
+++ b/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java
@@ -13,8 +13,14 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
+import org.elasticsearch.common.io.stream.DelayableWriteable;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
+import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
@@ -25,7 +31,9 @@
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
@@ -40,7 +48,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.mock;
public class QueryPhaseResultConsumerTests extends ESTestCase {
@@ -148,6 +159,122 @@ public void testProgressListenerExceptionsAreCaught() throws Exception {
}
}
+ public void testBatchedEstimateSizeTooBig() throws Exception {
+ SearchRequest searchRequest = new SearchRequest("index");
+ searchRequest.source(new SearchSourceBuilder().aggregation(new SumAggregationBuilder("sum")));
+
+ var circuitBreakerLimit = ByteSizeValue.ofMb(256);
+ var circuitBreaker = newLimitedBreaker(circuitBreakerLimit);
+ // More than what the CircuitBreaker should allow
+ long aggregationEstimatedSize = (long) (circuitBreakerLimit.getBytes() * 1.1);
+
+ try (
+ QueryPhaseResultConsumer queryPhaseResultConsumer = new QueryPhaseResultConsumer(
+ searchRequest,
+ executor,
+ circuitBreaker,
+ searchPhaseController,
+ () -> false,
+ new SearchProgressListener() {
+ },
+ 10,
+ e -> {}
+ )
+ ) {
+ var mergeResult = new QueryPhaseResultConsumer.MergeResult(List.of(), null, new DelegatingDelayableWriteable<>(() -> {
+ fail("This shouldn't be called");
+ return null;
+ }), aggregationEstimatedSize);
+ queryPhaseResultConsumer.addBatchedPartialResult(new SearchPhaseController.TopDocsStats(0), mergeResult);
+
+ try {
+ queryPhaseResultConsumer.reduce();
+ fail("Expecting a circuit breaking exception to be thrown");
+ } catch (CircuitBreakingException e) {
+ assertThat(e.getBytesWanted(), equalTo(aggregationEstimatedSize));
+ }
+ }
+ }
+
+ public void testBatchedEstimateSizeTooBigAfterDeserialization() throws Exception {
+ SearchRequest searchRequest = new SearchRequest("index");
+ searchRequest.source(new SearchSourceBuilder().aggregation(new SumAggregationBuilder("sum")));
+
+ var circuitBreakerLimit = ByteSizeValue.ofMb(256);
+ var circuitBreaker = newLimitedBreaker(circuitBreakerLimit);
+ // Less than the CB, but more after the 1.5x
+ long aggregationEstimatedSize = (long) (circuitBreakerLimit.getBytes() * 0.75);
+
+ try (
+ QueryPhaseResultConsumer queryPhaseResultConsumer = new QueryPhaseResultConsumer(
+ searchRequest,
+ executor,
+ circuitBreaker,
+ searchPhaseController,
+ () -> false,
+ new SearchProgressListener() {
+ },
+ 10,
+ e -> {}
+ )
+ ) {
+ var mergeResult = new QueryPhaseResultConsumer.MergeResult(List.of(), null, new DelegatingDelayableWriteable<>(() -> {
+ fail("This shouldn't be called");
+ return null;
+ }), aggregationEstimatedSize);
+ queryPhaseResultConsumer.addBatchedPartialResult(new SearchPhaseController.TopDocsStats(0), mergeResult);
+
+ try {
+ queryPhaseResultConsumer.reduce();
+ fail("Expecting a circuit breaking exception to be thrown");
+ } catch (CircuitBreakingException e) {
+ assertThat(circuitBreaker.getUsed(), greaterThanOrEqualTo(aggregationEstimatedSize));
+ assertThat(e.getBytesWanted(), equalTo((long) (aggregationEstimatedSize * 0.5)));
+ }
+ }
+ }
+
+ /**
+ * DelayableWriteable that delegates expansion to a supplier.
+ */
+ private static class DelegatingDelayableWriteable extends DelayableWriteable {
+ private final Supplier supplier;
+
+ private DelegatingDelayableWriteable(Supplier supplier) {
+ this.supplier = supplier;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) {
+ throw new UnsupportedOperationException("Not to be called");
+ }
+
+ @Override
+ public T expand() {
+ return supplier.get();
+ }
+
+ @Override
+ public Serialized asSerialized(Reader reader, NamedWriteableRegistry registry) {
+ throw new UnsupportedOperationException("Not to be called");
+ }
+
+ @Override
+ public boolean isSerialized() {
+ return true;
+ }
+
+ @Override
+ public long getSerializedSize() {
+ return 0;
+ }
+
+ @Override
+ public void close() {
+ // noop
+ }
+ }
+
private static class ThrowingSearchProgressListener extends SearchProgressListener {
private final AtomicInteger onQueryResult = new AtomicInteger(0);
private final AtomicInteger onPartialReduce = new AtomicInteger(0);
From d2fed37a9b75f9b2d7b779ff86979f1a50a76342 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Mon, 1 Sep 2025 15:59:03 +0200
Subject: [PATCH 05/18] Randomize tests
---
.../search/QueryPhaseResultConsumerTests.java | 46 +++++++++++++------
1 file changed, 33 insertions(+), 13 deletions(-)
diff --git a/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java b/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java
index 4d635d2cc582e..7c2d8fe8d0166 100644
--- a/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java
+++ b/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java
@@ -159,14 +159,18 @@ public void testProgressListenerExceptionsAreCaught() throws Exception {
}
}
+ /**
+ * Adds batches with a high simulated size, expecting the CB to trip before deserialization.
+ */
public void testBatchedEstimateSizeTooBig() throws Exception {
SearchRequest searchRequest = new SearchRequest("index");
searchRequest.source(new SearchSourceBuilder().aggregation(new SumAggregationBuilder("sum")));
+ var aggCount = randomIntBetween(1, 10);
var circuitBreakerLimit = ByteSizeValue.ofMb(256);
var circuitBreaker = newLimitedBreaker(circuitBreakerLimit);
// More than what the CircuitBreaker should allow
- long aggregationEstimatedSize = (long) (circuitBreakerLimit.getBytes() * 1.1);
+ long aggregationEstimatedSize = (long) (circuitBreakerLimit.getBytes() * 1.05 / aggCount);
try (
QueryPhaseResultConsumer queryPhaseResultConsumer = new QueryPhaseResultConsumer(
@@ -181,29 +185,41 @@ public void testBatchedEstimateSizeTooBig() throws Exception {
e -> {}
)
) {
- var mergeResult = new QueryPhaseResultConsumer.MergeResult(List.of(), null, new DelegatingDelayableWriteable<>(() -> {
- fail("This shouldn't be called");
- return null;
- }), aggregationEstimatedSize);
- queryPhaseResultConsumer.addBatchedPartialResult(new SearchPhaseController.TopDocsStats(0), mergeResult);
+ for (int i = 0; i < aggCount; i++) {
+ // Add a dummy merge result with a high estimated size
+ var mergeResult = new QueryPhaseResultConsumer.MergeResult(List.of(), null, new DelegatingDelayableWriteable<>(() -> {
+ fail("This shouldn't be called");
+ return null;
+ }), aggregationEstimatedSize);
+ queryPhaseResultConsumer.addBatchedPartialResult(new SearchPhaseController.TopDocsStats(0), mergeResult);
+ }
try {
queryPhaseResultConsumer.reduce();
fail("Expecting a circuit breaking exception to be thrown");
} catch (CircuitBreakingException e) {
+ // The last merge result estimate should break
assertThat(e.getBytesWanted(), equalTo(aggregationEstimatedSize));
}
}
}
+ /**
+ * Adds batches with a high simulated size, expecting the CB to trip before deserialization.
+ *
+ * Similar to {@link #testBatchedEstimateSizeTooBig()}, but this tests the extra size
+ *
+ */
public void testBatchedEstimateSizeTooBigAfterDeserialization() throws Exception {
SearchRequest searchRequest = new SearchRequest("index");
searchRequest.source(new SearchSourceBuilder().aggregation(new SumAggregationBuilder("sum")));
+ var aggCount = randomIntBetween(1, 10);
var circuitBreakerLimit = ByteSizeValue.ofMb(256);
var circuitBreaker = newLimitedBreaker(circuitBreakerLimit);
// Less than the CB, but more after the 1.5x
- long aggregationEstimatedSize = (long) (circuitBreakerLimit.getBytes() * 0.75);
+ long aggregationEstimatedSize = (long) (circuitBreakerLimit.getBytes() * 0.75 / aggCount);
+ long totalAggregationsEstimatedSize = aggregationEstimatedSize * aggCount;
try (
QueryPhaseResultConsumer queryPhaseResultConsumer = new QueryPhaseResultConsumer(
@@ -218,18 +234,22 @@ public void testBatchedEstimateSizeTooBigAfterDeserialization() throws Exception
e -> {}
)
) {
- var mergeResult = new QueryPhaseResultConsumer.MergeResult(List.of(), null, new DelegatingDelayableWriteable<>(() -> {
- fail("This shouldn't be called");
- return null;
- }), aggregationEstimatedSize);
- queryPhaseResultConsumer.addBatchedPartialResult(new SearchPhaseController.TopDocsStats(0), mergeResult);
+ for (int i = 0; i < aggCount; i++) {
+ // Add a dummy merge result with a high estimated size
+ var mergeResult = new QueryPhaseResultConsumer.MergeResult(List.of(), null, new DelegatingDelayableWriteable<>(() -> {
+ fail("This shouldn't be called");
+ return null;
+ }), aggregationEstimatedSize);
+ queryPhaseResultConsumer.addBatchedPartialResult(new SearchPhaseController.TopDocsStats(0), mergeResult);
+ }
try {
queryPhaseResultConsumer.reduce();
fail("Expecting a circuit breaking exception to be thrown");
} catch (CircuitBreakingException e) {
assertThat(circuitBreaker.getUsed(), greaterThanOrEqualTo(aggregationEstimatedSize));
- assertThat(e.getBytesWanted(), equalTo((long) (aggregationEstimatedSize * 0.5)));
+ // A final +0.5x is added to account for the serialized->deserialized extra size
+ assertThat(e.getBytesWanted(), equalTo(Math.round(totalAggregationsEstimatedSize * 0.5)));
}
}
}
From 449203fc9bbfb7b76f60d9e7fb244c5145b51ebe Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Mon, 1 Sep 2025 18:42:38 +0200
Subject: [PATCH 06/18] Add estimate for field merge result
---
.../elasticsearch/action/search/QueryPhaseResultConsumer.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
index a219e81b9b1cf..0f12e9cbb8c89 100644
--- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
+++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
@@ -226,6 +226,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
// execution for shards on the coordinating node itself
if (mergeResult != null) {
consumePartialMergeResult(mergeResult, topDocsList, aggsList);
+ addEstimateAndMaybeBreak(mergeResult.estimatedSize);
}
Tuple batchedResult;
while ((batchedResult = batchedResults.poll()) != null) {
From d344228da471c93af0e2a4995fe7a01d74e06d26 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Tue, 9 Sep 2025 16:28:29 +0200
Subject: [PATCH 07/18] Added test cluster docs
---
.../org/elasticsearch/test/ESIntegTestCase.java | 4 ++--
.../elasticsearch/test/InternalTestCluster.java | 14 ++++++++++++++
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 8bc91ec6396cc..fbf269867104d 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -268,7 +268,7 @@
*
* A test cluster creates a set of nodes in the background before the test starts. The number of nodes in the cluster is
* determined at random and can change across tests. The {@link ClusterScope} allows configuring the initial number of nodes
- * that are created before the tests start.
+ * that are created before the tests start. More information about node configurations and settings in {@link InternalTestCluster}.
*
* {@literal @}NodeScope(scope=Scope.SUITE, numDataNodes=3)
* public class SomeIT extends ESIntegTestCase {
@@ -2172,7 +2172,7 @@ public enum Scope {
/**
* Indicates whether the cluster can have dedicated master nodes. If {@code false} means data nodes will serve as master nodes
- * and there will be no dedicated master (and data) nodes. Default is {@code false} which means
+ * and there will be no dedicated master (and data) nodes. Default is {@code true} which means
* dedicated master nodes will be randomly used.
*/
boolean supportsDedicatedMasters() default true;
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index e99abe49cb596..df6915fe280b5 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -176,6 +176,20 @@
* to the async nature of Elasticsearch in combination with randomized testing. Once Threads and asynchronous calls
* are involved reproducibility is very limited. This class should only be used through {@link ESIntegTestCase}.
*
+ * {@link NodeConfigurationSource#nodeSettings} ordinals
+ * The supplied node ordinal for this node settings getter is generated as follows:
+ *
+ * -
+ * The first indices correspond to the dedicated master nodes, enabled with the {@code randomlyAddDedicatedMasters} setting.
+ * Its amount can't currently be configured.
+ *
+ * -
+ * Then, the data nodes, controlled by the @{code minNumDataNodes} and @{code maxNumDataNodes} settings.
+ *
+ * -
+ * Finally, the coordinating-only, configured with the {@code numClientNodes} setting.
+ *
+ *
*/
public final class InternalTestCluster extends TestCluster {
From 0a89991e3e33e1cee266f0dc6c8b837873caf2c0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Tue, 9 Sep 2025 18:17:38 +0200
Subject: [PATCH 08/18] Added Integration test for the reduce phase CB
---
.../bucket/AggregationsCircuitBreakingIT.java | 173 ++++++++++++++++++
1 file changed, 173 insertions(+)
create mode 100644 modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
new file mode 100644
index 0000000000000..c769b9b0808ea
--- /dev/null
+++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.aggregations.bucket;
+
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.aggregations.AggregationIntegTestCase;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexingPressure;
+import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING;
+import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING;
+import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING;
+import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING;
+import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.composite;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+
+@ESIntegTestCase.ClusterScope(numDataNodes = 1, numClientNodes = 1)
+public class AggregationsCircuitBreakingIT extends AggregationIntegTestCase {
+ @Override
+ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+ // Most of the settings here exist to make the search as stable and deterministic as possible
+ var settings = Settings.builder()
+ .put(super.nodeSettings(nodeOrdinal, otherSettings))
+ .put(REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "memory")
+ .put(IndexingPressure.MAX_COORDINATING_BYTES.getKey(), "100MB")
+ .put("thread_pool.search.size", 1);
+ if (NODE_ROLES_SETTING.get(otherSettings).isEmpty()) {
+ // Coordinator
+ settings.put(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "7MB");
+ } else {
+ // Datanode
+ // To avoid OOMs
+ settings
+ .put(USE_REAL_MEMORY_USAGE_SETTING.getKey(), true)
+ .put(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "80%");
+ }
+ return settings.build();
+ }
+
+ /**
+ * Expect the breaker to trip in `QueryPhaseResultConsume#reduce()`, when reducing `MergeResult`s.
+ *
+ * After testing this, the agg serialized size is around 5MB.
+ * The CB is set to 7MB, as the reduction is expected to add an extra 50% overhead.
+ *
+ */
+ public void testCBTrippingOnReduction() throws IOException {
+ createIndex();
+ addDocs(100, 100, 100);
+
+ assertCBTrip(
+ Settings.builder(),
+ () -> internalCluster().coordOnlyNodeClient()
+ .prepareSearch("index")
+ .setSize(0)
+ .addAggregation(
+ composite(
+ "composite",
+ List.of(
+ new TermsValuesSourceBuilder("integer").field("integer"),
+ new TermsValuesSourceBuilder("long").field("long")
+ )
+ ).size(5000).subAggregation(topHits("top_hits").size(10))
+ )
+ .setBatchedReduceSize(2),
+ e -> {
+ var completeException = ExceptionsHelper.stackTrace(e);
+ // If a shard fails, we can't check reduction
+ assumeTrue(completeException, e.shardFailures().length == 0);
+ assertThat(e.getCause(), instanceOf(CircuitBreakingException.class));
+ assertThat(completeException, containsString("QueryPhaseResultConsumer.reduce"));
+ }
+ );
+ }
+
+ public void assertCBTrip(
+ Settings.Builder settingsBuilder,
+ Supplier requestSupplier,
+ Consumer exceptionCallback
+ ) {
+ if (settingsBuilder.keys().isEmpty() == false) {
+ updateClusterSettings(settingsBuilder);
+ }
+
+ try {
+ requestSupplier.get().get();
+
+ fail("Expected the breaker to trip");
+ } catch (SearchPhaseExecutionException e) {
+ exceptionCallback.accept(e);
+ } finally {
+ if (settingsBuilder.keys().isEmpty() == false) {
+ // Cleanup settings
+ var nullifiedSettings = Settings.builder();
+ settingsBuilder.keys().forEach(nullifiedSettings::putNull);
+ updateClusterSettings(nullifiedSettings);
+ }
+ }
+ }
+
+ private void createIndex() throws IOException {
+ XContentBuilder mappingBuilder = XContentFactory.jsonBuilder();
+ mappingBuilder.startObject();
+ mappingBuilder.startObject("properties");
+ {
+ mappingBuilder.startObject("integer");
+ mappingBuilder.field("type", "integer");
+ mappingBuilder.endObject();
+ }
+ {
+ mappingBuilder.startObject("long");
+ mappingBuilder.field("type", "long");
+ mappingBuilder.endObject();
+ }
+
+ mappingBuilder.endObject(); // properties
+ mappingBuilder.endObject();
+
+ assertAcked(
+ prepareCreate("index").setSettings(Settings.builder().put("index.number_of_shards", randomIntBetween(1, 10)).build())
+ .setMapping(mappingBuilder)
+ );
+ }
+
+ private void addDocs(int docCount, int integerFieldMvCount, int longFieldMvCount) throws IOException {
+ List docs = new ArrayList<>();
+ for (int i = 0; i < docCount; i++) {
+ XContentBuilder docSource = XContentFactory.jsonBuilder();
+ docSource.startObject();
+ final int docNumber = i;
+ List integerValues = IntStream.range(0, integerFieldMvCount).map(x -> docNumber + x * 100).boxed().toList();
+ List longValues = LongStream.range(0, longFieldMvCount).map(x -> docNumber + x * 100).boxed().toList();
+ docSource.field("integer", integerValues);
+ docSource.field("long", longValues);
+ docSource.endObject();
+
+ docs.add(prepareIndex("index").setOpType(DocWriteRequest.OpType.CREATE).setSource(docSource));
+ }
+ indexRandom(true, false, false, false, docs);
+ forceMerge(false);
+ flushAndRefresh("index");
+ }
+}
From 5f8a5cf84530b1ae6d6f5f267ca30e8a5fcbf5cf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Tue, 9 Sep 2025 18:25:21 +0200
Subject: [PATCH 09/18] Add nullable
---
.../elasticsearch/action/search/QueryPhaseResultConsumer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
index 0f12e9cbb8c89..84b12c48fd128 100644
--- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
+++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
@@ -338,7 +338,7 @@ private MergeResult partialReduce(
List toConsume,
List processedShards,
TopDocsStats topDocsStats,
- MergeResult lastMerge,
+ @Nullable MergeResult lastMerge,
int numReducePhases
) {
// ensure consistent ordering
From e8fda32243e8c8564dd25bae08cf02e0edeec097 Mon Sep 17 00:00:00 2001
From: elasticsearchmachine
Date: Tue, 9 Sep 2025 16:25:32 +0000
Subject: [PATCH 10/18] [CI] Auto commit changes from spotless
---
.../bucket/AggregationsCircuitBreakingIT.java | 8 ++------
1 file changed, 2 insertions(+), 6 deletions(-)
diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
index c769b9b0808ea..b7072c1fe63f3 100644
--- a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
+++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
@@ -9,8 +9,6 @@
package org.elasticsearch.aggregations.bucket;
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
-
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -48,7 +46,7 @@
public class AggregationsCircuitBreakingIT extends AggregationIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
- // Most of the settings here exist to make the search as stable and deterministic as possible
+ // Most of the settings here exist to make the search as stable and deterministic as possible
var settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "memory")
@@ -60,9 +58,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
} else {
// Datanode
// To avoid OOMs
- settings
- .put(USE_REAL_MEMORY_USAGE_SETTING.getKey(), true)
- .put(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "80%");
+ settings.put(USE_REAL_MEMORY_USAGE_SETTING.getKey(), true).put(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "80%");
}
return settings.build();
}
From e17365417ad94b3726316323157090406b3ec32e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Wed, 10 Sep 2025 14:54:44 +0200
Subject: [PATCH 11/18] Fixed memory leak and added Repeat to test
---
.../bucket/AggregationsCircuitBreakingIT.java | 5 ++
.../search/QueryPhaseResultConsumer.java | 55 +++++++++++--------
2 files changed, 36 insertions(+), 24 deletions(-)
diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
index b7072c1fe63f3..b52741a4a3fb2 100644
--- a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
+++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
@@ -9,6 +9,8 @@
package org.elasticsearch.aggregations.bucket;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -42,6 +44,9 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
+// Repeating to ensure everything was properly closed in the cluster.
+// Failing to close objects on CB exception led to errors only visible with @Repeat.
+@Repeat(iterations = 3)
@ESIntegTestCase.ClusterScope(numDataNodes = 1, numClientNodes = 1)
public class AggregationsCircuitBreakingIT extends AggregationIntegTestCase {
@Override
diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
index 84b12c48fd128..edd196f6122e3 100644
--- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
+++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
@@ -222,34 +222,35 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size();
final List topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
final Deque> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null;
- // consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
- // execution for shards on the coordinating node itself
- if (mergeResult != null) {
- consumePartialMergeResult(mergeResult, topDocsList, aggsList);
- addEstimateAndMaybeBreak(mergeResult.estimatedSize);
- }
- Tuple batchedResult;
- while ((batchedResult = batchedResults.poll()) != null) {
- topDocsStats.add(batchedResult.v1());
- consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
- // Add the estimate of the agg size
- addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize);
- }
- for (QuerySearchResult result : buffer) {
- topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
- if (topDocsList != null) {
- TopDocsAndMaxScore topDocs = result.consumeTopDocs();
- setShardIndex(topDocs.topDocs, result.getShardIndex());
- topDocsList.add(topDocs.topDocs);
- }
- }
+
SearchPhaseController.ReducedQueryPhase reducePhase;
long breakerSize = circuitBreakerBytes;
final InternalAggregations aggs;
try {
+ // consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
+ // execution for shards on the coordinating node itself
+ if (mergeResult != null) {
+ consumePartialMergeResult(mergeResult, topDocsList, aggsList);
+ breakerSize = addEstimateAndMaybeBreak(mergeResult.estimatedSize);
+ }
+ Tuple batchedResult;
+ while ((batchedResult = batchedResults.poll()) != null) {
+ topDocsStats.add(batchedResult.v1());
+ consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
+ // Add the estimate of the agg size
+ breakerSize = addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize); // TODO: This leaves objects unclosed!
+ }
+ for (QuerySearchResult result : buffer) {
+ topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
+ if (topDocsList != null) {
+ TopDocsAndMaxScore topDocs = result.consumeTopDocs();
+ setShardIndex(topDocs.topDocs, result.getShardIndex());
+ topDocsList.add(topDocs.topDocs);
+ }
+ }
if (aggsList != null) {
// Add an estimate of the final reduce size
- breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
+ breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(circuitBreakerBytes));
aggs = aggregate(buffer.iterator(), new Iterator<>() {
@Override
public boolean hasNext() {
@@ -278,7 +279,13 @@ public DelayableWriteable next() {
);
buffer = null;
} finally {
- releaseAggs(buffer);
+ // Buffer is only null on exception
+ if (buffer != null) {
+ releaseAggs(buffer);
+ if (aggsList != null) {
+ Releasables.close(aggsList);
+ }
+ }
}
if (hasAggs
// reduced aggregations can be null if all shards failed
@@ -643,7 +650,7 @@ public void onFailure(Exception exc) {
});
}
- private static void releaseAggs(List toConsume) {
+ private static void releaseAggs(@Nullable List toConsume) {
if (toConsume != null) {
for (QuerySearchResult result : toConsume) {
result.releaseAggs();
From 3d0f86498ebd40e0584885653f7f3264f404e6c9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Wed, 10 Sep 2025 14:59:18 +0200
Subject: [PATCH 12/18] Removed comment
---
.../elasticsearch/action/search/QueryPhaseResultConsumer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
index edd196f6122e3..09b2d4d3e21c2 100644
--- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
+++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
@@ -238,7 +238,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
topDocsStats.add(batchedResult.v1());
consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
// Add the estimate of the agg size
- breakerSize = addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize); // TODO: This leaves objects unclosed!
+ breakerSize = addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize);
}
for (QuerySearchResult result : buffer) {
topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
From c9101bb5c47f712849f54206e78dbea48b823298 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Wed, 10 Sep 2025 15:02:46 +0200
Subject: [PATCH 13/18] Fixed comment
---
.../elasticsearch/action/search/QueryPhaseResultConsumer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
index 09b2d4d3e21c2..bee0004199596 100644
--- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
+++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
@@ -279,7 +279,7 @@ public DelayableWriteable next() {
);
buffer = null;
} finally {
- // Buffer is only null on exception
+ // Buffer is non-null on exception
if (buffer != null) {
releaseAggs(buffer);
if (aggsList != null) {
From ac0929bf356a7caa23eb7894001188f8c5bb7e5b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Wed, 10 Sep 2025 15:09:34 +0200
Subject: [PATCH 14/18] Decrement ref on search response in test
---
.../aggregations/bucket/AggregationsCircuitBreakingIT.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
index b52741a4a3fb2..f363589de05c4 100644
--- a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
+++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
@@ -114,7 +114,7 @@ public void assertCBTrip(
}
try {
- requestSupplier.get().get();
+ requestSupplier.get().get().decRef();
fail("Expected the breaker to trip");
} catch (SearchPhaseExecutionException e) {
From 77b152b67f4c932a7f87fbf473864d3a4910063f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Wed, 10 Sep 2025 15:48:47 +0200
Subject: [PATCH 15/18] Simplify test and add a random number of nodes to make
it fail in both initial batch accounting and in x1.5 increase
---
.../bucket/AggregationsCircuitBreakingIT.java | 15 +--------------
1 file changed, 1 insertion(+), 14 deletions(-)
diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
index f363589de05c4..626e85fa825a5 100644
--- a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
+++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
@@ -47,7 +47,7 @@
// Repeating to ensure everything was properly closed in the cluster.
// Failing to close objects on CB exception led to errors only visible with @Repeat.
@Repeat(iterations = 3)
-@ESIntegTestCase.ClusterScope(numDataNodes = 1, numClientNodes = 1)
+@ESIntegTestCase.ClusterScope(minNumDataNodes = 1, maxNumDataNodes = 2, numClientNodes = 1)
public class AggregationsCircuitBreakingIT extends AggregationIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
@@ -80,7 +80,6 @@ public void testCBTrippingOnReduction() throws IOException {
addDocs(100, 100, 100);
assertCBTrip(
- Settings.builder(),
() -> internalCluster().coordOnlyNodeClient()
.prepareSearch("index")
.setSize(0)
@@ -105,27 +104,15 @@ public void testCBTrippingOnReduction() throws IOException {
}
public void assertCBTrip(
- Settings.Builder settingsBuilder,
Supplier requestSupplier,
Consumer exceptionCallback
) {
- if (settingsBuilder.keys().isEmpty() == false) {
- updateClusterSettings(settingsBuilder);
- }
-
try {
requestSupplier.get().get().decRef();
fail("Expected the breaker to trip");
} catch (SearchPhaseExecutionException e) {
exceptionCallback.accept(e);
- } finally {
- if (settingsBuilder.keys().isEmpty() == false) {
- // Cleanup settings
- var nullifiedSettings = Settings.builder();
- settingsBuilder.keys().forEach(nullifiedSettings::putNull);
- updateClusterSettings(nullifiedSettings);
- }
}
}
From e6d7beba44ab6da1ffd83ca7d1dc1ecde8431c76 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Wed, 10 Sep 2025 15:50:45 +0200
Subject: [PATCH 16/18] Supress repeat forbidden api error
---
.../bucket/AggregationsCircuitBreakingIT.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
index 626e85fa825a5..33ba9fe0e80e4 100644
--- a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
+++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
@@ -19,6 +19,7 @@
import org.elasticsearch.aggregations.AggregationIntegTestCase;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase;
@@ -46,6 +47,10 @@
// Repeating to ensure everything was properly closed in the cluster.
// Failing to close objects on CB exception led to errors only visible with @Repeat.
+@SuppressForbidden(
+ reason = "Repeating to ensure everything was properly closed in the cluster. "
+ + "Failing to close objects on CB exception led to errors only visible with @Repeat."
+)
@Repeat(iterations = 3)
@ESIntegTestCase.ClusterScope(minNumDataNodes = 1, maxNumDataNodes = 2, numClientNodes = 1)
public class AggregationsCircuitBreakingIT extends AggregationIntegTestCase {
@@ -103,10 +108,7 @@ public void testCBTrippingOnReduction() throws IOException {
);
}
- public void assertCBTrip(
- Supplier requestSupplier,
- Consumer exceptionCallback
- ) {
+ public void assertCBTrip(Supplier requestSupplier, Consumer exceptionCallback) {
try {
requestSupplier.get().get().decRef();
From 8e4fd416bb85070f29ca9b0cf527931788c9c6e4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Wed, 10 Sep 2025 18:15:43 +0200
Subject: [PATCH 17/18] Update docs/changelog/133398.yaml
---
docs/changelog/133398.yaml | 5 +++++
1 file changed, 5 insertions(+)
create mode 100644 docs/changelog/133398.yaml
diff --git a/docs/changelog/133398.yaml b/docs/changelog/133398.yaml
new file mode 100644
index 0000000000000..178ccddae9d4f
--- /dev/null
+++ b/docs/changelog/133398.yaml
@@ -0,0 +1,5 @@
+pr: 133398
+summary: "Aggs: Fix CB on reduction phase"
+area: Aggregations
+type: bug
+issues: []
From a24eba1a4f1dfb60e0b2977ea47aec51b0c110be Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Thu, 11 Sep 2025 16:45:08 +0200
Subject: [PATCH 18/18] Integrate test repetition in test case and remove
unused setting
---
...ggregationReductionCircuitBreakingIT.java} | 63 +++++++++----------
1 file changed, 28 insertions(+), 35 deletions(-)
rename modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/{AggregationsCircuitBreakingIT.java => AggregationReductionCircuitBreakingIT.java} (77%)
diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationReductionCircuitBreakingIT.java
similarity index 77%
rename from modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
rename to modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationReductionCircuitBreakingIT.java
index 33ba9fe0e80e4..df789dcf69927 100644
--- a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationsCircuitBreakingIT.java
+++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationReductionCircuitBreakingIT.java
@@ -9,8 +9,6 @@
package org.elasticsearch.aggregations.bucket;
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
-
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -19,8 +17,6 @@
import org.elasticsearch.aggregations.AggregationIntegTestCase;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.SuppressForbidden;
-import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
@@ -45,22 +41,15 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
-// Repeating to ensure everything was properly closed in the cluster.
-// Failing to close objects on CB exception led to errors only visible with @Repeat.
-@SuppressForbidden(
- reason = "Repeating to ensure everything was properly closed in the cluster. "
- + "Failing to close objects on CB exception led to errors only visible with @Repeat."
-)
-@Repeat(iterations = 3)
@ESIntegTestCase.ClusterScope(minNumDataNodes = 1, maxNumDataNodes = 2, numClientNodes = 1)
-public class AggregationsCircuitBreakingIT extends AggregationIntegTestCase {
+public class AggregationReductionCircuitBreakingIT extends AggregationIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
// Most of the settings here exist to make the search as stable and deterministic as possible
var settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "memory")
- .put(IndexingPressure.MAX_COORDINATING_BYTES.getKey(), "100MB")
+ // More threads may lead to more consumption and the test failing in the datanodes
.put("thread_pool.search.size", 1);
if (NODE_ROLES_SETTING.get(otherSettings).isEmpty()) {
// Coordinator
@@ -84,28 +73,32 @@ public void testCBTrippingOnReduction() throws IOException {
createIndex();
addDocs(100, 100, 100);
- assertCBTrip(
- () -> internalCluster().coordOnlyNodeClient()
- .prepareSearch("index")
- .setSize(0)
- .addAggregation(
- composite(
- "composite",
- List.of(
- new TermsValuesSourceBuilder("integer").field("integer"),
- new TermsValuesSourceBuilder("long").field("long")
- )
- ).size(5000).subAggregation(topHits("top_hits").size(10))
- )
- .setBatchedReduceSize(2),
- e -> {
- var completeException = ExceptionsHelper.stackTrace(e);
- // If a shard fails, we can't check reduction
- assumeTrue(completeException, e.shardFailures().length == 0);
- assertThat(e.getCause(), instanceOf(CircuitBreakingException.class));
- assertThat(completeException, containsString("QueryPhaseResultConsumer.reduce"));
- }
- );
+ // Some leaks (Check ESTestCase#loggedLeaks) aren't logged unless we run the test twice.
+ // So we run it multiple times to ensure everything gets collected before the final test checks.
+ for (int i = 0; i < 10; i++) {
+ assertCBTrip(
+ () -> internalCluster().coordOnlyNodeClient()
+ .prepareSearch("index")
+ .setSize(0)
+ .addAggregation(
+ composite(
+ "composite",
+ List.of(
+ new TermsValuesSourceBuilder("integer").field("integer"),
+ new TermsValuesSourceBuilder("long").field("long")
+ )
+ ).size(5000).subAggregation(topHits("top_hits").size(10))
+ )
+ .setBatchedReduceSize(randomIntBetween(2, 5)),
+ e -> {
+ var completeException = ExceptionsHelper.stackTrace(e);
+ // If a shard fails, we can't check reduction
+ assumeTrue(completeException, e.shardFailures().length == 0);
+ assertThat(e.getCause(), instanceOf(CircuitBreakingException.class));
+ assertThat(completeException, containsString("QueryPhaseResultConsumer.reduce"));
+ }
+ );
+ }
}
public void assertCBTrip(Supplier requestSupplier, Consumer exceptionCallback) {