Commit e823224
Aggs: Fix CB on reduction phase (#133398) (#134650)
Fix an issue in the aggs reduction phase where `MergeResult` size estimations were ignored for batched results, so no circuit-breaker accounting happened before the serialized aggs were expanded.
1 parent 759be3f commit e823224
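
Below is a minimal, self-contained sketch of the accounting pattern this commit restores: reserve each batched `MergeResult`'s size estimate on the circuit breaker before its serialized aggs are expanded. `SimpleBreaker`, `PartialResult`, and `expand` are illustrative stand-ins rather than Elasticsearch APIs; the real change applies this inside `QueryPhaseResultConsumer#reduce()`, shown in the diff further down.

// A minimal, self-contained model of the accounting pattern restored by this fix.
// SimpleBreaker and PartialResult are illustrative stand-ins, not Elasticsearch APIs.
import java.util.List;

public class ReductionBreakerSketch {

    /** Toy circuit breaker: reserve bytes up front, fail fast once over the limit. */
    static final class SimpleBreaker {
        private final long limitBytes;
        private long usedBytes;

        SimpleBreaker(long limitBytes) {
            this.limitBytes = limitBytes;
        }

        long addEstimateAndMaybeBreak(long bytes) {
            if (usedBytes + bytes > limitBytes) {
                throw new IllegalStateException("breaker tripped: " + (usedBytes + bytes) + " > " + limitBytes + " bytes");
            }
            usedBytes += bytes;
            return usedBytes;
        }
    }

    /** Stand-in for a batched MergeResult: serialized aggs plus a size estimate. */
    record PartialResult(byte[] serializedAggs, long estimatedSize) {}

    static void reduce(List<PartialResult> batchedResults, SimpleBreaker breaker) {
        for (PartialResult result : batchedResults) {
            // The essence of the fix: account for the estimate *before* expanding the
            // serialized aggs. Before the patch, these estimates were ignored for
            // batched results, so memory grew with no matching breaker reservation.
            breaker.addEstimateAndMaybeBreak(result.estimatedSize());
            expand(result.serializedAggs()); // deserialize only after reserving
        }
    }

    private static void expand(byte[] serializedAggs) {
        // placeholder for DelayableWriteable-style deserialization of the aggs
    }
}

Reserving before expanding means an oversized batch fails fast with a breaker exception instead of first materializing the aggregations on the heap.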

File tree

7 files changed: +363 -30 lines changed

docs/changelog/133398.yaml

Lines changed: 5 additions & 0 deletions (new file)

pr: 133398
summary: "Aggs: Fix CB on reduction phase"
area: Aggregations
type: bug
issues: []
AggregationReductionCircuitBreakingIT.java (new test, package org.elasticsearch.aggregations.bucket)

Lines changed: 156 additions & 0 deletions (new file)
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.aggregations.bucket;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.aggregations.AggregationIntegTestCase;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING;
import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING;
import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING;
import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING;
import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
import static org.elasticsearch.search.aggregations.AggregationBuilders.composite;
import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;

@ESIntegTestCase.ClusterScope(minNumDataNodes = 1, maxNumDataNodes = 2, numClientNodes = 1)
public class AggregationReductionCircuitBreakingIT extends AggregationIntegTestCase {
    @Override
    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
        // Most of the settings here exist to make the search as stable and deterministic as possible
        var settings = Settings.builder()
            .put(super.nodeSettings(nodeOrdinal, otherSettings))
            .put(REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "memory")
            // More threads may lead to more consumption and the test failing on the data nodes
            .put("thread_pool.search.size", 1);
        if (NODE_ROLES_SETTING.get(otherSettings).isEmpty()) {
            // Coordinator
            settings.put(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "7MB");
        } else {
            // Data node
            // To avoid OOMs
            settings.put(USE_REAL_MEMORY_USAGE_SETTING.getKey(), true).put(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "80%");
        }
        return settings.build();
    }

    /**
     * Expect the breaker to trip in `QueryPhaseResultConsumer#reduce()`, when reducing `MergeResult`s.
     * <p>
     * From test runs, the serialized agg size is around 5MB.
     * The CB is set to 7MB, as the reduction is expected to add an extra 50% overhead.
     * </p>
     */
    public void testCBTrippingOnReduction() throws IOException {
        createIndex();
        addDocs(100, 100, 100);

        // Some leaks (check ESTestCase#loggedLeaks) aren't logged unless we run the test twice.
        // So we run it multiple times to ensure everything gets collected before the final test checks.
        for (int i = 0; i < 10; i++) {
            assertCBTrip(
                () -> internalCluster().coordOnlyNodeClient()
                    .prepareSearch("index")
                    .setSize(0)
                    .addAggregation(
                        composite(
                            "composite",
                            List.of(
                                new TermsValuesSourceBuilder("integer").field("integer"),
                                new TermsValuesSourceBuilder("long").field("long")
                            )
                        ).size(5000).subAggregation(topHits("top_hits").size(10))
                    )
                    .setBatchedReduceSize(randomIntBetween(2, 5)),
                e -> {
                    var completeException = ExceptionsHelper.stackTrace(e);
                    // If a shard fails, we can't check reduction
                    assumeTrue(completeException, e.shardFailures().length == 0);
                    assertThat(e.getCause(), instanceOf(CircuitBreakingException.class));
                    assertThat(completeException, containsString("QueryPhaseResultConsumer.reduce"));
                }
            );
        }
    }

    public void assertCBTrip(Supplier<SearchRequestBuilder> requestSupplier, Consumer<SearchPhaseExecutionException> exceptionCallback) {
        try {
            requestSupplier.get().get().decRef();

            fail("Expected the breaker to trip");
        } catch (SearchPhaseExecutionException e) {
            exceptionCallback.accept(e);
        }
    }

    private void createIndex() throws IOException {
        XContentBuilder mappingBuilder = XContentFactory.jsonBuilder();
        mappingBuilder.startObject();
        mappingBuilder.startObject("properties");
        {
            mappingBuilder.startObject("integer");
            mappingBuilder.field("type", "integer");
            mappingBuilder.endObject();
        }
        {
            mappingBuilder.startObject("long");
            mappingBuilder.field("type", "long");
            mappingBuilder.endObject();
        }

        mappingBuilder.endObject(); // properties
        mappingBuilder.endObject();

        assertAcked(
            prepareCreate("index").setSettings(Settings.builder().put("index.number_of_shards", randomIntBetween(1, 10)).build())
                .setMapping(mappingBuilder)
        );
    }

    private void addDocs(int docCount, int integerFieldMvCount, int longFieldMvCount) throws IOException {
        List<IndexRequestBuilder> docs = new ArrayList<>();
        for (int i = 0; i < docCount; i++) {
            XContentBuilder docSource = XContentFactory.jsonBuilder();
            docSource.startObject();
            final int docNumber = i;
            List<Integer> integerValues = IntStream.range(0, integerFieldMvCount).map(x -> docNumber + x * 100).boxed().toList();
            List<Long> longValues = LongStream.range(0, longFieldMvCount).map(x -> docNumber + x * 100).boxed().toList();
            docSource.field("integer", integerValues);
            docSource.field("long", longValues);
            docSource.endObject();

            docs.add(prepareIndex("index").setOpType(DocWriteRequest.OpType.CREATE).setSource(docSource));
        }
        indexRandom(true, false, false, false, docs);
        forceMerge(false);
        flushAndRefresh("index");
    }
}
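
A note on the sizing in the test above: the Javadoc puts the serialized agg size at roughly 5MB, and `estimateRamBytesUsedForReduce` (see the diff below) reserves an extra 0.5x on top of the already-accounted size, so the coordinator attempts to reserve on the order of 5MB + 2.5MB = 7.5MB and trips the 7MB request breaker. The small `setBatchedReduceSize(randomIntBetween(2, 5))` forces multiple partial reductions, so results arrive at `reduce()` as batched `MergeResult`s, which is exactly the path whose estimates were ignored before this fix.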

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 38 additions & 27 deletions
@@ -222,31 +222,35 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
         final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size();
         final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
         final Deque<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null;
-        // consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
-        // execution for shards on the coordinating node itself
-        if (mergeResult != null) {
-            consumePartialMergeResult(mergeResult, topDocsList, aggsList);
-        }
-        Tuple<TopDocsStats, MergeResult> batchedResult;
-        while ((batchedResult = batchedResults.poll()) != null) {
-            topDocsStats.add(batchedResult.v1());
-            consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
-        }
-        for (QuerySearchResult result : buffer) {
-            topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
-            if (topDocsList != null) {
-                TopDocsAndMaxScore topDocs = result.consumeTopDocs();
-                setShardIndex(topDocs.topDocs, result.getShardIndex());
-                topDocsList.add(topDocs.topDocs);
-            }
-        }
+
         SearchPhaseController.ReducedQueryPhase reducePhase;
         long breakerSize = circuitBreakerBytes;
         final InternalAggregations aggs;
         try {
+            // consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
+            // execution for shards on the coordinating node itself
+            if (mergeResult != null) {
+                consumePartialMergeResult(mergeResult, topDocsList, aggsList);
+                breakerSize = addEstimateAndMaybeBreak(mergeResult.estimatedSize);
+            }
+            Tuple<TopDocsStats, MergeResult> batchedResult;
+            while ((batchedResult = batchedResults.poll()) != null) {
+                topDocsStats.add(batchedResult.v1());
+                consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
+                // Add the estimate of the agg size
+                breakerSize = addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize);
+            }
+            for (QuerySearchResult result : buffer) {
+                topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
+                if (topDocsList != null) {
+                    TopDocsAndMaxScore topDocs = result.consumeTopDocs();
+                    setShardIndex(topDocs.topDocs, result.getShardIndex());
+                    topDocsList.add(topDocs.topDocs);
+                }
+            }
             if (aggsList != null) {
                 // Add an estimate of the final reduce size
-                breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
+                breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(circuitBreakerBytes));
                 aggs = aggregate(buffer.iterator(), new Iterator<>() {
                     @Override
                     public boolean hasNext() {
@@ -275,7 +279,13 @@ public DelayableWriteable<InternalAggregations> next() {
             );
             buffer = null;
         } finally {
-            releaseAggs(buffer);
+            // Buffer is non-null on exception
+            if (buffer != null) {
+                releaseAggs(buffer);
+                if (aggsList != null) {
+                    Releasables.close(aggsList);
+                }
+            }
         }
         if (hasAggs
             // reduced aggregations can be null if all shards failed
@@ -335,7 +345,7 @@ private MergeResult partialReduce(
         List<QuerySearchResult> toConsume,
         List<SearchShard> processedShards,
         TopDocsStats topDocsStats,
-        MergeResult lastMerge,
+        @Nullable MergeResult lastMerge,
         int numReducePhases
     ) {
         // ensure consistent ordering
@@ -392,7 +402,7 @@ private MergeResult partialReduce(
         return new MergeResult(
             processedShards,
             newTopDocs,
-            newAggs == null ? null : DelayableWriteable.referencing(newAggs),
+            newAggs != null ? DelayableWriteable.referencing(newAggs) : null,
             newAggs != null ? DelayableWriteable.getSerializedSize(newAggs) : 0
         );
     }
@@ -457,12 +467,13 @@ private long ramBytesUsedQueryResult(QuerySearchResult result) {
      * Returns an estimation of the size that a reduce of the provided size
      * would take on memory.
      * This size is estimated as roughly 1.5 times the size of the serialized
-     * aggregations that need to be reduced. This estimation can be completely
-     * off for some aggregations but it is corrected with the real size after
-     * the reduce completes.
+     * aggregations that need to be reduced.
+     * This method expects an already accounted size, so only an extra 0.5x is returned.
+     * This estimation can be completely off for some aggregations
+     * but it is corrected with the real size after the reduce completes.
      */
     private static long estimateRamBytesUsedForReduce(long size) {
-        return Math.round(1.5d * size - size);
+        return Math.round(0.5d * size);
     }
 
     private void consume(QuerySearchResult result, Runnable next) {
@@ -639,7 +650,7 @@ public void onFailure(Exception exc) {
         });
     }
 
-    private static void releaseAggs(List<QuerySearchResult> toConsume) {
+    private static void releaseAggs(@Nullable List<QuerySearchResult> toConsume) {
         if (toConsume != null) {
             for (QuerySearchResult result : toConsume) {
                 result.releaseAggs();
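
Two details in these hunks are easy to miss. First, `Math.round(1.5d * size - size)` and `Math.round(0.5d * size)` are algebraically equivalent, so the last code change is a clarification rather than a behavior change; together with the reworked Javadoc it makes explicit that the input size is assumed to be already accounted for, and only the extra half is reserved. The call site now passes `circuitBreakerBytes` instead of the running `breakerSize`, presumably so the 0.5x overhead is not compounded on top of the per-`MergeResult` estimates that the loop above now adds individually. Second, the widened `finally` block: when an exception (for example a `CircuitBreakingException`) is thrown before `buffer` is set to null, both the buffered per-shard aggs and the partially filled `aggsList` are now released via `Releasables.close(aggsList)`, closing a leak on the failure path.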

server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ public static <T extends Writeable> DelayableWriteable<T> referencing(Writeable.
         }
     }
 
-    private DelayableWriteable() {}
+    protected DelayableWriteable() {}
 
     /**
      * Returns a {@linkplain DelayableWriteable} that stores its contents
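
Relaxing the constructor from private to protected allows `DelayableWriteable` to be subclassed outside its own nested implementations, presumably required by another file in this seven-file change.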
