Aggs: Fix CB on reduction phase #133398
@@ -231,6 +231,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
             while ((batchedResult = batchedResults.poll()) != null) {
                 topDocsStats.add(batchedResult.v1());
                 consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
+                addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize);
             }
             for (QuerySearchResult result : buffer) {
                 topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
@@ -392,7 +393,7 @@ private MergeResult partialReduce(
         return new MergeResult(
             processedShards,
             newTopDocs,
-            newAggs == null ? null : DelayableWriteable.referencing(newAggs),
+            newAggs != null ? DelayableWriteable.referencing(newAggs) : null,
             newAggs != null ? DelayableWriteable.getSerializedSize(newAggs) : 0
         );
     }
@@ -462,7 +463,7 @@ private long ramBytesUsedQueryResult(QuerySearchResult result) {
      * the reduce completes.
      */
     private static long estimateRamBytesUsedForReduce(long size) {
-        return Math.round(1.5d * size - size);
+        return Math.round(1.5d * size);
long breakerSize = circuitBreakerBytes;
final InternalAggregations aggs;
try {
    if (aggsList != null) {
        // Add an estimate of the final reduce size
        breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
        aggs = aggregate(buffer.iterator(), new Iterator<>() {
            @Override
            public boolean hasNext() {
                return aggsList.isEmpty() == false;
            }

            @Override
            public DelayableWriteable<InternalAggregations> next() {
                return aggsList.pollFirst();
            }
        },
            resultSize,
            performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction()
        );
(circuitBreakerBytes is the total number of bytes added to the CB in the consumer; the batched results will be included here.)
Interestingly, if the estimated MergeResult size was already added earlier, we would be adding it a second time here. I'll have to make sure the 1.5 factor wasn't actually "fixing" this problem. It affects other cases too, though, which makes things more complicated.
Update: I'll review this again, as maybe it was considering the original estimated size to be already accounted, so the 0.5 extra would be right.
I'll check the codepaths leading to this to see what's happening exactly, and maybe undo this or change the javadoc if required
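To make the question about the 0.5 extra concrete, here is a minimal sketch comparing the two formulas from the diff. The class and method names are hypothetical, and the 1,000-byte figure is an arbitrary assumption; only the two `Math.round` expressions come from the change itself.

```java
// Illustrative sketch only; the class and method names are hypothetical,
// not part of Elasticsearch.
final class ReduceEstimateSketch {
    // Formula before the change: only the extra 50% on top of `size`.
    static long extraOnly(long size) {
        return Math.round(1.5d * size - size);
    }

    // Formula after the change: the full 1.5x of `size`.
    static long fullEstimate(long size) {
        return Math.round(1.5d * size);
    }

    public static void main(String[] args) {
        // Assumed number of bytes already accounted on the breaker.
        long alreadyAccounted = 1_000L;
        // Total on the breaker after adding each estimate on top of that.
        System.out.println(alreadyAccounted + extraOnly(alreadyAccounted));    // 1500
        System.out.println(alreadyAccounted + fullEstimate(alreadyAccounted)); // 2500
    }
}
```

If the original size is already on the breaker, the old formula brings the total to 1.5x and the new one to 2.5x, which is why it matters whether the earlier bytes were really accounted.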
That MergeResult estimate was ignored and lost. I traced it from deserialization onward, and it wasn't used anywhere else.
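As a rough illustration of what accounting for the batched estimates looks like, the sketch below uses a toy breaker. `ToyBreaker` and `BatchedAccountingSketch` are hypothetical stand-ins, not the Elasticsearch circuit breaker API, and the sizes and limit are made-up numbers.

```java
import java.util.List;

// Hypothetical stand-in for a circuit breaker; not the Elasticsearch API.
final class ToyBreaker {
    private final long limit;
    private long used;

    ToyBreaker(long limit) {
        this.limit = limit;
    }

    // Adds the estimate and throws if the running total would exceed the limit.
    long addEstimateAndMaybeBreak(long bytes) {
        if (used + bytes > limit) {
            throw new IllegalStateException("would use " + (used + bytes) + " bytes, limit is " + limit);
        }
        used += bytes;
        return used;
    }

    long used() {
        return used;
    }
}

final class BatchedAccountingSketch {
    // Adds the estimated size of each batched result to the breaker, one at a time.
    static long account(ToyBreaker breaker, List<Long> estimatedSizes) {
        for (long size : estimatedSizes) {
            breaker.addEstimateAndMaybeBreak(size);
        }
        return breaker.used();
    }

    public static void main(String[] args) {
        ToyBreaker breaker = new ToyBreaker(1_000L);
        System.out.println(account(breaker, List.of(300L, 400L))); // 700
        try {
            breaker.addEstimateAndMaybeBreak(400L); // 700 + 400 exceeds the limit
        } catch (IllegalStateException e) {
            System.out.println("tripped: " + e.getMessage());
        }
    }
}
```

If the per-batch estimates are dropped instead of added, the toy breaker never sees those 700 bytes and the later addition would pass unnoticed.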
I'm wondering whether we should add it to the breaker before consuming, or whether we only know the estimated size after consuming.
consumePartialMergeResult() simply adds the agg to the aggsList param, nothing else. The aggs are deserialized later, here: elasticsearch/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java, line 250 in 5a3c9e7
ok, then it does not matter 👍
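The point that consuming only enqueues, with expansion happening later, can be sketched as follows. This is a simplified model under stated assumptions: the `Supplier` wrapper is a hypothetical substitute for a delayed-deserialization holder, and `consume` is an illustrative helper, not the real method.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustrative sketch only; names are hypothetical, not Elasticsearch code.
final class DeferredConsumeSketch {
    // Counts how many payloads have actually been materialized.
    static final AtomicInteger expansions = new AtomicInteger();

    // Hypothetical lazy wrapper: the payload is only materialized when get() is called.
    static Supplier<String> lazy(String payload) {
        return () -> {
            expansions.incrementAndGet();
            return payload;
        };
    }

    // Models the described behaviour: consuming just appends to the list.
    static void consume(Supplier<String> aggs, Deque<Supplier<String>> aggsList) {
        aggsList.add(aggs);
    }

    public static void main(String[] args) {
        Deque<Supplier<String>> aggsList = new ArrayDeque<>();
        consume(lazy("aggs-1"), aggsList);
        consume(lazy("aggs-2"), aggsList);
        System.out.println("expanded after consume: " + expansions.get());

        // The later reduce step is where each entry is pulled and expanded.
        while (aggsList.isEmpty() == false) {
            aggsList.pollFirst().get();
        }
        System.out.println("expanded after reduce: " + expansions.get());
    }
}
```

Because nothing is expanded at consume time in this model, adding the estimate to the breaker just before or just after that call covers the same memory.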