Skip to content

Commit 0415c4c

Browse files
Remove SearchPhaseContext (#116471) (#117396)
* Stop instantiating RankFeaturePhase unnecessarily (#115724) We should not create the phase instance when we know we won't be doing any rank feature execution up-front. An instance of these isn't free and entails creating an array of searched_shard_count size which along is non-trivial. Also, this needlessly obscured the threading logic for fetch which has already led to a bug before. * Remove SearchPhaseContext (#116471) The only production implementation of this thing is AbstractSearchAsyncAction, no need to keep a separate interface around. This makes the logic a lot more obvious in terms of the lifeycle of "context" and how it's essentially just the "main" search phase. Plus it outright saves a lot of code, even though it adds a little on the test side. * Cleanup some redundancies around DfsQueryPhase (#116057) No need to add the result consumer to teh context another time, it's already added to it in the constructor of `SearchDfsQueryThenFetchAsyncAction`. Also, no need to feed `this` and `this.results` to `getNextPhase` explicitly, there's only a single call to this method so we can safely clean up the redundant arguments.
1 parent 04849b0 commit 0415c4c

22 files changed

+216
-382
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 73 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
* The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection of
7070
* distributed frequencies
7171
*/
72-
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase implements SearchPhaseContext {
72+
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase {
7373
private static final float DEFAULT_INDEX_BOOST = 1.0f;
7474
private final Logger logger;
7575
private final NamedWriteableRegistry namedWriteableRegistry;
@@ -107,7 +107,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
107107
private final boolean throttleConcurrentRequests;
108108
private final AtomicBoolean requestCancelled = new AtomicBoolean();
109109

110-
private final List<Releasable> releasables = new ArrayList<>();
110+
// protected for tests
111+
protected final List<Releasable> releasables = new ArrayList<>();
111112

112113
AbstractSearchAsyncAction(
113114
String name,
@@ -195,7 +196,9 @@ protected void notifyListShards(
195196
);
196197
}
197198

198-
@Override
199+
/**
200+
* Registers a {@link Releasable} that will be closed when the search request finishes or fails.
201+
*/
199202
public void addReleasable(Releasable releasable) {
200203
releasables.add(releasable);
201204
}
@@ -368,8 +371,12 @@ protected abstract void executePhaseOnShard(
368371
SearchActionListener<Result> listener
369372
);
370373

371-
@Override
372-
public final void executeNextPhase(SearchPhase currentPhase, Supplier<SearchPhase> nextPhaseSupplier) {
374+
/**
375+
* Processes the phase transition from on phase to another. This method handles all errors that happen during the initial run execution
376+
* of the next phase. If there are no successful operations in the context when this method is executed the search is aborted and
377+
* a response is returned to the user indicating that all shards have failed.
378+
*/
379+
protected void executeNextPhase(SearchPhase currentPhase, Supplier<SearchPhase> nextPhaseSupplier) {
373380
/* This is the main search phase transition where we move to the next phase. If all shards
374381
* failed or if there was a failure and partial results are not allowed, then we immediately
375382
* fail. Otherwise we continue to the next phase.
@@ -505,8 +512,7 @@ protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget
505512
* @param shardTarget the shard target for this failure
506513
* @param e the failure reason
507514
*/
508-
@Override
509-
public final void onShardFailure(final int shardIndex, SearchShardTarget shardTarget, Exception e) {
515+
void onShardFailure(final int shardIndex, SearchShardTarget shardTarget, Exception e) {
510516
if (TransportActions.isShardNotAvailableException(e)) {
511517
// Groups shard not available exceptions under a generic exception that returns a SERVICE_UNAVAILABLE(503)
512518
// temporary error.
@@ -603,32 +609,45 @@ private void successfulShardExecution(SearchShardIterator shardsIt) {
603609
}
604610
}
605611

606-
@Override
612+
/**
613+
* Returns the total number of shards to the current search across all indices
614+
*/
607615
public final int getNumShards() {
608616
return results.getNumShards();
609617
}
610618

611-
@Override
619+
/**
620+
* Returns a logger for this context to prevent each individual phase to create their own logger.
621+
*/
612622
public final Logger getLogger() {
613623
return logger;
614624
}
615625

616-
@Override
626+
/**
627+
* Returns the currently executing search task
628+
*/
617629
public final SearchTask getTask() {
618630
return task;
619631
}
620632

621-
@Override
633+
/**
634+
* Returns the currently executing search request
635+
*/
622636
public final SearchRequest getRequest() {
623637
return request;
624638
}
625639

626-
@Override
640+
/**
641+
* Returns the targeted {@link OriginalIndices} for the provided {@code shardIndex}.
642+
*/
627643
public OriginalIndices getOriginalIndices(int shardIndex) {
628644
return shardIterators[shardIndex].getOriginalIndices();
629645
}
630646

631-
@Override
647+
/**
648+
* Checks if the given context id is part of the point in time of this search (if exists).
649+
* We should not release search contexts that belong to the point in time during or after searches.
650+
*/
632651
public boolean isPartOfPointInTime(ShardSearchContextId contextId) {
633652
final PointInTimeBuilder pointInTimeBuilder = request.pointInTimeBuilder();
634653
if (pointInTimeBuilder != null) {
@@ -665,7 +684,12 @@ boolean buildPointInTimeFromSearchResults() {
665684
return false;
666685
}
667686

668-
@Override
687+
/**
688+
* Builds and sends the final search response back to the user.
689+
*
690+
* @param internalSearchResponse the internal search response
691+
* @param queryResults the results of the query phase
692+
*/
669693
public void sendSearchResponse(SearchResponseSections internalSearchResponse, AtomicArray<SearchPhaseResult> queryResults) {
670694
ShardSearchFailure[] failures = buildShardFailures();
671695
Boolean allowPartialResults = request.allowPartialSearchResults();
@@ -690,8 +714,14 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
690714
}
691715
}
692716

693-
@Override
694-
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
717+
/**
718+
* This method will communicate a fatal phase failure back to the user. In contrast to a shard failure
719+
* will this method immediately fail the search request and return the failure to the issuer of the request
720+
* @param phase the phase that failed
721+
* @param msg an optional message
722+
* @param cause the cause of the phase failure
723+
*/
724+
public void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
695725
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
696726
}
697727

@@ -718,16 +748,32 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
718748
listener.onFailure(exception);
719749
}
720750

751+
/**
752+
* Releases a search context with the given context ID on the node the given connection is connected to.
753+
* @see org.elasticsearch.search.query.QuerySearchResult#getContextId()
754+
* @see org.elasticsearch.search.fetch.FetchSearchResult#getContextId()
755+
*
756+
*/
757+
void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection, OriginalIndices originalIndices) {
758+
assert isPartOfPointInTime(contextId) == false : "Must not release point in time context [" + contextId + "]";
759+
if (connection != null) {
760+
searchTransportService.sendFreeContext(connection, contextId, originalIndices);
761+
}
762+
}
763+
721764
/**
722765
* Executed once all shard results have been received and processed
723766
* @see #onShardFailure(int, SearchShardTarget, Exception)
724767
* @see #onShardResult(SearchPhaseResult, SearchShardIterator)
725768
*/
726769
final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
727-
executeNextPhase(this, () -> getNextPhase(results, this));
770+
executeNextPhase(this, this::getNextPhase);
728771
}
729772

730-
@Override
773+
/**
774+
* Returns a connection to the node if connected otherwise and {@link org.elasticsearch.transport.ConnectTransportException} will be
775+
* thrown.
776+
*/
731777
public final Transport.Connection getConnection(String clusterAlias, String nodeId) {
732778
Transport.Connection conn = nodeIdToConnection.apply(clusterAlias, nodeId);
733779
Version minVersion = request.minCompatibleShardNode();
@@ -737,18 +783,21 @@ public final Transport.Connection getConnection(String clusterAlias, String node
737783
return conn;
738784
}
739785

740-
@Override
741-
public final SearchTransportService getSearchTransport() {
786+
/**
787+
* Returns the {@link SearchTransportService} to send shard request to other nodes
788+
*/
789+
public SearchTransportService getSearchTransport() {
742790
return searchTransportService;
743791
}
744792

745-
@Override
746793
public final void execute(Runnable command) {
747794
executor.execute(command);
748795
}
749796

750-
@Override
751-
public final void onFailure(Exception e) {
797+
/**
798+
* Notifies the top-level listener of the provided exception
799+
*/
800+
public void onFailure(Exception e) {
752801
listener.onFailure(e);
753802
}
754803

@@ -786,11 +835,8 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s
786835

787836
/**
788837
* Returns the next phase based on the results of the initial search phase
789-
* @param results the results of the initial search phase. Each non null element in the result array represent a successfully
790-
* executed shard request
791-
* @param context the search context for the next phase
792838
*/
793-
protected abstract SearchPhase getNextPhase(SearchPhaseResults<Result> results, SearchPhaseContext context);
839+
protected abstract SearchPhase getNextPhase();
794840

795841
private static final class PendingExecutions {
796842
private final Semaphore semaphore;

server/src/main/java/org/elasticsearch/action/search/CountedCollector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ final class CountedCollector<R extends SearchPhaseResult> {
2222
private final SearchPhaseResults<R> resultConsumer;
2323
private final CountDown counter;
2424
private final Runnable onFinish;
25-
private final SearchPhaseContext context;
25+
private final AbstractSearchAsyncAction<?> context;
2626

27-
CountedCollector(SearchPhaseResults<R> resultConsumer, int expectedOps, Runnable onFinish, SearchPhaseContext context) {
27+
CountedCollector(SearchPhaseResults<R> resultConsumer, int expectedOps, Runnable onFinish, AbstractSearchAsyncAction<?> context) {
2828
this.resultConsumer = resultConsumer;
2929
this.counter = new CountDown(expectedOps);
3030
this.onFinish = onFinish;
@@ -50,7 +50,7 @@ void onResult(R result) {
5050
}
5151

5252
/**
53-
* Escalates the failure via {@link SearchPhaseContext#onShardFailure(int, SearchShardTarget, Exception)}
53+
* Escalates the failure via {@link AbstractSearchAsyncAction#onShardFailure(int, SearchShardTarget, Exception)}
5454
* and then runs {@link #countDown()}
5555
*/
5656
void onFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {

server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ final class DfsQueryPhase extends SearchPhase {
4444
private final AggregatedDfs dfs;
4545
private final List<DfsKnnResults> knnResults;
4646
private final Function<SearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory;
47-
private final SearchPhaseContext context;
47+
private final AbstractSearchAsyncAction<?> context;
4848
private final SearchTransportService searchTransportService;
4949
private final SearchProgressListener progressListener;
5050

@@ -54,7 +54,7 @@ final class DfsQueryPhase extends SearchPhase {
5454
List<DfsKnnResults> knnResults,
5555
SearchPhaseResults<SearchPhaseResult> queryResult,
5656
Function<SearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
57-
SearchPhaseContext context
57+
AbstractSearchAsyncAction<?> context
5858
) {
5959
super("dfs_query");
6060
this.progressListener = context.getTask().getProgressListener();
@@ -65,10 +65,6 @@ final class DfsQueryPhase extends SearchPhase {
6565
this.nextPhaseFactory = nextPhaseFactory;
6666
this.context = context;
6767
this.searchTransportService = context.getSearchTransport();
68-
69-
// register the release of the query consumer to free up the circuit breaker memory
70-
// at the end of the search
71-
context.addReleasable(queryResult);
7268
}
7369

7470
@Override

server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@
3131
* forwards to the next phase immediately.
3232
*/
3333
final class ExpandSearchPhase extends SearchPhase {
34-
private final SearchPhaseContext context;
34+
private final AbstractSearchAsyncAction<?> context;
3535
private final SearchHits searchHits;
3636
private final Supplier<SearchPhase> nextPhase;
3737

38-
ExpandSearchPhase(SearchPhaseContext context, SearchHits searchHits, Supplier<SearchPhase> nextPhase) {
38+
ExpandSearchPhase(AbstractSearchAsyncAction<?> context, SearchHits searchHits, Supplier<SearchPhase> nextPhase) {
3939
super("expand");
4040
this.context = context;
4141
this.searchHits = searchHits;

server/src/main/java/org/elasticsearch/action/search/FetchLookupFieldsPhase.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,15 @@
3333
* @see org.elasticsearch.index.mapper.LookupRuntimeFieldType
3434
*/
3535
final class FetchLookupFieldsPhase extends SearchPhase {
36-
private final SearchPhaseContext context;
36+
private final AbstractSearchAsyncAction<?> context;
3737
private final SearchResponseSections searchResponse;
3838
private final AtomicArray<SearchPhaseResult> queryResults;
3939

40-
FetchLookupFieldsPhase(SearchPhaseContext context, SearchResponseSections searchResponse, AtomicArray<SearchPhaseResult> queryResults) {
40+
FetchLookupFieldsPhase(
41+
AbstractSearchAsyncAction<?> context,
42+
SearchResponseSections searchResponse,
43+
AtomicArray<SearchPhaseResult> queryResults
44+
) {
4145
super("fetch_lookup_fields");
4246
this.context = context;
4347
this.searchResponse = searchResponse;

server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
final class FetchSearchPhase extends SearchPhase {
3737
private final AtomicArray<SearchPhaseResult> searchPhaseShardResults;
3838
private final BiFunction<SearchResponseSections, AtomicArray<SearchPhaseResult>, SearchPhase> nextPhaseFactory;
39-
private final SearchPhaseContext context;
39+
private final AbstractSearchAsyncAction<?> context;
4040
private final Logger logger;
4141
private final SearchProgressListener progressListener;
4242
private final AggregatedDfs aggregatedDfs;
@@ -47,7 +47,7 @@ final class FetchSearchPhase extends SearchPhase {
4747
FetchSearchPhase(
4848
SearchPhaseResults<SearchPhaseResult> resultConsumer,
4949
AggregatedDfs aggregatedDfs,
50-
SearchPhaseContext context,
50+
AbstractSearchAsyncAction<?> context,
5151
@Nullable SearchPhaseController.ReducedQueryPhase reducedQueryPhase
5252
) {
5353
this(
@@ -66,7 +66,7 @@ final class FetchSearchPhase extends SearchPhase {
6666
FetchSearchPhase(
6767
SearchPhaseResults<SearchPhaseResult> resultConsumer,
6868
AggregatedDfs aggregatedDfs,
69-
SearchPhaseContext context,
69+
AbstractSearchAsyncAction<?> context,
7070
@Nullable SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
7171
BiFunction<SearchResponseSections, AtomicArray<SearchPhaseResult>, SearchPhase> nextPhaseFactory
7272
) {

0 commit comments

Comments
 (0)