Skip to content

Commit 2925777

Browse files
committed
Don't set doc count error to 0 when batched reduction occurred
1 parent b1e1a54 commit 2925777

File tree

4 files changed

+22
-21
lines changed

4 files changed

+22
-21
lines changed

server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,12 @@
1313
import org.elasticsearch.action.search.SearchResponse;
1414
import org.elasticsearch.cluster.metadata.IndexMetadata;
1515
import org.elasticsearch.common.settings.Settings;
16-
import org.elasticsearch.search.SearchService;
1716
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
1817
import org.elasticsearch.search.aggregations.BucketOrder;
1918
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
2019
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
2120
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
2221
import org.elasticsearch.test.ESIntegTestCase;
23-
import org.junit.After;
24-
import org.junit.Before;
2522

2623
import java.io.IOException;
2724
import java.util.ArrayList;
@@ -53,18 +50,6 @@ public static String randomExecutionHint() {
5350

5451
private static int numRoutingValues;
5552

56-
@Before
57-
public void disableBatchedExecution() {
58-
// TODO: it's practically impossible to get a 100% deterministic test with batched execution unfortunately, adjust this test to
59-
// still do something useful with batched execution (i.e. use somewhat relaxed assertions)
60-
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
61-
}
62-
63-
@After
64-
public void resetSettings() {
65-
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
66-
}
67-
6853
@Override
6954
public void setupSuiteScopeCluster() throws Exception {
7055
assertAcked(indicesAdmin().prepareCreate("idx").setMapping(STRING_FIELD_NAME, "type=keyword").get());

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
220220
batchedResults = this.batchedResults;
221221
}
222222
final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size();
223+
final boolean hasBatchedResults = batchedResults.isEmpty() == false;
223224
final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
224225
final Deque<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null;
225226
// consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
@@ -247,6 +248,10 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
247248
if (aggsList != null) {
248249
// Add an estimate of the final reduce size
249250
breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
251+
AggregationReduceContext aggReduceContext = performFinalReduce
252+
? aggReduceContextBuilder.forFinalReduction()
253+
: aggReduceContextBuilder.forPartialReduction();
254+
aggReduceContext.setFinalReduceHasBatchedResult(hasBatchedResults);
250255
aggs = aggregate(buffer.iterator(), new Iterator<>() {
251256
@Override
252257
public boolean hasNext() {
@@ -257,10 +262,7 @@ public boolean hasNext() {
257262
public DelayableWriteable<InternalAggregations> next() {
258263
return aggsList.pollFirst();
259264
}
260-
},
261-
resultSize,
262-
performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction()
263-
);
265+
}, resultSize, aggReduceContext);
264266
} else {
265267
aggs = null;
266268
}

server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public interface Builder {
4747
@Nullable
4848
private final AggregationBuilder builder;
4949
private final AggregatorFactories.Builder subBuilders;
50+
private boolean finalReduceHasBatchedResult;
5051

5152
private AggregationReduceContext(
5253
BigArrays bigArrays,
@@ -136,6 +137,14 @@ public final AggregationReduceContext forAgg(String name) {
136137

137138
protected abstract AggregationReduceContext forSubAgg(AggregationBuilder sub);
138139

140+
public boolean doesFinalReduceHaveBatchedResult() {
141+
return finalReduceHasBatchedResult;
142+
}
143+
144+
public void setFinalReduceHasBatchedResult(boolean finalReduceHasBatchedResult) {
145+
this.finalReduceHasBatchedResult = finalReduceHasBatchedResult;
146+
}
147+
139148
/**
140149
* A {@linkplain AggregationReduceContext} to perform a partial reduction.
141150
*/
@@ -234,7 +243,9 @@ public PipelineTree pipelineTreeRoot() {
234243

235244
@Override
236245
protected AggregationReduceContext forSubAgg(AggregationBuilder sub) {
237-
return new ForFinal(bigArrays(), scriptService(), isCanceled(), sub, multiBucketConsumer, pipelineTreeRoot);
246+
ForFinal subContext = new ForFinal(bigArrays(), scriptService(), isCanceled(), sub, multiBucketConsumer, pipelineTreeRoot);
247+
subContext.setFinalReduceHasBatchedResult(doesFinalReduceHaveBatchedResult());
248+
return subContext;
238249
}
239250
}
240251
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,10 @@ public InternalAggregation get() {
332332
}
333333
long docCountError = -1;
334334
if (sumDocCountError != -1) {
335-
docCountError = size == 1 ? 0 : sumDocCountError;
335+
// If we are reducing only one aggregation (size == 1), the doc count error should be 0.
336+
// However, the presence of a batched query result implies this is a final reduction and a partial reduction with size > 1
337+
// has already occurred on a data node. The doc count error should not be 0 in this case.
338+
docCountError = size == 1 && reduceContext.doesFinalReduceHaveBatchedResult() == false ? 0 : sumDocCountError;
336339
}
337340
return create(name, result, reduceContext.isFinalReduce() ? getOrder() : thisReduceOrder, docCountError, otherDocCount);
338341
}

0 commit comments

Comments
 (0)