Skip to content

Commit 7062a48

Browse files
Delay creation of the next SearchPhase in executeNextPhase (#116061) (#117369)
Delaying the creation of the next phase to only when we actually need it makes this a lot easier to reason about and should set up further simplications. Eager creation of the next phase forced a lot of needlessly complicated safety logic around resources on us. Since we never "close" the `nextPhase` on failure all its resources need to be tracked in via `context.addReleasable`. This isn't as much of an issue with some recent refactorings leaving very little resource creation in the constructors but still, delaying things saves memory and makes reasoning about failure cases far easier.
1 parent 203615f commit 7062a48

File tree

7 files changed

+17
-10
lines changed

7 files changed

+17
-10
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.concurrent.atomic.AtomicInteger;
5757
import java.util.function.BiFunction;
5858
import java.util.function.Consumer;
59+
import java.util.function.Supplier;
5960
import java.util.stream.Collectors;
6061

6162
import static org.elasticsearch.core.Strings.format;
@@ -368,7 +369,7 @@ protected abstract void executePhaseOnShard(
368369
);
369370

370371
@Override
371-
public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase) {
372+
public final void executeNextPhase(SearchPhase currentPhase, Supplier<SearchPhase> nextPhaseSupplier) {
372373
/* This is the main search phase transition where we move to the next phase. If all shards
373374
* failed or if there was a failure and partial results are not allowed, then we immediately
374375
* fail. Otherwise we continue to the next phase.
@@ -412,6 +413,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
412413
}
413414
return;
414415
}
416+
var nextPhase = nextPhaseSupplier.get();
415417
if (logger.isTraceEnabled()) {
416418
final String resultsFrom = results.getSuccessfulResults()
417419
.map(r -> r.getSearchShardTarget().toString())
@@ -722,7 +724,7 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
722724
* @see #onShardResult(SearchPhaseResult, SearchShardIterator)
723725
*/
724726
final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
725-
executeNextPhase(this, getNextPhase(results, this));
727+
executeNextPhase(this, () -> getNextPhase(results, this));
726728
}
727729

728730
@Override

server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void run() {
7878
final CountedCollector<SearchPhaseResult> counter = new CountedCollector<>(
7979
queryResult,
8080
searchResults.size(),
81-
() -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)),
81+
() -> context.executeNextPhase(this, () -> nextPhaseFactory.apply(queryResult)),
8282
context
8383
);
8484

server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,6 @@ private static SearchSourceBuilder buildExpandSearchSourceBuilder(InnerHitBuilde
164164
}
165165

166166
private void onPhaseDone() {
167-
context.executeNextPhase(this, nextPhase.get());
167+
context.executeNextPhase(this, nextPhase);
168168
}
169169
}

server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,9 +260,11 @@ private void moveToNextPhase(
260260
AtomicArray<? extends SearchPhaseResult> fetchResultsArr,
261261
SearchPhaseController.ReducedQueryPhase reducedQueryPhase
262262
) {
263-
var resp = SearchPhaseController.merge(context.getRequest().scroll() != null, reducedQueryPhase, fetchResultsArr);
264-
context.addReleasable(resp::decRef);
265-
context.executeNextPhase(this, nextPhaseFactory.apply(resp, searchPhaseShardResults));
263+
context.executeNextPhase(this, () -> {
264+
var resp = SearchPhaseController.merge(context.getRequest().scroll() != null, reducedQueryPhase, fetchResultsArr);
265+
context.addReleasable(resp::decRef);
266+
return nextPhaseFactory.apply(resp, searchPhaseShardResults);
267+
});
266268
}
267269

268270
private boolean shouldExplainRankScores(SearchRequest request) {

server/src/main/java/org/elasticsearch/action/search/RankFeaturePhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,6 @@ private float maxScore(ScoreDoc[] scoreDocs) {
233233
}
234234

235235
void moveToNextPhase(SearchPhaseResults<SearchPhaseResult> phaseResults, SearchPhaseController.ReducedQueryPhase reducedQueryPhase) {
236-
context.executeNextPhase(this, new FetchSearchPhase(phaseResults, aggregatedDfs, context, reducedQueryPhase));
236+
context.executeNextPhase(this, () -> new FetchSearchPhase(phaseResults, aggregatedDfs, context, reducedQueryPhase));
237237
}
238238
}

server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.transport.Transport;
2020

2121
import java.util.concurrent.Executor;
22+
import java.util.function.Supplier;
2223

2324
/**
2425
* This class provide contextual state and access to resources across multiple search phases.
@@ -120,7 +121,7 @@ default void sendReleaseSearchContext(
120121
* of the next phase. If there are no successful operations in the context when this method is executed the search is aborted and
121122
* a response is returned to the user indicating that all shards have failed.
122123
*/
123-
void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase);
124+
void executeNextPhase(SearchPhase currentPhase, Supplier<SearchPhase> nextPhaseSupplier);
124125

125126
/**
126127
* Registers a {@link Releasable} that will be closed when the search request finishes or fails.

server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Set;
3131
import java.util.concurrent.atomic.AtomicInteger;
3232
import java.util.concurrent.atomic.AtomicReference;
33+
import java.util.function.Supplier;
3334

3435
/**
3536
* SearchPhaseContext for tests
@@ -132,7 +133,8 @@ public SearchTransportService getSearchTransport() {
132133
}
133134

134135
@Override
135-
public void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase) {
136+
public void executeNextPhase(SearchPhase currentPhase, Supplier<SearchPhase> nextPhaseSupplier) {
137+
var nextPhase = nextPhaseSupplier.get();
136138
try {
137139
nextPhase.run();
138140
} catch (Exception e) {

0 commit comments

Comments
 (0)