diff --git a/docs/changelog/133398.yaml b/docs/changelog/133398.yaml
new file mode 100644
index 0000000000000..178ccddae9d4f
--- /dev/null
+++ b/docs/changelog/133398.yaml
@@ -0,0 +1,5 @@
+pr: 133398
+summary: "Aggs: Fix CB on reduction phase"
+area: Aggregations
+type: bug
+issues: []
diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationReductionCircuitBreakingIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationReductionCircuitBreakingIT.java
new file mode 100644
index 0000000000000..df789dcf69927
--- /dev/null
+++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationReductionCircuitBreakingIT.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.aggregations.bucket;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.aggregations.AggregationIntegTestCase;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING;
+import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING;
+import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING;
+import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING;
+import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.composite;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+
+@ESIntegTestCase.ClusterScope(minNumDataNodes = 1, maxNumDataNodes = 2, numClientNodes = 1)
+public class AggregationReductionCircuitBreakingIT extends AggregationIntegTestCase {
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        // Most of the settings here exist to make the search as stable and deterministic as possible
+        var settings = Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal, otherSettings))
+            .put(REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "memory")
+            // More threads may lead to more consumption and the test failing in the datanodes
+            .put("thread_pool.search.size", 1);
+        if (NODE_ROLES_SETTING.get(otherSettings).isEmpty()) {
+            // Coordinator
+            settings.put(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "7MB");
+        } else {
+            // Datanode
+            // To avoid OOMs
+            settings.put(USE_REAL_MEMORY_USAGE_SETTING.getKey(), true).put(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "80%");
+        }
+        return settings.build();
+    }
+
+    /**
+     * Expect the breaker to trip in `QueryPhaseResultConsumer#reduce()`, when reducing `MergeResult`s.
+     * <p>
+     * From testing, the serialized agg size is around 5MB.
+     * The CB is set to 7MB, as the reduction is expected to add an extra 50% overhead.
+     * </p>
+     */
+    public void testCBTrippingOnReduction() throws IOException {
+        createIndex();
+        addDocs(100, 100, 100);
+
+        // Some leaks (check ESTestCase#loggedLeaks) aren't logged unless we run the test twice.
+        // So we run it multiple times to ensure everything gets collected before the final test checks.
+        for (int i = 0; i < 10; i++) {
+            assertCBTrip(
+                () -> internalCluster().coordOnlyNodeClient()
+                    .prepareSearch("index")
+                    .setSize(0)
+                    .addAggregation(
+                        composite(
+                            "composite",
+                            List.of(
+                                new TermsValuesSourceBuilder("integer").field("integer"),
+                                new TermsValuesSourceBuilder("long").field("long")
+                            )
+                        ).size(5000).subAggregation(topHits("top_hits").size(10))
+                    )
+                    .setBatchedReduceSize(randomIntBetween(2, 5)),
+                e -> {
+                    var completeException = ExceptionsHelper.stackTrace(e);
+                    // If a shard fails, we can't check reduction
+                    assumeTrue(completeException, e.shardFailures().length == 0);
+                    assertThat(e.getCause(), instanceOf(CircuitBreakingException.class));
+                    assertThat(completeException, containsString("QueryPhaseResultConsumer.reduce"));
+                }
+            );
+        }
+    }
+
+    public void assertCBTrip(Supplier<SearchRequestBuilder> requestSupplier, Consumer<SearchPhaseExecutionException> exceptionCallback) {
+        try {
+            requestSupplier.get().get().decRef();
+
+            fail("Expected the breaker to trip");
+        } catch (SearchPhaseExecutionException e) {
+            exceptionCallback.accept(e);
+        }
+    }
+
+    private void createIndex() throws IOException {
+        XContentBuilder mappingBuilder = XContentFactory.jsonBuilder();
+        mappingBuilder.startObject();
+        mappingBuilder.startObject("properties");
+        {
+            mappingBuilder.startObject("integer");
+            mappingBuilder.field("type", "integer");
+            mappingBuilder.endObject();
+        }
+        {
+            mappingBuilder.startObject("long");
+            mappingBuilder.field("type", "long");
+            mappingBuilder.endObject();
+        }
+
+        mappingBuilder.endObject(); // properties
+        mappingBuilder.endObject();
+
+        assertAcked(
+            prepareCreate("index").setSettings(Settings.builder().put("index.number_of_shards", randomIntBetween(1, 10)).build())
+                .setMapping(mappingBuilder)
+        );
+    }
+
+    private void addDocs(int docCount, int integerFieldMvCount, int longFieldMvCount) throws IOException {
+        List<IndexRequestBuilder> docs = new ArrayList<>();
+        for (int i = 0; i < docCount; i++) {
+            XContentBuilder docSource = XContentFactory.jsonBuilder();
+            docSource.startObject();
+            final int docNumber = i;
+            List<Integer> integerValues = IntStream.range(0, integerFieldMvCount).map(x -> docNumber + x * 100).boxed().toList();
+            List<Long> longValues = LongStream.range(0, longFieldMvCount).map(x -> docNumber + x * 100).boxed().toList();
+            docSource.field("integer", integerValues);
+            docSource.field("long", longValues);
+            docSource.endObject();
+
+            docs.add(prepareIndex("index").setOpType(DocWriteRequest.OpType.CREATE).setSource(docSource));
+        }
+        indexRandom(true, false, false, false, docs);
+        forceMerge(false);
+        flushAndRefresh("index");
+    }
+}
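A quick sanity check of the numbers this test relies on (a sketch for the reader, not part of the patch; the 0.5x factor anticipates the QueryPhaseResultConsumer change below):

    // Sketch: why a 7MB request breaker limit should trip for ~5MB of serialized aggs.
    public class ReductionEstimateMath {
        public static void main(String[] args) {
            long serializedAggs = 5L * 1024 * 1024; // ~5MB, per the test's Javadoc
            long breakerLimit = 7L * 1024 * 1024;   // REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING above
            // 1.0x is charged while consuming the batched results, plus an extra
            // 0.5x estimated up front for the final reduce (see the change below).
            long estimate = serializedAggs + Math.round(0.5d * serializedAggs);
            System.out.println(estimate > breakerLimit); // true: ~7.5MB wanted vs 7MB limit
        }
    }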
diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
index ec63d38616153..bee0004199596 100644
--- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
+++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
@@ -222,31 +222,35 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
         final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size();
         final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
         final Deque<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null;
-        // consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
-        // execution for shards on the coordinating node itself
-        if (mergeResult != null) {
-            consumePartialMergeResult(mergeResult, topDocsList, aggsList);
-        }
-        Tuple<TopDocsStats, MergeResult> batchedResult;
-        while ((batchedResult = batchedResults.poll()) != null) {
-            topDocsStats.add(batchedResult.v1());
-            consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
-        }
-        for (QuerySearchResult result : buffer) {
-            topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
-            if (topDocsList != null) {
-                TopDocsAndMaxScore topDocs = result.consumeTopDocs();
-                setShardIndex(topDocs.topDocs, result.getShardIndex());
-                topDocsList.add(topDocs.topDocs);
-            }
-        }
+
         SearchPhaseController.ReducedQueryPhase reducePhase;
         long breakerSize = circuitBreakerBytes;
         final InternalAggregations aggs;
         try {
+            // consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
+            // execution for shards on the coordinating node itself
+            if (mergeResult != null) {
+                consumePartialMergeResult(mergeResult, topDocsList, aggsList);
+                breakerSize = addEstimateAndMaybeBreak(mergeResult.estimatedSize);
+            }
+            Tuple<TopDocsStats, MergeResult> batchedResult;
+            while ((batchedResult = batchedResults.poll()) != null) {
+                topDocsStats.add(batchedResult.v1());
+                consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
+                // Add the estimate of the agg size
+                breakerSize = addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize);
+            }
+            for (QuerySearchResult result : buffer) {
+                topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
+                if (topDocsList != null) {
+                    TopDocsAndMaxScore topDocs = result.consumeTopDocs();
+                    setShardIndex(topDocs.topDocs, result.getShardIndex());
+                    topDocsList.add(topDocs.topDocs);
+                }
+            }
             if (aggsList != null) {
                 // Add an estimate of the final reduce size
-                breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
+                breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(circuitBreakerBytes));
                 aggs = aggregate(buffer.iterator(), new Iterator<>() {
                     @Override
                     public boolean hasNext() {
@@ -275,7 +279,13 @@ public DelayableWriteable<InternalAggregations> next() {
             );
             buffer = null;
         } finally {
-            releaseAggs(buffer);
+            // Buffer is non-null on exception
+            if (buffer != null) {
+                releaseAggs(buffer);
+                if (aggsList != null) {
+                    Releasables.close(aggsList);
+                }
+            }
         }
         if (hasAggs
             // reduced aggregations can be null if all shards failed
@@ -335,7 +345,7 @@ private MergeResult partialReduce(
         List<QuerySearchResult> toConsume,
         List<SearchShard> processedShards,
         TopDocsStats topDocsStats,
-        MergeResult lastMerge,
+        @Nullable MergeResult lastMerge,
         int numReducePhases
     ) {
         // ensure consistent ordering
@@ -392,7 +402,7 @@ private MergeResult partialReduce(
         return new MergeResult(
             processedShards,
             newTopDocs,
-            newAggs == null ? null : DelayableWriteable.referencing(newAggs),
+            newAggs != null ? DelayableWriteable.referencing(newAggs) : null,
             newAggs != null ? DelayableWriteable.getSerializedSize(newAggs) : 0
         );
     }
@@ -457,12 +467,13 @@ private long ramBytesUsedQueryResult(QuerySearchResult result) {
      * Returns an estimation of the size that a reduce of the provided size
      * would take on memory.
      * This size is estimated as roughly 1.5 times the size of the serialized
-     * aggregations that need to be reduced. This estimation can be completely
-     * off for some aggregations but it is corrected with the real size after
-     * the reduce completes.
+     * aggregations that need to be reduced.
+     * This method expects an already accounted size, so only an extra 0.5x is returned.
+     * This estimation can be completely off for some aggregations
+     * but it is corrected with the real size after the reduce completes.
      */
     private static long estimateRamBytesUsedForReduce(long size) {
-        return Math.round(1.5d * size - size);
+        return Math.round(0.5d * size);
    }

     private void consume(QuerySearchResult result, Runnable next) {
@@ -639,7 +650,7 @@ public void onFailure(Exception exc) {
         });
     }

-    private static void releaseAggs(List<QuerySearchResult> toConsume) {
+    private static void releaseAggs(@Nullable List<QuerySearchResult> toConsume) {
         if (toConsume != null) {
             for (QuerySearchResult result : toConsume) {
                 result.releaseAggs();
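The essence of the change above: each partial result's estimated size is now charged to the request breaker before its aggs are deserialized and reduced. A minimal sketch of that accounting pattern, assuming the standard org.elasticsearch.common.breaker.CircuitBreaker API (addEstimateAndMaybeBreak in the diff is the consumer's wrapper around it; this helper is invented for illustration):

    import org.elasticsearch.common.breaker.CircuitBreaker;

    import java.util.List;

    class ReduceAccountingSketch {
        /**
         * Charges each partial result's estimated size to the breaker before any
         * deserialization work; throws CircuitBreakingException early if over the limit.
         * The caller is expected to release via breaker.addWithoutBreaking(-accounted).
         */
        static long accountEstimates(CircuitBreaker breaker, List<Long> estimatedSizes) {
            long accounted = 0;
            for (long estimate : estimatedSizes) {
                breaker.addEstimateBytesAndMaybeBreak(estimate, "<reduce_aggs>");
                accounted += estimate;
            }
            return accounted;
        }
    }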
diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java
index a2b6be6ab0363..ba66c2e9ae898 100644
--- a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java
+++ b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java
@@ -75,7 +75,7 @@ public static <T extends Writeable> DelayableWriteable<T> referencing(Writeable.
         }
     }

-    private DelayableWriteable() {}
+    protected DelayableWriteable() {}

     /**
      * Returns a {@linkplain DelayableWriteable} that stores its contents
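The widened constructor is what allows a test double (the DelegatingDelayableWriteable added below) to subclass DelayableWriteable from outside the file. For orientation, a sketch of the two static helpers the reduction code relies on; both calls appear verbatim in the partialReduce() hunk above, while the wrapper method itself is invented here:

    import org.elasticsearch.common.io.stream.DelayableWriteable;
    import org.elasticsearch.search.aggregations.InternalAggregations;

    class DelayableUsageSketch {
        static long wrapAndMeasure(InternalAggregations newAggs) {
            // referencing() wraps the live object without serializing it...
            try (DelayableWriteable<InternalAggregations> delayed = DelayableWriteable.referencing(newAggs)) {
                // ...while getSerializedSize() reports the wire size that the
                // reduction phase now charges to the circuit breaker.
                return DelayableWriteable.getSerializedSize(newAggs);
            }
        }
    }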
diff --git a/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java b/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java
index e0b68647289b2..7c2d8fe8d0166 100644
--- a/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java
+++ b/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java
@@ -13,8 +13,14 @@
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
+import org.elasticsearch.common.io.stream.DelayableWriteable;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
@@ -25,7 +31,9 @@
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
@@ -40,7 +48,10 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;

+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.mockito.Mockito.mock;

 public class QueryPhaseResultConsumerTests extends ESTestCase {
@@ -148,6 +159,142 @@ public void testProgressListenerExceptionsAreCaught() throws Exception {
         }
     }

+    /**
+     * Adds batches with a high simulated size, expecting the CB to trip before deserialization.
+     */
+    public void testBatchedEstimateSizeTooBig() throws Exception {
+        SearchRequest searchRequest = new SearchRequest("index");
+        searchRequest.source(new SearchSourceBuilder().aggregation(new SumAggregationBuilder("sum")));
+
+        var aggCount = randomIntBetween(1, 10);
+        var circuitBreakerLimit = ByteSizeValue.ofMb(256);
+        var circuitBreaker = newLimitedBreaker(circuitBreakerLimit);
+        // More than what the CircuitBreaker should allow
+        long aggregationEstimatedSize = (long) (circuitBreakerLimit.getBytes() * 1.05 / aggCount);
+
+        try (
+            QueryPhaseResultConsumer queryPhaseResultConsumer = new QueryPhaseResultConsumer(
+                searchRequest,
+                executor,
+                circuitBreaker,
+                searchPhaseController,
+                () -> false,
+                new SearchProgressListener() {
+                },
+                10,
+                e -> {}
+            )
+        ) {
+            for (int i = 0; i < aggCount; i++) {
+                // Add a dummy merge result with a high estimated size
+                var mergeResult = new QueryPhaseResultConsumer.MergeResult(List.of(), null, new DelegatingDelayableWriteable<>(() -> {
+                    fail("This shouldn't be called");
+                    return null;
+                }), aggregationEstimatedSize);
+                queryPhaseResultConsumer.addBatchedPartialResult(new SearchPhaseController.TopDocsStats(0), mergeResult);
+            }
+
+            try {
+                queryPhaseResultConsumer.reduce();
+                fail("Expecting a circuit breaking exception to be thrown");
+            } catch (CircuitBreakingException e) {
+                // The last merge result estimate should break
+                assertThat(e.getBytesWanted(), equalTo(aggregationEstimatedSize));
+            }
+        }
+    }
+
+    /**
+     * Adds batches whose estimates fit the CB, expecting it to trip on the final reduce estimate instead.
+     * <p>
+     * Similar to {@link #testBatchedEstimateSizeTooBig()}, but this tests the extra size
+     * estimated for the final reduce.
+     * </p>
+     */
+    public void testBatchedEstimateSizeTooBigAfterDeserialization() throws Exception {
+        SearchRequest searchRequest = new SearchRequest("index");
+        searchRequest.source(new SearchSourceBuilder().aggregation(new SumAggregationBuilder("sum")));
+
+        var aggCount = randomIntBetween(1, 10);
+        var circuitBreakerLimit = ByteSizeValue.ofMb(256);
+        var circuitBreaker = newLimitedBreaker(circuitBreakerLimit);
+        // Less than the CB limit, but more after the 1.5x estimate
+        long aggregationEstimatedSize = (long) (circuitBreakerLimit.getBytes() * 0.75 / aggCount);
+        long totalAggregationsEstimatedSize = aggregationEstimatedSize * aggCount;
+
+        try (
+            QueryPhaseResultConsumer queryPhaseResultConsumer = new QueryPhaseResultConsumer(
+                searchRequest,
+                executor,
+                circuitBreaker,
+                searchPhaseController,
+                () -> false,
+                new SearchProgressListener() {
+                },
+                10,
+                e -> {}
+            )
+        ) {
+            for (int i = 0; i < aggCount; i++) {
+                // Add a dummy merge result with a high estimated size
+                var mergeResult = new QueryPhaseResultConsumer.MergeResult(List.of(), null, new DelegatingDelayableWriteable<>(() -> {
+                    fail("This shouldn't be called");
+                    return null;
+                }), aggregationEstimatedSize);
+                queryPhaseResultConsumer.addBatchedPartialResult(new SearchPhaseController.TopDocsStats(0), mergeResult);
+            }
+
+            try {
+                queryPhaseResultConsumer.reduce();
+                fail("Expecting a circuit breaking exception to be thrown");
+            } catch (CircuitBreakingException e) {
+                assertThat(circuitBreaker.getUsed(), greaterThanOrEqualTo(aggregationEstimatedSize));
+                // A final +0.5x is added to account for the serialized->deserialized extra size
+                assertThat(e.getBytesWanted(), equalTo(Math.round(totalAggregationsEstimatedSize * 0.5)));
+            }
+        }
+    }
+
+    /**
+     * DelayableWriteable that delegates expansion to a supplier.
+     */
+    private static class DelegatingDelayableWriteable<T extends Writeable> extends DelayableWriteable<T> {
+        private final Supplier<T> supplier;
+
+        private DelegatingDelayableWriteable(Supplier<T> supplier) {
+            this.supplier = supplier;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) {
+            throw new UnsupportedOperationException("Not to be called");
+        }
+
+        @Override
+        public T expand() {
+            return supplier.get();
+        }
+
+        @Override
+        public Serialized<T> asSerialized(Reader<T> reader, NamedWriteableRegistry registry) {
+            throw new UnsupportedOperationException("Not to be called");
+        }
+
+        @Override
+        public boolean isSerialized() {
+            return true;
+        }
+
+        @Override
+        public long getSerializedSize() {
+            return 0;
+        }
+
+        @Override
+        public void close() {
+            // noop
+        }
+    }
+
     private static class ThrowingSearchProgressListener extends SearchProgressListener {
         private final AtomicInteger onQueryResult = new AtomicInteger(0);
         private final AtomicInteger onPartialReduce = new AtomicInteger(0);
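To make the second test's expected numbers concrete, here is the arithmetic as a standalone sketch (aggCount is fixed at 4 here, where the test randomizes it within [1, 10]):

    // Sketch: batches totalling 0.75x of the limit are accounted fine, but the
    // final reduce estimate of +0.5x of that total pushes past the limit.
    public class BatchedEstimateMath {
        public static void main(String[] args) {
            long limit = 256L * 1024 * 1024;              // circuitBreakerLimit in the test
            int aggCount = 4;                             // the test picks 1..10 at random
            long perBatch = (long) (limit * 0.75 / aggCount);
            long total = perBatch * aggCount;             // ~0.75x of the limit
            long finalEstimate = Math.round(total * 0.5); // the extra reduce overhead
            System.out.println(total + finalEstimate > limit);    // true: ~1.125x of the limit
            System.out.println("bytesWanted = " + finalEstimate); // what the test asserts
        }
    }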
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 200ef370399e4..3c16b676b7e8e 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -270,7 +270,7 @@
  * <p>
  * A test cluster creates a set of nodes in the background before the test starts. The number of nodes in the cluster is
  * determined at random and can change across tests. The {@link ClusterScope} allows configuring the initial number of nodes
- * that are created before the tests start.
+ * that are created before the tests start. More information about node configurations and settings can be found in {@link InternalTestCluster}.
  * <pre>
  * {@literal @}NodeScope(scope=Scope.SUITE, numDataNodes=3)
  * public class SomeIT extends ESIntegTestCase {
@@ -2183,7 +2183,7 @@ public enum Scope {
 
         /**
          * Indicates whether the cluster can have dedicated master nodes. If {@code false} means data nodes will serve as master nodes
-         * and there will be no dedicated master (and data) nodes. Default is {@code false} which means
+         * and there will be no dedicated master (and data) nodes. Default is {@code true} which means
          * dedicated master nodes will be randomly used.
          */
         boolean supportsDedicatedMasters() default true;
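To make the corrected default concrete, opting out of random dedicated masters looks like this (a sketch; the class name is hypothetical):

    import org.elasticsearch.test.ESIntegTestCase;

    // Hypothetical test class: with supportsDedicatedMasters = false every node is
    // master-eligible and holds data; with the default (true), dedicated master
    // nodes may be added at random.
    @ESIntegTestCase.ClusterScope(supportsDedicatedMasters = false, minNumDataNodes = 2)
    public class NoDedicatedMastersIT extends ESIntegTestCase {
    }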
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index 04c4894b6c201..7d44d7c9cec4d 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -176,6 +176,20 @@
  * to the async nature of Elasticsearch in combination with randomized testing. Once Threads and asynchronous calls
  * are involved reproducibility is very limited. This class should only be used through {@link ESIntegTestCase}.
  * </p>
+ *
+ * <h2>{@link NodeConfigurationSource#nodeSettings} ordinals</h2>
+ * The supplied node ordinal for this node settings getter is generated as follows:
+ * <ol>
+ *     <li>
+ *         The first indices correspond to the dedicated master nodes, enabled with the {@code randomlyAddDedicatedMasters} setting.
+ *         Their number can't currently be configured.
+ *     </li>
+ *     <li>
+ *         Then come the data nodes, controlled by the {@code minNumDataNodes} and {@code maxNumDataNodes} settings.
+ *     </li>
+ *     <li>
+ *         Finally, the coordinating-only nodes, configured with the {@code numClientNodes} setting.
+ *     </li>
+ * </ol>
  */
 public final class InternalTestCluster extends TestCluster {
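As an illustration of the ordering documented above (the class is hypothetical; note that the integration test earlier in this patch distinguishes node types via NODE_ROLES_SETTING rather than ordinals):

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.test.ESIntegTestCase;

    // Hypothetical test: per the list above, ordinals enumerate dedicated masters
    // first, then data nodes, then coordinating-only nodes.
    public class OrdinalAwareIT extends ESIntegTestCase {
        @Override
        protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
            Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
            if (nodeOrdinal == 0) {
                // With randomlyAddDedicatedMasters in play, ordinal 0 may be a
                // dedicated master; keep per-ordinal tweaks conservative.
                builder.put("thread_pool.search.size", 1);
            }
            return builder.build();
        }
    }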