Skip to content

Commit 210f854

Browse files
Relax some search interfaces to allow arbitrary cancellable tasks (#122188)
An easy change we can split out of #121885 to make that shorter.
1 parent 3810864 commit 210f854

File tree

9 files changed

+34
-32
lines changed

9 files changed

+34
-32
lines changed

server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.lucene.search.Query;
2222
import org.apache.lucene.search.TotalHits;
2323
import org.apache.lucene.util.NumericUtils;
24-
import org.elasticsearch.action.search.SearchShardTask;
2524
import org.elasticsearch.action.search.SearchType;
2625
import org.elasticsearch.cluster.routing.IndexRouting;
2726
import org.elasticsearch.common.lucene.search.Queries;
@@ -77,6 +76,7 @@
7776
import org.elasticsearch.search.slice.SliceBuilder;
7877
import org.elasticsearch.search.sort.SortAndFormats;
7978
import org.elasticsearch.search.suggest.SuggestionSearchContext;
79+
import org.elasticsearch.tasks.CancellableTask;
8080

8181
import java.io.IOException;
8282
import java.io.UncheckedIOException;
@@ -131,7 +131,7 @@ final class DefaultSearchContext extends SearchContext {
131131
private CollapseContext collapse;
132132
// filter for sliced scroll
133133
private SliceBuilder sliceBuilder;
134-
private SearchShardTask task;
134+
private CancellableTask task;
135135
private QueryPhaseRankShardContext queryPhaseRankShardContext;
136136

137137
/**
@@ -433,7 +433,7 @@ public void preProcess() {
433433
this.query = buildFilteredQuery(query);
434434
if (lowLevelCancellation) {
435435
searcher().addQueryCancellation(() -> {
436-
final SearchShardTask task = getTask();
436+
final CancellableTask task = getTask();
437437
if (task != null) {
438438
task.ensureNotCancelled();
439439
}
@@ -907,12 +907,12 @@ public void setProfilers(Profilers profilers) {
907907
}
908908

909909
@Override
910-
public void setTask(SearchShardTask task) {
910+
public void setTask(CancellableTask task) {
911911
this.task = task;
912912
}
913913

914914
@Override
915-
public SearchShardTask getTask() {
915+
public CancellableTask getTask() {
916916
return task;
917917
}
918918

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
import org.elasticsearch.search.sort.SortBuilder;
127127
import org.elasticsearch.search.suggest.Suggest;
128128
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
129+
import org.elasticsearch.tasks.CancellableTask;
129130
import org.elasticsearch.tasks.TaskCancelledException;
130131
import org.elasticsearch.telemetry.tracing.Tracer;
131132
import org.elasticsearch.threadpool.Scheduler;
@@ -581,7 +582,7 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea
581582
}
582583
}
583584

584-
public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
585+
public void executeQueryPhase(ShardSearchRequest request, CancellableTask task, ActionListener<SearchPhaseResult> listener) {
585586
ActionListener<SearchPhaseResult> finalListener = maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool);
586587
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
587588
: "empty responses require more than one shard";
@@ -729,7 +730,7 @@ private static <T extends RefCounted> void runAsync(
729730
* It is the responsibility of the caller to ensure that the ref count is correctly decremented
730731
* when the object is no longer needed.
731732
*/
732-
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception {
733+
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, CancellableTask task) throws Exception {
733734
final ReaderContext readerContext = createOrGetReaderContext(request);
734735
try (
735736
Releasable scope = tracer.withScope(task);
@@ -953,7 +954,7 @@ public void executeFetchPhase(
953954
}, wrapFailureListener(listener, readerContext, markAsUsed));
954955
}
955956

956-
public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
957+
public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, ActionListener<FetchSearchResult> listener) {
957958
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
958959
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
959960
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
@@ -991,7 +992,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
991992
}));
992993
}
993994

