Skip to content

Commit b40021a

Browse files
authored
Fix doc_count_error_upper_bound for already reduced results (#134645)
The batched query phase may reduce results on the data node. If so, the logic that determines a terms aggregation's doc_count_error_upper_bound should be aware.
1 parent f858d3e commit b40021a

File tree

4 files changed

+22
-21
lines changed

4 files changed

+22
-21
lines changed

server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,12 @@
1313
import org.elasticsearch.action.search.SearchResponse;
1414
import org.elasticsearch.cluster.metadata.IndexMetadata;
1515
import org.elasticsearch.common.settings.Settings;
16-
import org.elasticsearch.search.SearchService;
1716
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
1817
import org.elasticsearch.search.aggregations.BucketOrder;
1918
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
2019
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
2120
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
2221
import org.elasticsearch.test.ESIntegTestCase;
23-
import org.junit.After;
24-
import org.junit.Before;
2522

2623
import java.io.IOException;
2724
import java.util.ArrayList;
@@ -53,18 +50,6 @@ public static String randomExecutionHint() {
5350

5451
private static int numRoutingValues;
5552

56-
@Before
57-
public void disableBatchedExecution() {
58-
// TODO: it's practically impossible to get a 100% deterministic test with batched execution unfortunately, adjust this test to
59-
// still do something useful with batched execution (i.e. use somewhat relaxed assertions)
60-
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
61-
}
62-
63-
@After
64-
public void resetSettings() {
65-
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
66-
}
67-
6853
@Override
6954
public void setupSuiteScopeCluster() throws Exception {
7055
assertAcked(indicesAdmin().prepareCreate("idx").setMapping(STRING_FIELD_NAME, "type=keyword").get());

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
221221
batchedResults = this.batchedResults;
222222
}
223223
final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size();
224+
final boolean hasBatchedResults = batchedResults.isEmpty() == false;
224225
final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
225226
final Deque<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null;
226227

@@ -252,6 +253,10 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
252253
if (aggsList != null) {
253254
// Add an estimate of the final reduce size
254255
breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(circuitBreakerBytes));
256+
AggregationReduceContext aggReduceContext = performFinalReduce
257+
? aggReduceContextBuilder.forFinalReduction()
258+
: aggReduceContextBuilder.forPartialReduction();
259+
aggReduceContext.setHasBatchedResult(hasBatchedResults);
255260
aggs = aggregate(buffer.iterator(), new Iterator<>() {
256261
@Override
257262
public boolean hasNext() {
@@ -262,10 +267,7 @@ public boolean hasNext() {
262267
public DelayableWriteable<InternalAggregations> next() {
263268
return aggsList.pollFirst();
264269
}
265-
},
266-
resultSize,
267-
performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction()
268-
);
270+
}, resultSize, aggReduceContext);
269271
} else {
270272
aggs = null;
271273
}

server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public interface Builder {
4747
@Nullable
4848
private final AggregationBuilder builder;
4949
private final AggregatorFactories.Builder subBuilders;
50+
private boolean hasBatchedResult;
5051

5152
private AggregationReduceContext(
5253
BigArrays bigArrays,
@@ -136,6 +137,14 @@ public final AggregationReduceContext forAgg(String name) {
136137

137138
protected abstract AggregationReduceContext forSubAgg(AggregationBuilder sub);
138139

140+
public boolean hasBatchedResult() {
141+
return hasBatchedResult;
142+
}
143+
144+
public void setHasBatchedResult(boolean hasBatchedResult) {
145+
this.hasBatchedResult = hasBatchedResult;
146+
}
147+
139148
/**
140149
* A {@linkplain AggregationReduceContext} to perform a partial reduction.
141150
*/
@@ -234,7 +243,9 @@ public PipelineTree pipelineTreeRoot() {
234243

235244
@Override
236245
protected AggregationReduceContext forSubAgg(AggregationBuilder sub) {
237-
return new ForFinal(bigArrays(), scriptService(), isCanceled(), sub, multiBucketConsumer, pipelineTreeRoot);
246+
ForFinal subContext = new ForFinal(bigArrays(), scriptService(), isCanceled(), sub, multiBucketConsumer, pipelineTreeRoot);
247+
subContext.setHasBatchedResult(hasBatchedResult());
248+
return subContext;
238249
}
239250
}
240251
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,10 @@ public InternalAggregation get() {
332332
}
333333
long docCountError = -1;
334334
if (sumDocCountError != -1) {
335-
docCountError = size == 1 ? 0 : sumDocCountError;
335+
// If we are reducing only one aggregation (size == 1), the doc count error should be 0.
336+
// However, the presence of a batched query result implies a partial reduction with size > 1
337+
// has already occurred on a data node. The doc count error should not be 0 in this case.
338+
docCountError = size == 1 && reduceContext.hasBatchedResult() == false ? 0 : sumDocCountError;
336339
}
337340
return create(name, result, reduceContext.isFinalReduce() ? getOrder() : thisReduceOrder, docCountError, otherDocCount);
338341
}

0 commit comments

Comments
 (0)