Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/133398.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 133398
summary: "Aggs: Fix CB on reduction phase"
area: Aggregations
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.aggregations.bucket;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.aggregations.AggregationIntegTestCase;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING;
import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING;
import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING;
import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING;
import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
import static org.elasticsearch.search.aggregations.AggregationBuilders.composite;
import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;

@ESIntegTestCase.ClusterScope(minNumDataNodes = 1, maxNumDataNodes = 2, numClientNodes = 1)
public class AggregationReductionCircuitBreakingIT extends AggregationIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
// Most of the settings here exist to make the search as stable and deterministic as possible
var settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "memory")
// More threads may lead to more consumption and the test failing in the datanodes
.put("thread_pool.search.size", 1);
if (NODE_ROLES_SETTING.get(otherSettings).isEmpty()) {
// Coordinator
settings.put(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "7MB");
} else {
// Datanode
// To avoid OOMs
settings.put(USE_REAL_MEMORY_USAGE_SETTING.getKey(), true).put(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "80%");
}
return settings.build();
}

/**
* Expect the breaker to trip in `QueryPhaseResultConsume#reduce()`, when reducing `MergeResult`s.
* <p>
* After testing this, the agg serialized size is around 5MB.
* The CB is set to 7MB, as the reduction is expected to add an extra 50% overhead.
* </p>
*/
public void testCBTrippingOnReduction() throws IOException {
createIndex();
addDocs(100, 100, 100);

// Some leaks (Check ESTestCase#loggedLeaks) aren't logged unless we run the test twice.
// So we run it multiple times to ensure everything gets collected before the final test checks.
for (int i = 0; i < 10; i++) {
assertCBTrip(
() -> internalCluster().coordOnlyNodeClient()
.prepareSearch("index")
.setSize(0)
.addAggregation(
composite(
"composite",
List.of(
new TermsValuesSourceBuilder("integer").field("integer"),
new TermsValuesSourceBuilder("long").field("long")
)
).size(5000).subAggregation(topHits("top_hits").size(10))
)
.setBatchedReduceSize(randomIntBetween(2, 5)),
e -> {
var completeException = ExceptionsHelper.stackTrace(e);
// If a shard fails, we can't check reduction
assumeTrue(completeException, e.shardFailures().length == 0);
assertThat(e.getCause(), instanceOf(CircuitBreakingException.class));
assertThat(completeException, containsString("QueryPhaseResultConsumer.reduce"));
}
);
}
}

public void assertCBTrip(Supplier<SearchRequestBuilder> requestSupplier, Consumer<SearchPhaseExecutionException> exceptionCallback) {
try {
requestSupplier.get().get().decRef();

fail("Expected the breaker to trip");
} catch (SearchPhaseExecutionException e) {
exceptionCallback.accept(e);
}
}

private void createIndex() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder();
mappingBuilder.startObject();
mappingBuilder.startObject("properties");
{
mappingBuilder.startObject("integer");
mappingBuilder.field("type", "integer");
mappingBuilder.endObject();
}
{
mappingBuilder.startObject("long");
mappingBuilder.field("type", "long");
mappingBuilder.endObject();
}

mappingBuilder.endObject(); // properties
mappingBuilder.endObject();

assertAcked(
prepareCreate("index").setSettings(Settings.builder().put("index.number_of_shards", randomIntBetween(1, 10)).build())
.setMapping(mappingBuilder)
);
}

