Skip to content

Commit a222e16

Browse files
Delay creation of the next SearchPhase in executeNextPhase (#116061)
Delaying the creation of the next phase to only when we actually need it makes this a lot easier to reason about and should set up further simplications. Eager creation of the next phase forced a lot of needlessly complicated safety logic around resources on us. Since we never "close" the `nextPhase` on failure all its resources need to be tracked in via `context.addReleasable`. This isn't as much of an issue with some recent refactorings leaving very little resource creation in the constructors but still, delaying things saves memory and makes reasoning about failure cases far easier.
1 parent 281416d commit a222e16

File tree

7 files changed

+17
-10
lines changed

7 files changed

+17
-10
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.concurrent.atomic.AtomicInteger;
5656
import java.util.function.BiFunction;
5757
import java.util.function.Consumer;
58+
import java.util.function.Supplier;
5859
import java.util.stream.Collectors;
5960

6061
import static org.elasticsearch.core.Strings.format;
@@ -343,7 +344,7 @@ protected abstract void executePhaseOnShard(
343344
);
344345

345346
@Override
346-
public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase) {
347+
public final void executeNextPhase(SearchPhase currentPhase, Supplier<SearchPhase> nextPhaseSupplier) {
347348
/* This is the main search phase transition where we move to the next phase. If all shards
348349
* failed or if there was a failure and partial results are not allowed, then we immediately
349350
* fail. Otherwise we continue to the next phase.
@@ -387,6 +388,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
387388
}
388389
return;
389390
}
391+
var nextPhase = nextPhaseSupplier.get();
390392
if (logger.isTraceEnabled()) {
391393
final String resultsFrom = results.getSuccessfulResults()
392394
.map(r -> r.getSearchShardTarget().toString())
@@ -697,7 +699,7 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
697699
* @see #onShardResult(SearchPhaseResult, SearchShardIterator)
698700
*/
699701
final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
700-
executeNextPhase(this, getNextPhase(results, this));
702+
executeNextPhase(this, () -> getNextPhase(results, this));
701703
}
702704

703705
@Override

server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void run() {
7878
final CountedCollector<SearchPhaseResult> counter = new CountedCollector<>(
7979
queryResult,
8080
searchResults.size(),
81-
() -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)),
81+
() -> context.executeNextPhase(this, () -> nextPhaseFactory.apply(queryResult)),
8282
context
8383
);
8484

server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,6 @@ private static SearchSourceBuilder buildExpandSearchSourceBuilder(InnerHitBuilde
164164
}
165165

166166
private void onPhaseDone() {
167-
context.executeNextPhase(this, nextPhase.get());
167+
context.executeNextPhase(this, nextPhase);
168168
}
169169
}

server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,11 @@ private void moveToNextPhase(
269269
AtomicArray<? extends SearchPhaseResult> fetchResultsArr,
270270
SearchPhaseController.ReducedQueryPhase reducedQueryPhase
271271
) {
272-
var resp = SearchPhaseController.merge(context.getRequest().scroll() != null, reducedQueryPhase, fetchResultsArr);
273-
context.addReleasable(resp::decRef);
274-
context.executeNextPhase(this, nextPhaseFactory.apply(resp, searchPhaseShardResults));
272+
context.executeNextPhase(this, () -> {
273+
var resp = SearchPhaseController.merge(context.getRequest().scroll() != null, reducedQueryPhase, fetchResultsArr);
274+
context.addReleasable(resp::decRef);
275+
return nextPhaseFactory.apply(resp, searchPhaseShardResults);
276+
});
275277
}
276278

277279
private boolean shouldExplainRankScores(SearchRequest request) {

server/src/main/java/org/elasticsearch/action/search/RankFeaturePhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,6 @@ private float maxScore(ScoreDoc[] scoreDocs) {
236236
}
237237

238238
void moveToNextPhase(SearchPhaseResults<SearchPhaseResult> phaseResults, SearchPhaseController.ReducedQueryPhase reducedQueryPhase) {
239-
context.executeNextPhase(this, new FetchSearchPhase(phaseResults, aggregatedDfs, context, reducedQueryPhase));
239+
context.executeNextPhase(this, () -> new FetchSearchPhase(phaseResults, aggregatedDfs, context, reducedQueryPhase));
240240
}
241241
}

server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.transport.Transport;
2020

2121
import java.util.concurrent.Executor;
22+
import java.util.function.Supplier;
2223

2324
/**
2425
* This class provide contextual state and access to resources across multiple search phases.
@@ -120,7 +121,7 @@ default void sendReleaseSearchContext(
120121
* of the next phase. If there are no successful operations in the context when this method is executed the search is aborted and
121122
* a response is returned to the user indicating that all shards have failed.
122123
*/
123-
void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase);
124+
void executeNextPhase(SearchPhase currentPhase, Supplier<SearchPhase> nextPhaseSupplier);
124125

125126
/**
126127
* Registers a {@link Releasable} that will be closed when the search request finishes or fails.

server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Set;
3131
import java.util.concurrent.atomic.AtomicInteger;
3232
import java.util.concurrent.atomic.AtomicReference;
33+
import java.util.function.Supplier;
3334

3435
/**
3536
* SearchPhaseContext for tests
@@ -132,7 +133,8 @@ public SearchTransportService getSearchTransport() {
132133
}
133134

134135
@Override
135-
public void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase) {
136+
public void executeNextPhase(SearchPhase currentPhase, Supplier<SearchPhase> nextPhaseSupplier) {
137+
var nextPhase = nextPhaseSupplier.get();
136138
try {
137139
nextPhase.run();
138140
} catch (Exception e) {

0 commit comments

Comments
 (0)