Skip to content

Commit cf36d97

Browse files
not-napoleonelasticsearchmachine
andauthored
Aggregations cancellation after collection (#120944) (#121936)
This PR addresses issues around aggregations cancellation, mentioned in #108701 and other places. In brief, during aggregations collection time, we respect cancellation via the mechanisms in the searcher to poison cancelled queries. But once the aggregation finishes collection, there is no further need to interact with the searcher, so we cannot rely on that for cancellation checking. In particular, deeply nested aggregations can spend a long time constructing the results tree. Checking for cancellation is a trade off, as the check itself is somewhat expensive (it involves a volatile read), so we want to balance checking often enough that cancelled queries aren't taking up resources for a long time, but not so frequently that it slows down most aggregation queries. Our first attempt to this is to check once when we go to build sub-aggregations, as the worst cases for this that we've seen involve needing to build deep sub-aggregation trees. Checking at sub-aggregation construction time also provides a conveniently centralized method call to add the check to. --------- Conflicts: server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java Co-authored-by: elasticsearchmachine <[email protected]>
1 parent bb77d49 commit cf36d97

File tree

7 files changed

+520
-73
lines changed

7 files changed

+520
-73
lines changed

docs/changelog/120944.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 120944
2+
summary: Aggregations cancellation after collection
3+
area: Aggregations
4+
type: bug
5+
issues:
6+
- 108701

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/AutoDateHistogramAggregator.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.search.aggregations.support.AggregationContext;
3636
import org.elasticsearch.search.aggregations.support.ValuesSource;
3737
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
38+
import org.elasticsearch.tasks.TaskCancelledException;
3839

3940
import java.io.IOException;
4041
import java.util.Collections;
@@ -573,7 +574,15 @@ private void rebucket() {
573574
long[] mergeMap = new long[Math.toIntExact(oldOrds.size())];
574575
bucketOrds = new LongKeyedBucketOrds.FromMany(bigArrays());
575576
success = true;
576-
for (long owningBucketOrd = 0; owningBucketOrd <= oldOrds.maxOwningBucketOrd(); owningBucketOrd++) {
577+
long maxOwning = oldOrds.maxOwningBucketOrd();
578+
for (long owningBucketOrd = 0; owningBucketOrd <= maxOwning; owningBucketOrd++) {
579+
/*
580+
* Check for cancelation during this tight loop as it can take a while and the standard
581+
* cancelation checks don't run during the loop. Becuase it's a tight loop.
582+
*/
583+
if (context.isCancelled()) {
584+
throw new TaskCancelledException("cancelled");
585+
}
577586
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(owningBucketOrd);
578587
Rounding.Prepared preparedRounding = preparedRoundings[roundingIndexFor(owningBucketOrd)];
579588
while (ordsEnum.next()) {

modules/aggregations/src/test/java/org/elasticsearch/aggregations/metric/MatrixStatsAggregatorTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void testNoData() throws Exception {
3636
MatrixStatsAggregationBuilder aggBuilder = new MatrixStatsAggregationBuilder("my_agg").fields(
3737
Collections.singletonList("field")
3838
);
39-
InternalMatrixStats stats = searchAndReduce(reader, new AggTestConfig(aggBuilder, ft));
39+
InternalMatrixStats stats = searchAndReduce(reader, new AggTestConfig(aggBuilder, ft).noReductionCancellation());
4040
assertNull(stats.getStats());
4141
assertEquals(0L, stats.getDocCount());
4242
}
@@ -54,7 +54,7 @@ public void testUnmapped() throws Exception {
5454
MatrixStatsAggregationBuilder aggBuilder = new MatrixStatsAggregationBuilder("my_agg").fields(
5555
Collections.singletonList("bogus")
5656
);
57-
InternalMatrixStats stats = searchAndReduce(reader, new AggTestConfig(aggBuilder, ft));
57+
InternalMatrixStats stats = searchAndReduce(reader, new AggTestConfig(aggBuilder, ft).noReductionCancellation());
5858
assertNull(stats.getStats());
5959
assertEquals(0L, stats.getDocCount());
6060
}
@@ -88,7 +88,7 @@ public void testTwoFields() throws Exception {
8888
MatrixStatsAggregationBuilder aggBuilder = new MatrixStatsAggregationBuilder("my_agg").fields(
8989
Arrays.asList(fieldA, fieldB)
9090
);
91-
InternalMatrixStats stats = searchAndReduce(reader, new AggTestConfig(aggBuilder, ftA, ftB));
91+
InternalMatrixStats stats = searchAndReduce(reader, new AggTestConfig(aggBuilder, ftA, ftB).noReductionCancellation());
9292
multiPassStats.assertNearlyEqual(stats);
9393
assertTrue(MatrixAggregationInspectionHelper.hasValue(stats));
9494
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.search.aggregations.support.AggregationContext;
2626
import org.elasticsearch.search.aggregations.support.AggregationPath;
2727
import org.elasticsearch.search.sort.SortOrder;
28+
import org.elasticsearch.tasks.TaskCancelledException;
2829

2930
import java.io.IOException;
3031
import java.util.AbstractList;
@@ -176,6 +177,9 @@ protected void prepareSubAggs(long[] ordsToCollect) throws IOException {}
176177
* array of ordinals
177178
*/
178179
protected final IntFunction<InternalAggregations> buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException {
180+
if (context.isCancelled()) {
181+
throw new TaskCancelledException("not building sub-aggregations due to task cancellation");
182+
}
179183
prepareSubAggs(bucketOrdsToCollect);
180184
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
181185
for (int i = 0; i < subAggregators.length; i++) {

0 commit comments

Comments
 (0)