private void addDocs(int docCount, int integerFieldMvCount, int longFieldMvCount) throws IOException {
List<IndexRequestBuilder> docs = new ArrayList<>();
for (int i = 0; i < docCount; i++) {
XContentBuilder docSource = XContentFactory.jsonBuilder();
docSource.startObject();
final int docNumber = i;
List<Integer> integerValues = IntStream.range(0, integerFieldMvCount).map(x -> docNumber + x * 100).boxed().toList();
List<Long> longValues = LongStream.range(0, longFieldMvCount).map(x -> docNumber + x * 100).boxed().toList();
docSource.field("integer", integerValues);
docSource.field("long", longValues);
docSource.endObject();

docs.add(prepareIndex("index").setOpType(DocWriteRequest.OpType.CREATE).setSource(docSource));
}
indexRandom(true, false, false, false, docs);
forceMerge(false);
flushAndRefresh("index");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,31 +222,35 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size();
final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
final Deque<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null;
// consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
// execution for shards on the coordinating node itself
if (mergeResult != null) {
consumePartialMergeResult(mergeResult, topDocsList, aggsList);
}
Tuple<TopDocsStats, MergeResult> batchedResult;
while ((batchedResult = batchedResults.poll()) != null) {
topDocsStats.add(batchedResult.v1());
consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
}
for (QuerySearchResult result : buffer) {
topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
if (topDocsList != null) {
TopDocsAndMaxScore topDocs = result.consumeTopDocs();
setShardIndex(topDocs.topDocs, result.getShardIndex());
topDocsList.add(topDocs.topDocs);
}
}

SearchPhaseController.ReducedQueryPhase reducePhase;
long breakerSize = circuitBreakerBytes;
final InternalAggregations aggs;
try {
// consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
// execution for shards on the coordinating node itself
if (mergeResult != null) {
consumePartialMergeResult(mergeResult, topDocsList, aggsList);
breakerSize = addEstimateAndMaybeBreak(mergeResult.estimatedSize);
}
Tuple<TopDocsStats, MergeResult> batchedResult;
while ((batchedResult = batchedResults.poll()) != null) {
topDocsStats.add(batchedResult.v1());
consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
// Add the estimate of the agg size
breakerSize = addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize);
}
for (QuerySearchResult result : buffer) {
topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
if (topDocsList != null) {
TopDocsAndMaxScore topDocs = result.consumeTopDocs();
setShardIndex(topDocs.topDocs, result.getShardIndex());
topDocsList.add(topDocs.topDocs);
}
}
if (aggsList != null) {
// Add an estimate of the final reduce size
breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(circuitBreakerBytes));
aggs = aggregate(buffer.iterator(), new Iterator<>() {
@Override
public boolean hasNext() {
Expand Down Expand Up @@ -275,7 +279,13 @@ public DelayableWriteable<InternalAggregations> next() {
);
buffer = null;
} finally {
releaseAggs(buffer);
// Buffer is non-null on exception
if (buffer != null) {
releaseAggs(buffer);
if (aggsList != null) {
Releasables.close(aggsList);
}
}
}
if (hasAggs
// reduced aggregations can be null if all shards failed
Expand Down Expand Up @@ -335,7 +345,7 @@ private MergeResult partialReduce(
List<QuerySearchResult> toConsume,
List<SearchShard> processedShards,
TopDocsStats topDocsStats,
MergeResult lastMerge,
@Nullable MergeResult lastMerge,
int numReducePhases
) {
// ensure consistent ordering
Expand Down Expand Up @@ -392,7 +402,7 @@ private MergeResult partialReduce(
return new MergeResult(
processedShards,
newTopDocs,
newAggs == null ? null : DelayableWriteable.referencing(newAggs),
newAggs != null ? DelayableWriteable.referencing(newAggs) : null,
newAggs != null ? DelayableWriteable.getSerializedSize(newAggs) : 0
);
}
Expand Down Expand Up @@ -457,12 +467,13 @@ private long ramBytesUsedQueryResult(QuerySearchResult result) {
* Returns an estimation of the size that a reduce of the provided size
* would take on memory.
* This size is estimated as roughly 1.5 times the size of the serialized
* aggregations that need to be reduced. This estimation can be completely
* off for some aggregations but it is corrected with the real size after
* the reduce completes.
* aggregations that need to be reduced.
* This method expects an already accounted size, so only an extra 0.5x is returned.
* This estimation can be completely off for some aggregations
* but it is corrected with the real size after the reduce completes.
*/
private static long estimateRamBytesUsedForReduce(long size) {
return Math.round(1.5d * size - size);
return Math.round(0.5d * size);
}

private void consume(QuerySearchResult result, Runnable next) {
Expand Down Expand Up @@ -639,7 +650,7 @@ public void onFailure(Exception exc) {
});
}

private static void releaseAggs(List<QuerySearchResult> toConsume) {
private static void releaseAggs(@Nullable List<QuerySearchResult> toConsume) {
if (toConsume != null) {
for (QuerySearchResult result : toConsume) {
result.releaseAggs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static <T extends Writeable> DelayableWriteable<T> referencing(Writeable.
}
}

private DelayableWriteable() {}
protected DelayableWriteable() {}

/**
* Returns a {@linkplain DelayableWriteable} that stores its contents
Expand Down
Loading
Loading