994-
protected void checkCancelled(SearchShardTask task) {
995+
protected void checkCancelled(CancellableTask task) {
995996
// check cancellation as early as possible, as it avoids opening up a Lucene reader on FrozenEngine
996997
try {
997998
task.ensureNotCancelled();
@@ -1122,7 +1123,7 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen
11221123
protected SearchContext createContext(
11231124
ReaderContext readerContext,
11241125
ShardSearchRequest request,
1125-
SearchShardTask task,
1126+
CancellableTask task,
11261127
ResultsType resultsType,
11271128
boolean includeAggregations
11281129
) throws IOException {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.apache.lucene.search.MatchNoDocsQuery;
1414
import org.elasticsearch.ElasticsearchStatusException;
1515
import org.elasticsearch.action.ActionListener;
16-
import org.elasticsearch.action.search.SearchShardTask;
1716
import org.elasticsearch.common.logging.DeprecationCategory;
1817
import org.elasticsearch.common.logging.DeprecationLogger;
1918
import org.elasticsearch.index.query.QueryBuilder;
@@ -38,6 +37,7 @@
3837
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
3938
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
4039
import org.elasticsearch.search.internal.ShardSearchRequest;
40+
import org.elasticsearch.tasks.CancellableTask;
4141
import org.elasticsearch.xcontent.ParseField;
4242

4343
import java.io.IOException;
@@ -128,7 +128,7 @@ private static SignificantTermsAggregatorSupplier bytesSupplier() {
128128
* <p>
129129
* Some searches that will never match can still fall through and we endup running query that will produce no results.
130130
* However even in that case we sometimes do expensive things like loading global ordinals. This method should prevent this.
131-
* Note that if {@link org.elasticsearch.search.SearchService#executeQueryPhase(ShardSearchRequest, SearchShardTask, ActionListener)}
131+
* Note that if {@link org.elasticsearch.search.SearchService#executeQueryPhase(ShardSearchRequest, CancellableTask, ActionListener)}
132132
* always do a can match then we don't need this code here.
133133
*/
134134
static boolean matchNoDocs(AggregationContext context, Aggregator parent) {

server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.apache.lucene.search.FieldDoc;
1313
import org.apache.lucene.search.Query;
1414
import org.apache.lucene.search.TotalHits;
15-
import org.elasticsearch.action.search.SearchShardTask;
1615
import org.elasticsearch.action.search.SearchType;
1716
import org.elasticsearch.core.TimeValue;
1817
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
@@ -40,6 +39,7 @@
4039
import org.elasticsearch.search.rescore.RescoreContext;
4140
import org.elasticsearch.search.sort.SortAndFormats;
4241
import org.elasticsearch.search.suggest.SuggestionSearchContext;
42+
import org.elasticsearch.tasks.CancellableTask;
4343

4444
import java.util.List;
4545

@@ -422,12 +422,12 @@ public SearchExecutionContext getSearchExecutionContext() {
422422
}
423423

424424
@Override
425-
public void setTask(SearchShardTask task) {
425+
public void setTask(CancellableTask task) {
426426
in.setTask(task);
427427
}
428428

429429
@Override
430-
public SearchShardTask getTask() {
430+
public CancellableTask getTask() {
431431
return in.getTask();
432432
}
433433

server/src/main/java/org/elasticsearch/search/internal/SearchContext.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.apache.lucene.search.FieldDoc;
1212
import org.apache.lucene.search.Query;
1313
import org.apache.lucene.search.TotalHits;
14-
import org.elasticsearch.action.search.SearchShardTask;
1514
import org.elasticsearch.action.search.SearchType;
1615
import org.elasticsearch.core.Assertions;
1716
import org.elasticsearch.core.Nullable;
@@ -48,6 +47,7 @@
4847
import org.elasticsearch.search.rescore.RescoreContext;
4948
import org.elasticsearch.search.sort.SortAndFormats;
5049
import org.elasticsearch.search.suggest.SuggestionSearchContext;
50+
import org.elasticsearch.tasks.CancellableTask;
5151
import org.elasticsearch.transport.LeakTracker;
5252

5353
import java.io.IOException;
@@ -90,7 +90,7 @@ public final List<Runnable> getCancellationChecks() {
9090
if (lowLevelCancellation()) {
9191
// This searching doesn't live beyond this phase, so we don't need to remove query cancellation
9292
Runnable c = () -> {
93-
final SearchShardTask task = getTask();
93+
final CancellableTask task = getTask();
9494
if (task != null) {
9595
task.ensureNotCancelled();
9696
}
@@ -100,9 +100,9 @@ public final List<Runnable> getCancellationChecks() {
100100
return timeoutRunnable == null ? List.of() : List.of(timeoutRunnable);
101101
}
102102

103-
public abstract void setTask(SearchShardTask task);
103+
public abstract void setTask(CancellableTask task);
104104

105-
public abstract SearchShardTask getTask();
105+
public abstract CancellableTask getTask();
106106

107107
public abstract boolean isCancelled();
108108

server/src/main/java/org/elasticsearch/search/rank/RankSearchContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.apache.lucene.search.FieldDoc;
1313
import org.apache.lucene.search.Query;
1414
import org.apache.lucene.search.TotalHits;
15-
import org.elasticsearch.action.search.SearchShardTask;
1615
import org.elasticsearch.action.search.SearchType;
1716
import org.elasticsearch.core.TimeValue;
1817
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
@@ -48,6 +47,7 @@
4847
import org.elasticsearch.search.rescore.RescoreContext;
4948
import org.elasticsearch.search.sort.SortAndFormats;
5049
import org.elasticsearch.search.suggest.SuggestionSearchContext;
50+
import org.elasticsearch.tasks.CancellableTask;
5151

5252
import java.util.List;
5353

@@ -211,12 +211,12 @@ public long getRelativeTimeInMillis() {
211211
/* ---- ALL METHODS ARE UNSUPPORTED BEYOND HERE ---- */
212212

213213
@Override
214-
public void setTask(SearchShardTask task) {
214+
public void setTask(CancellableTask task) {
215215
throw new UnsupportedOperationException();
216216
}
217217

218218
@Override
219-
public SearchShardTask getTask() {
219+
public CancellableTask getTask() {
220220
throw new UnsupportedOperationException();
221221
}
222222

server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.search.builder.SearchSourceBuilder;
2727
import org.elasticsearch.search.internal.SearchContext;
2828
import org.elasticsearch.search.internal.ShardSearchRequest;
29+
import org.elasticsearch.tasks.CancellableTask;
2930
import org.elasticsearch.tasks.Task;
3031
import org.elasticsearch.test.ESSingleNodeTestCase;
3132
import org.elasticsearch.test.TestSearchContext;
@@ -93,7 +94,7 @@ public ShardSearchRequest request() {
9394
}
9495

9596
@Override
96-
public SearchShardTask getTask() {
97+
public CancellableTask getTask() {
9798
return super.getTask();
9899
}
99100
};

test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.search;
1111

12-
import org.elasticsearch.action.search.SearchShardTask;
1312
import org.elasticsearch.cluster.service.ClusterService;
1413
import org.elasticsearch.common.util.BigArrays;
1514
import org.elasticsearch.core.TimeValue;
@@ -23,6 +22,7 @@
2322
import org.elasticsearch.search.internal.ReaderContext;
2423
import org.elasticsearch.search.internal.SearchContext;
2524
import org.elasticsearch.search.internal.ShardSearchRequest;
25+
import org.elasticsearch.tasks.CancellableTask;
2626
import org.elasticsearch.telemetry.tracing.Tracer;
2727
import org.elasticsearch.threadpool.ThreadPool;
2828

@@ -46,7 +46,7 @@ public static class TestPlugin extends Plugin {}
4646

4747
private Consumer<SearchContext> onCreateSearchContext = context -> {};
4848

49-
private Function<SearchShardTask, SearchShardTask> onCheckCancelled = Function.identity();
49+
private Function<CancellableTask, CancellableTask> onCheckCancelled = Function.identity();
5050

5151
/** Throw an {@link AssertionError} if there are still in-flight contexts. */
5252
public static void assertNoInFlightContext() {
@@ -132,7 +132,7 @@ public void setOnCreateSearchContext(Consumer<SearchContext> onCreateSearchConte
132132
protected SearchContext createContext(
133133
ReaderContext readerContext,
134134
ShardSearchRequest request,
135-
SearchShardTask task,
135+
CancellableTask task,
136136
ResultsType resultsType,
137137
boolean includeAggregations
138138
) throws IOException {
@@ -154,12 +154,12 @@ public SearchContext createSearchContext(ShardSearchRequest request, TimeValue t
154154
return searchContext;
155155
}
156156

157-
public void setOnCheckCancelled(Function<SearchShardTask, SearchShardTask> onCheckCancelled) {
157+
public void setOnCheckCancelled(Function<CancellableTask, CancellableTask> onCheckCancelled) {
158158
this.onCheckCancelled = onCheckCancelled;
159159
}
160160

161161
@Override
162-
protected void checkCancelled(SearchShardTask task) {
162+
protected void checkCancelled(CancellableTask task) {
163163
super.checkCancelled(onCheckCancelled.apply(task));
164164
}
165165
}

test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.apache.lucene.search.FieldDoc;
1212
import org.apache.lucene.search.Query;
1313
import org.apache.lucene.search.TotalHits;
14-
import org.elasticsearch.action.search.SearchShardTask;
1514
import org.elasticsearch.action.search.SearchType;
1615
import org.elasticsearch.core.TimeValue;
1716
import org.elasticsearch.index.IndexService;
@@ -49,6 +48,7 @@
4948
import org.elasticsearch.search.rescore.RescoreContext;
5049
import org.elasticsearch.search.sort.SortAndFormats;
5150
import org.elasticsearch.search.suggest.SuggestionSearchContext;
51+
import org.elasticsearch.tasks.CancellableTask;
5252

5353
import java.util.Collections;
5454
import java.util.HashMap;
@@ -67,7 +67,7 @@ public class TestSearchContext extends SearchContext {
6767
ParsedQuery postFilter;
6868
Query query;
6969
Float minScore;
70-
SearchShardTask task;
70+
CancellableTask task;
7171
SortAndFormats sort;
7272
boolean trackScores = false;
7373
int trackTotalHitsUpTo = SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO;
@@ -506,12 +506,12 @@ public SearchExecutionContext getSearchExecutionContext() {
506506
}
507507

508508
@Override
509-
public void setTask(SearchShardTask task) {
509+
public void setTask(CancellableTask task) {
510510
this.task = task;
511511
}
512512

513513
@Override
514-
public SearchShardTask getTask() {
514+
public CancellableTask getTask() {
515515
return task;
516516
}
517517

0 commit comments

Comments
 (0)