Merged

Changes from 11 commits (of 25 total)

Commits
b78f195
can it really be this simple?
not-napoleon Jan 27, 2025
5751f45
Update docs/changelog/120944.yaml
not-napoleon Jan 27, 2025
ca451b8
More correct exception type
not-napoleon Jan 27, 2025
18870c4
[CI] Auto commit changes from spotless
Jan 27, 2025
250bc9b
clean up some deprecations
not-napoleon Jan 27, 2025
696fdfc
Merge remote-tracking branch 'refs/remotes/not-napoleon/aggregations-…
not-napoleon Jan 27, 2025
32f079c
[CI] Auto commit changes from spotless
Jan 27, 2025
a4cabaf
Aggs: test super deep
nik9000 Jan 24, 2025
ec66b50
add cancellation case to aggs tests
not-napoleon Jan 28, 2025
8181260
Merge remote-tracking branch 'refs/remotes/not-napoleon/aggregations-…
not-napoleon Jan 28, 2025
bdc6516
some testing around reduce time cancellation
not-napoleon Jan 29, 2025
7eb1966
[CI] Auto commit changes from spotless
Jan 29, 2025
7e213b6
remove the nocommits
not-napoleon Jan 29, 2025
6454286
Merge remote-tracking branch 'refs/remotes/not-napoleon/aggregations-…
not-napoleon Jan 29, 2025
a0485b5
Merge branch 'main' into aggregations-cancellation-after-collection
not-napoleon Jan 29, 2025
c8024e7
Fix license
nik9000 Jan 29, 2025
0231c5a
Merge remote-tracking branch 'refs/remotes/not-napoleon/aggregations-…
nik9000 Jan 29, 2025
7d811a0
Update docs/changelog/120944.yaml
not-napoleon Jan 29, 2025
4a45481
oops, something was using that one
not-napoleon Jan 29, 2025
f2b240f
allow tests to opt out of reduce cancellation testing
not-napoleon Jan 29, 2025
6f51925
Merge branch 'main' into aggregations-cancellation-after-collection
not-napoleon Jan 31, 2025
8359e4e
Merge branch 'main' into aggregations-cancellation-after-collection
nik9000 Feb 3, 2025
2869310
Explain
nik9000 Feb 3, 2025
dc526a8
Merge remote-tracking branch 'refs/remotes/not-napoleon/aggregations-…
nik9000 Feb 3, 2025
e11ac1e
Merge branch 'main' into aggregations-cancellation-after-collection
not-napoleon Feb 6, 2025
5 changes: 5 additions & 0 deletions docs/changelog/120944.yaml
@@ -0,0 +1,5 @@
pr: 120944
summary: Aggregations cancellation after collection
area: Aggregations
type: bug
issues: []
@@ -53,6 +53,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.copyOf(plugins);
}

// NOCOMMIT - make sure this test is still verifying what it was intended to verify
public void testCancellationDuringTimeSeriesAggregation() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
int numberOfShards = between(2, 5);
@@ -35,6 +35,7 @@
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.tasks.TaskCancelledException;

import java.io.IOException;
import java.util.Collections;
@@ -573,7 +574,11 @@ private void rebucket() {
long[] mergeMap = new long[Math.toIntExact(oldOrds.size())];
bucketOrds = new LongKeyedBucketOrds.FromMany(bigArrays());
success = true;
for (long owningBucketOrd = 0; owningBucketOrd <= oldOrds.maxOwningBucketOrd(); owningBucketOrd++) {
long maxOwning = oldOrds.maxOwningBucketOrd();
for (long owningBucketOrd = 0; owningBucketOrd <= maxOwning; owningBucketOrd++) {
if (context.isCancelled()) {
Contributor:
Nit: It is not clear to me why we are handling auto-date histogram in a special way. Could we add a comment explaining the why?

Member:
Sure! It's just that it spends a while spinning around doing extra stuff before it's ready for the normal build.

Member:
And I found it from an actual bug report of folks who were trying to terminate aggs and the thread was stuck here.

Contributor:
Thanks

Member:
you have not one but three typos in that sentence :D
throw new TaskCancelledException("cancelled");
}
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(owningBucketOrd);
Rounding.Prepared preparedRounding = preparedRoundings[roundingIndexFor(owningBucketOrd)];
while (ordsEnum.next()) {
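To see the pattern in the hunk above in isolation: the rebucket loop polls the context's cancellation flag once per owning-bucket ordinal and bails out rather than finishing the merge. A minimal, self-contained sketch with plain-JDK stand-ins (the real code uses AggregationContext#isCancelled() and TaskCancelledException; everything else here is illustrative):

    import java.util.function.Supplier;

    public class RebucketCancellationSketch {
        // Poll-and-throw cancellation inside a long-running loop, mirroring rebucket().
        static void rebucket(long maxOwning, Supplier<Boolean> isCancelled) {
            for (long owningBucketOrd = 0; owningBucketOrd <= maxOwning; owningBucketOrd++) {
                if (isCancelled.get()) {
                    // The real code throws TaskCancelledException("cancelled").
                    throw new IllegalStateException("cancelled");
                }
                // ... merge the buckets owned by owningBucketOrd ...
            }
        }

        public static void main(String[] args) {
            rebucket(1_000_000L, () -> false); // never cancelled: runs to completion
        }
    }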
@@ -94,6 +94,7 @@ public void testCancellationDuringFetchPhase() throws Exception {
ensureSearchWasCancelled(searchResponse);
}

// NOCOMMIT - make sure this test is still verifying what it was intended to verify
public void testCancellationDuringAggregation() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
// This test is only meaningful with at least 2 shards to trigger reduce
@@ -26,6 +26,7 @@
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskCancelledException;

import java.io.IOException;
import java.util.AbstractList;
@@ -163,6 +164,10 @@ protected void prepareSubAggs(LongArray ordsToCollect) throws IOException {}
* array of ordinals
*/
protected final IntFunction<InternalAggregations> buildSubAggsForBuckets(LongArray bucketOrdsToCollect) throws IOException {
if (context.isCancelled()) {
throw new TaskCancelledException("not building sub-aggregations due to task cancellation");
}

prepareSubAggs(bucketOrdsToCollect);
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
for (int i = 0; i < subAggregators.length; i++) {
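The contract behind the buildSubAggsForBuckets guard above: consult the cancellation flag before starting the expensive per-bucket work, and fail fast if it is set. A hedged stand-in sketch (plain JDK, not the real ES classes; the real method throws TaskCancelledException and builds InternalAggregations):

    import java.util.function.Supplier;

    public class SubAggBuildGuardSketch {
        static void buildSubAggsForBuckets(Supplier<Boolean> isCancelled) {
            if (isCancelled.get()) {
                // Real code: throw new TaskCancelledException("not building sub-aggregations due to task cancellation")
                throw new IllegalStateException("not building sub-aggregations due to task cancellation");
            }
            // ... build sub-aggregations for every collected bucket ordinal ...
        }

        public static void main(String[] args) {
            buildSubAggsForBuckets(() -> false); // proceeds normally
            try {
                buildSubAggsForBuckets(() -> true); // fails fast when cancelled
            } catch (IllegalStateException e) {
                System.out.println("cancelled: " + e.getMessage());
            }
        }
    }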
@@ -129,6 +129,8 @@
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
import org.elasticsearch.search.aggregations.metrics.MultiValueAggregation;
@@ -149,6 +151,7 @@
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.SubSearchContext;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
@@ -251,29 +254,12 @@ protected List<SearchPlugin> getSearchPlugins() {
return List.of();
}

/**
* Deprecated - this will be made private in a future update
*/
@Deprecated
protected <A extends Aggregator> A createAggregator(
AggregationBuilder aggregationBuilder,
IndexReader indexReader,
MappedFieldType... fieldTypes
) throws IOException {
return createAggregator(aggregationBuilder, createAggregationContext(indexReader, new MatchAllDocsQuery(), fieldTypes));
}

protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder, AggregationContext context)
throws IOException {
return createAggregator(new AggregatorFactories.Builder().addAggregator(aggregationBuilder), context);
}

/**
* Deprecated - this will be made private in a future update
*/
@Deprecated
protected <A extends Aggregator> A createAggregator(AggregatorFactories.Builder builder, AggregationContext context)
throws IOException {
private <A extends Aggregator> A createAggregator(AggregatorFactories.Builder builder, AggregationContext context) throws IOException {
Aggregator[] aggregators = builder.build(context, null).createTopLevelAggregators();
assertThat(aggregators.length, equalTo(1));
@SuppressWarnings("unchecked")
@@ -310,11 +296,8 @@ protected AggregationContext createAggregationContext(IndexReader indexReader, Q
* While {@linkplain AggregationContext} is {@link Releasable} the caller is
not responsible for releasing it. Instead, it is released automatically
* in {@link #cleanupReleasables()}.
*
* Deprecated - this will be made private in a future update
*/
@Deprecated
protected AggregationContext createAggregationContext(
private AggregationContext createAggregationContext(
IndexReader indexReader,
IndexSettings indexSettings,
Query query,
@@ -346,6 +329,56 @@ private AggregationContext createAggregationContext(
int maxBucket,
boolean isInSortOrderExecutionRequired,
MappedFieldType... fieldTypes
) {
return createAggregationContext(
searcher,
indexSettings,
query,
breakerService,
bytesToPreallocate,
maxBucket,
isInSortOrderExecutionRequired,
() -> false,
fieldTypes
);
}

/**
* Creates an aggregation context that will randomly report that the query has been cancelled
*/
private AggregationContext createCancellingAggregationContext(
IndexSearcher searcher,
IndexSettings indexSettings,
Query query,
CircuitBreakerService breakerService,
long bytesToPreallocate,
int maxBucket,
boolean isInSortOrderExecutionRequired,
MappedFieldType... fieldTypes
) {
return createAggregationContext(
searcher,
indexSettings,
query,
breakerService,
bytesToPreallocate,
maxBucket,
isInSortOrderExecutionRequired,
() -> ESTestCase.random().nextInt(20) == 0,
fieldTypes
);
}

private AggregationContext createAggregationContext(
IndexSearcher searcher,
IndexSettings indexSettings,
Query query,
CircuitBreakerService breakerService,
long bytesToPreallocate,
int maxBucket,
boolean isInSortOrderExecutionRequired,
Supplier<Boolean> isCancelled,
MappedFieldType... fieldTypes
) {
MappingLookup mappingLookup = MappingLookup.fromMappers(
Mapping.EMPTY,
@@ -409,7 +442,7 @@ public Iterable<MappedFieldType> dimensionFields() {
bitsetFilterCache,
randomInt(),
() -> 0L,
() -> false,
isCancelled,
q -> q,
true,
isInSortOrderExecutionRequired
@@ -536,9 +569,11 @@ protected <A extends InternalAggregation> A searchAndReduce(IndexReader reader,
IndexSettings indexSettings = createIndexSettings();
// First run it to find circuit breaker leaks on the aggregator
runWithCrankyCircuitBreaker(indexSettings, searcher, aggTestConfig);
// Second run it to the end
CircuitBreakerService breakerService = new NoneCircuitBreakerService();
return searchAndReduce(indexSettings, searcher, breakerService, aggTestConfig);
// Next, try with random cancellations, again looking for leaks
runWithCancellingConfig(indexSettings, searcher, breakerService, aggTestConfig);
// Finally, run it to the end
return searchAndReduce(indexSettings, searcher, breakerService, aggTestConfig, this::createAggregationContext);
}

/**
@@ -552,20 +587,51 @@ private void runWithCrankyCircuitBreaker(IndexSettings indexSettings, IndexSearc
CircuitBreakerService crankyService = new CrankyCircuitBreakerService();
for (int i = 0; i < 5; i++) {
try {
searchAndReduce(indexSettings, searcher, crankyService, aggTestConfig);
searchAndReduce(indexSettings, searcher, crankyService, aggTestConfig, this::createAggregationContext);
} catch (CircuitBreakingException e) {
// Circuit breaks from the cranky breaker are expected - it randomly fails, after all
assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
}
}
}

private void runWithCancellingConfig(
IndexSettings indexSettings,
IndexSearcher searcher,
CircuitBreakerService breakerService,
AggTestConfig aggTestConfig
) throws IOException {
for (int i = 0; i < 5; i++) {
try {
searchAndReduce(indexSettings, searcher, breakerService, aggTestConfig, this::createCancellingAggregationContext);
} catch (TaskCancelledException e) {
// we don't want to expectThrows this because the randomizer might just never report cancellation,
// but it's also normal that it should throw here.
}
}
}

@FunctionalInterface
public interface AggregationContextSupplier {
AggregationContext get(
IndexSearcher searcher,
IndexSettings indexSettings,
Query query,
CircuitBreakerService breakerService,
long bytesToPreallocate,
int maxBucket,
boolean isInSortOrderExecutionRequired,
MappedFieldType... fieldTypes
);
Member:
That's a ton of stuff to pass in. I feel like some of it, like CircuitBreakerService, should be part of the implementation. Maybe?

Member:
I think it's fine actually - it lines up with how everything else is working now.
}

@SuppressWarnings("unchecked")
private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
IndexSettings indexSettings,
IndexSearcher searcher,
CircuitBreakerService breakerService,
AggTestConfig aggTestConfig
AggTestConfig aggTestConfig,
AggregationContextSupplier contextSupplier
) throws IOException {
Query query = aggTestConfig.query();
AggregatorFactories.Builder builder = new AggregatorFactories.Builder().addAggregator(aggTestConfig.builder());
@@ -591,7 +657,7 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
subSearchers[searcherIDX] = new ShardSearcher(leave, compCTX);
}
for (ShardSearcher subSearcher : subSearchers) {
AggregationContext context = createAggregationContext(
AggregationContext context = contextSupplier.get(
subSearcher,
indexSettings,
query,
@@ -620,7 +686,7 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
}
}
} else {
AggregationContext context = createAggregationContext(
AggregationContext context = contextSupplier.get(
searcher,
indexSettings,
query,
@@ -688,16 +754,47 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
assertRoundTrip(internalAggregation.copyResults());
}
}
/* Verify that cancellation during final reduce correctly throws.
* We check reduce time cancellation only when consuming buckets.
*/
try {
// I can't remember if we mutate the InternalAggregations list, so make a defensive copy
List<InternalAggregations> internalAggsCopy = new ArrayList<>(internalAggs);
A internalAgg = doFinalReduce(maxBucket, bigArraysForReduction, builder, internalAggsCopy, true);
if (internalAgg instanceof MultiBucketsAggregation mb) {
// Empty multi-bucket aggs are expected to return before even getting to the cancellation check
assertEquals("Got non-empty result for a cancelled reduction", 0, mb.getBuckets().size());
} // other cases?
} catch (TaskCancelledException e) {
/* We may not always honor cancellation in reduce, for example if we are returning no results, so we can't
* just expectThrows here.
*/
}

// now do the final reduce
A internalAgg = doFinalReduce(maxBucket, bigArraysForReduction, builder, internalAggs, false);
assertRoundTrip(internalAgg);
if (aggTestConfig.builder instanceof ValuesSourceAggregationBuilder.MetricsAggregationBuilder<?>) {
verifyMetricNames((ValuesSourceAggregationBuilder.MetricsAggregationBuilder<?>) aggTestConfig.builder, internalAgg);
}
return internalAgg;
}

private <A extends InternalAggregation> A doFinalReduce(
int maxBucket,
BigArrays bigArraysForReduction,
Builder builder,
List<InternalAggregations> internalAggs,
boolean cancelled
) throws IOException {
MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(
maxBucket,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
AggregationReduceContext reduceContext = new AggregationReduceContext.ForFinal(
bigArraysForReduction,
getMockScriptService(),
() -> false,
() -> cancelled,
builder,
reduceBucketConsumer
);
Expand All @@ -707,10 +804,6 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
assertRoundTrip(internalAgg);

doAssertReducedMultiBucketConsumer(internalAgg, reduceBucketConsumer);
assertRoundTrip(internalAgg);
if (aggTestConfig.builder instanceof ValuesSourceAggregationBuilder.MetricsAggregationBuilder<?>) {
verifyMetricNames((ValuesSourceAggregationBuilder.MetricsAggregationBuilder<?>) aggTestConfig.builder, internalAgg);
}
return internalAgg;
}

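A note on the randomized cancellation plumbing in the test-framework diff above: createCancellingAggregationContext wires isCancelled to ESTestCase.random().nextInt(20) == 0, so each poll has a 1-in-20 chance of reporting cancellation. Any given run may or may not cancel, which is why runWithCancellingConfig catches TaskCancelledException rather than using expectThrows. A self-contained sketch of that flag, with plain java.util.Random standing in for the test framework's random:

    import java.util.Random;
    import java.util.function.Supplier;

    public class RandomCancellationSketch {
        public static void main(String[] args) {
            Random random = new Random();
            // 1-in-20 chance per poll; cancellation shows up after ~20 polls on
            // average, but an individual run can take far longer or cancel at once.
            Supplier<Boolean> isCancelled = () -> random.nextInt(20) == 0;
            int polls = 1;
            while (!isCancelled.get()) {
                polls++;
            }
            System.out.println("reported cancellation after " + polls + " polls");
        }
    }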
5 changes: 5 additions & 0 deletions x-pack/plugin/analytics/build.gradle
@@ -7,6 +7,7 @@

apply plugin: 'elasticsearch.internal-es-plugin'
apply plugin: 'elasticsearch.internal-cluster-test'
apply plugin: 'elasticsearch.internal-java-rest-test'

esplugin {
name = 'x-pack-analytics'
@@ -18,6 +19,10 @@ base {
archivesName = 'x-pack-analytics'
}

tasks.named('javaRestTest') {
usesDefaultDistribution()
}

dependencies {
api 'org.apache.commons:commons-math3:3.6.1'
compileOnly project(path: xpackModule('core'))
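One practical note on the build change above: applying elasticsearch.internal-java-rest-test and marking javaRestTest with usesDefaultDistribution() runs the new REST tests against the full default distribution rather than a stripped-down test image. Assuming the standard x-pack project layout, something like ./gradlew :x-pack:plugin:analytics:javaRestTest should run them locally; the exact task path is an assumption, not something stated in the diff.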