Merged

Changes from 5 commits
docs/changelog/130452.yaml (5 additions & 0 deletions)
@@ -0,0 +1,5 @@
pr: 130452
summary: "Aggs: Add cancellation checks to `FilterByFilter` aggregator"
area: Aggregations
type: bug
issues: []
FiltersCancellationIT.java
@@ -0,0 +1,218 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.search.aggregations.bucket;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.mapper.OnScriptError;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.script.LongFieldScript;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator.KeyedFilter;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.filters;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;

@ESIntegTestCase.SuiteScopeTestCase
public class FiltersCancellationIT extends ESIntegTestCase {

private static final String INDEX = "idx";
private static final String SLEEP_FIELD = "sleep";
private static final String NUMERIC_FIELD = "value";

private static final int NUM_DOCS = 100_000;
/**
* The number of milliseconds to sleep in the script that simulates a long-running operation.
* <p>
As CancellableBulkScorer scores a minimum of 4096 docs per batch, this value must be kept low to avoid long test times.
* </p>
*/
private static final long SLEEP_SCRIPT_MS = 1;
Member:
Could you use a pair of Semaphores like so:

private static final Semaphore scriptRunPermits = new Semaphore(0);
private static final Semaphore cancelRunPermits = new Semaphore(0);
...

In the test:
cancelRunPermits.acquire();
client().cancelTheRequest();
scriptRunPermits.release(Integer.MAX_VALUE);
...

In the script:
cancelRunPermits.release(1);
scriptRunPermits.acquire();

I'm sure there's a simpler way to do it, but this would block the cancel task until the script starts, then block the script until the cancel has finished.

Contributor Author:

Done! It looks far better, and it runs faster and passes consistently with a Repeat(1000).
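A minimal, self-contained sketch of the suggested handshake (the class and method names here are hypothetical, and cancelTheRequest stands in for the actual cancel-tasks call):

import java.util.concurrent.Semaphore;

class ScriptCancelHandshake {
    // Script threads park here until the test releases them after cancellation.
    private static final Semaphore scriptRunPermits = new Semaphore(0);
    // The test parks here until at least one script invocation has started.
    private static final Semaphore cancelRunPermits = new Semaphore(0);

    // Called by the runtime-field script for each document it processes.
    static void onScriptExecute() throws InterruptedException {
        cancelRunPermits.release();  // signal: a script is now running
        scriptRunPermits.acquire();  // block until the test has finished cancelling
    }

    // Called by the test thread once the search has been kicked off.
    static void cancelWhileScriptsAreParked(Runnable cancelTheRequest) throws InterruptedException {
        cancelRunPermits.acquire();                  // wait for a script to start
        cancelTheRequest.run();                      // cancel while script threads are blocked
        scriptRunPermits.release(Integer.MAX_VALUE); // unblock every parked script thread
    }
}

This blocks the cancel until a script is provably running, and blocks the scripts until the cancel has been issued, so the test no longer depends on sleeps.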


@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), pausableFieldPluginClass());
}

protected Class<? extends Plugin> pausableFieldPluginClass() {
return SleepScriptPlugin.class;
}

@Override
public void setupSuiteScopeCluster() throws Exception {
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
mapping.startObject("runtime");
{
mapping.startObject(SLEEP_FIELD);
{
mapping.field("type", "long");
mapping.startObject("script").field("source", "").field("lang", SleepScriptPlugin.PAUSE_SCRIPT_LANG).endObject();
}
mapping.endObject();
mapping.startObject(NUMERIC_FIELD);
{
mapping.field("type", "long");
}
mapping.endObject();
}
mapping.endObject();
client().admin().indices().prepareCreate(INDEX).setMapping(mapping.endObject()).get();

BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < NUM_DOCS; i++) {
bulk.add(prepareIndex(INDEX).setId(Integer.toString(i)).setSource(NUMERIC_FIELD, i));
}
bulk.get();
client().admin().indices().prepareForceMerge(INDEX).setMaxNumSegments(1).get();
}

public void testFiltersCountCancellation() throws Exception {
ensureProperCancellation(
client().prepareSearch(INDEX)
.addAggregation(
filters(
"filters",
new KeyedFilter[] {
new KeyedFilter("filter1", termQuery(SLEEP_FIELD, 1)),
new KeyedFilter("filter2", termQuery(SLEEP_FIELD, 2)) }
)
)
);
}

public void testFiltersSubAggsCancellation() throws Exception {
ensureProperCancellation(
client().prepareSearch(INDEX)
.addAggregation(
filters(
"filters",
new KeyedFilter[] {
new KeyedFilter("filter1", termQuery(SLEEP_FIELD, 1)),
new KeyedFilter("filter2", termQuery(SLEEP_FIELD, 2)) }
).subAggregation(terms("sub").field(SLEEP_FIELD))
)
);
}

private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder) throws Exception {
var searchRequestFuture = searchRequestBuilder.setTimeout(TimeValue.timeValueSeconds(1)).execute();
assertFalse(searchRequestFuture.isCancelled());
assertFalse(searchRequestFuture.isDone());

// Check that there are search tasks running
assertThat(getSearchTasks(), not(empty()));

// Wait to ensure the scripts have started executing, so that we don't cancel too early
safeSleep(2000);

// CancellableBulkScorer scores a first batch of 4096 docs and doubles the batch size after each iteration.
// That batch size times SLEEP_SCRIPT_MS bounds how long a batch can take, and so the maximum time to
// wait (x3 to avoid flakiness): 3 * 4096 * 1 ms is roughly 12.3 seconds.
long maxWaitMs = 3 * (4096 * SLEEP_SCRIPT_MS);

// Cancel the tasks
client().admin()
.cluster()
.prepareCancelTasks()
.setActions(TransportSearchAction.NAME + "*")
.waitForCompletion(true)
.setTimeout(TimeValue.timeValueMillis(maxWaitMs))
.get();

// Ensure the search request finished and that there are no more search tasks
assertBusy(() -> {
assertThat(getSearchTasks(), empty());
assertTrue(searchRequestFuture.isDone());
}, maxWaitMs, TimeUnit.MILLISECONDS);
}

private List<TaskInfo> getSearchTasks() {
return client().admin()
.cluster()
.prepareListTasks()
.setActions(TransportSearchAction.NAME + "*")
.setDetailed(true)
.get()
.getTasks();
}

public static class SleepScriptPlugin extends Plugin implements ScriptPlugin {
public static final String PAUSE_SCRIPT_LANG = "pause";

@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override
public String getType() {
return PAUSE_SCRIPT_LANG;
}

@Override
@SuppressWarnings("unchecked")
public <FactoryType> FactoryType compile(
String name,
String code,
ScriptContext<FactoryType> context,
Map<String, String> params
) {
if (context == LongFieldScript.CONTEXT) {
return (FactoryType) new LongFieldScript.Factory() {
@Override
public LongFieldScript.LeafFactory newFactory(
String fieldName,
Map<String, Object> params,
SearchLookup searchLookup,
OnScriptError onScriptError
) {
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
@Override
public void execute() {
try {
Thread.sleep(SLEEP_SCRIPT_MS);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
emit(1);
}
};
}
};
}
throw new IllegalStateException("unsupported type " + context);
}

@Override
public Set<ScriptContext<?>> getSupportedContexts() {
return Set.of(LongFieldScript.CONTEXT);
}
};
}
}
}
FilterByFilterAggregator.java
@@ -23,6 +23,7 @@
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.runtime.AbstractScriptFieldQuery;
+import org.elasticsearch.tasks.TaskCancelledException;

import java.io.IOException;
import java.util.ArrayList;
@@ -268,7 +269,7 @@ protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCt
private void collectCount(LeafReaderContext ctx, Bits live) throws IOException {
Counter counter = new Counter(docCountProvider);
for (int filterOrd = 0; filterOrd < filters().size(); filterOrd++) {
-incrementBucketDocCount(filterOrd, filters().get(filterOrd).count(ctx, counter, live));
+incrementBucketDocCount(filterOrd, filters().get(filterOrd).count(ctx, counter, live, this::checkCancelled));
}
}

@@ -306,11 +307,17 @@ public void setScorer(Scorable scorer) {}
MatchCollector collector = new MatchCollector();
// create the buckets so we can call collectExistingBucket
grow(filters().size() + 1);
-filters().get(0).collect(aggCtx.getLeafReaderContext(), collector, live);
+filters().get(0).collect(aggCtx.getLeafReaderContext(), collector, live, this::checkCancelled);
for (int filterOrd = 1; filterOrd < filters().size(); filterOrd++) {
collector.subCollector = collectableSubAggregators.getLeafCollector(aggCtx);
collector.filterOrd = filterOrd;
-filters().get(filterOrd).collect(aggCtx.getLeafReaderContext(), collector, live);
+filters().get(filterOrd).collect(aggCtx.getLeafReaderContext(), collector, live, this::checkCancelled);
}
}

+private void checkCancelled() {
+if (context.isCancelled()) {
+throw new TaskCancelledException("cancelled");
+}
+}

QueryToFilterAdapter.java
@@ -29,6 +29,7 @@
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.internal.CancellableBulkScorer;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
@@ -192,7 +193,7 @@ Scorer randomAccessScorer(LeafReaderContext ctx) throws IOException {
/**
* Count the number of documents that match this filter in a leaf.
*/
-long count(LeafReaderContext ctx, FiltersAggregator.Counter counter, Bits live) throws IOException {
+long count(LeafReaderContext ctx, FiltersAggregator.Counter counter, Bits live, Runnable checkCancelled) throws IOException {
/*
* weight().count will return the count of matches for ctx if it can do
* so in constant time, otherwise -1. The Weight is responsible for
@@ -216,20 +217,22 @@ long count(LeafReaderContext ctx, FiltersAggregator.Counter counter, Bits live)
// No hits in this segment.
return 0;
}
-scorer.score(counter, live, 0, DocIdSetIterator.NO_MORE_DOCS);
+CancellableBulkScorer cancellableScorer = new CancellableBulkScorer(scorer, checkCancelled);
+cancellableScorer.score(counter, live, 0, DocIdSetIterator.NO_MORE_DOCS);
return counter.readAndReset(ctx);
}

/**
* Collect all documents that match this filter in this leaf.
*/
-void collect(LeafReaderContext ctx, LeafCollector collector, Bits live) throws IOException {
+void collect(LeafReaderContext ctx, LeafCollector collector, Bits live, Runnable checkCancelled) throws IOException {
BulkScorer scorer = weight().bulkScorer(ctx);
if (scorer == null) {
// No hits in this segment.
return;
}
-scorer.score(collector, live, 0, DocIdSetIterator.NO_MORE_DOCS);
+CancellableBulkScorer cancellableScorer = new CancellableBulkScorer(scorer, checkCancelled);
+cancellableScorer.score(collector, live, 0, DocIdSetIterator.NO_MORE_DOCS);
}

/**
Expand Down
CancellableBulkScorer.java
@@ -20,7 +20,7 @@
* A {@link BulkScorer} wrapper that runs a {@link Runnable} on a regular basis
* so that the query can be interrupted.
*/
-final class CancellableBulkScorer extends BulkScorer {
+public final class CancellableBulkScorer extends BulkScorer {

// we use the BooleanScorer window size as a base interval in order to make sure that we do not
// slow down boolean queries
@@ -32,7 +32,7 @@ final class CancellableBulkScorer extends BulkScorer {
private final BulkScorer scorer;
private final Runnable checkCancelled;

-CancellableBulkScorer(BulkScorer scorer, Runnable checkCancelled) {
+public CancellableBulkScorer(BulkScorer scorer, Runnable checkCancelled) {
this.scorer = Objects.requireNonNull(scorer);
this.checkCancelled = Objects.requireNonNull(checkCancelled);
}
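The scoring loop itself is elided from this diff. In essence, the wrapper delegates to the inner scorer in windows and runs checkCancelled between windows; a sketch of that shape (the constant names INITIAL_INTERVAL and MAX_INTERVAL are assumptions, not confirmed by the diff):

// Sketch of the elided windowed scoring loop, consistent with the comments above
// and with the 4096-doc initial batch that FiltersCancellationIT relies on.
@Override
public int score(LeafCollector collector, Bits acceptDocs, int min, int max) throws IOException {
    int interval = INITIAL_INTERVAL; // assumed 1 << 12 == 4096, the BooleanScorer window size
    while (min < max) {
        checkCancelled.run(); // e.g. throws TaskCancelledException if the task was cancelled
        int newMax = (int) Math.min((long) min + interval, max);
        min = scorer.score(collector, acceptDocs, min, newMax); // score one window
        interval = Math.min(interval << 1, MAX_INTERVAL); // double the window size, capped
    }
    return min;
}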