|
33 | 33 | * Then it reaches out to all relevant shards to fetch the topN hits. |
34 | 34 | */ |
35 | 35 | final class FetchSearchPhase extends SearchPhase { |
36 | | - private final ArraySearchPhaseResults<FetchSearchResult> fetchResults; |
37 | 36 | private final AtomicArray<SearchPhaseResult> searchPhaseShardResults; |
38 | 37 | private final BiFunction<SearchResponseSections, AtomicArray<SearchPhaseResult>, SearchPhase> nextPhaseFactory; |
39 | 38 | private final SearchPhaseContext context; |
@@ -79,8 +78,6 @@ final class FetchSearchPhase extends SearchPhase { |
79 | 78 | + resultConsumer.getNumShards() |
80 | 79 | ); |
81 | 80 | } |
82 | | - this.fetchResults = new ArraySearchPhaseResults<>(resultConsumer.getNumShards()); |
83 | | - context.addReleasable(fetchResults); |
84 | 81 | this.searchPhaseShardResults = resultConsumer.getAtomicArray(); |
85 | 82 | this.aggregatedDfs = aggregatedDfs; |
86 | 83 | this.nextPhaseFactory = nextPhaseFactory; |
@@ -129,48 +126,56 @@ private void innerRun() throws Exception { |
129 | 126 | // we have to release contexts here to free up resources |
130 | 127 | searchPhaseShardResults.asList() |
131 | 128 | .forEach(searchPhaseShardResult -> releaseIrrelevantSearchContext(searchPhaseShardResult, context)); |
132 | | - moveToNextPhase(fetchResults.getAtomicArray(), reducedQueryPhase); |
| 129 | + moveToNextPhase(new AtomicArray<>(numShards), reducedQueryPhase); |
133 | 130 | } else { |
134 | | - final boolean shouldExplainRank = shouldExplainRankScores(context.getRequest()); |
135 | | - final List<Map<Integer, RankDoc>> rankDocsPerShard = false == shouldExplainRank |
136 | | - ? null |
137 | | - : splitRankDocsPerShard(scoreDocs, numShards); |
138 | | - final ScoreDoc[] lastEmittedDocPerShard = context.getRequest().scroll() != null |
139 | | - ? SearchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards) |
140 | | - : null; |
141 | | - final List<Integer>[] docIdsToLoad = SearchPhaseController.fillDocIdsToLoad(numShards, scoreDocs); |
142 | | - final CountedCollector<FetchSearchResult> counter = new CountedCollector<>( |
143 | | - fetchResults, |
144 | | - docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not |
145 | | - () -> moveToNextPhase(fetchResults.getAtomicArray(), reducedQueryPhase), |
146 | | - context |
147 | | - ); |
148 | | - for (int i = 0; i < docIdsToLoad.length; i++) { |
149 | | - List<Integer> entry = docIdsToLoad[i]; |
150 | | - RankDocShardInfo rankDocs = rankDocsPerShard == null || rankDocsPerShard.get(i).isEmpty() |
151 | | - ? null |
152 | | - : new RankDocShardInfo(rankDocsPerShard.get(i)); |
153 | | - SearchPhaseResult shardPhaseResult = searchPhaseShardResults.get(i); |
154 | | - if (entry == null) { // no results for this shard ID |
155 | | - if (shardPhaseResult != null) { |
156 | | - // if we got some hits from this shard we have to release the context there |
157 | | - // we do this as we go since it will free up resources and passing on the request on the |
158 | | - // transport layer is cheap. |
159 | | - releaseIrrelevantSearchContext(shardPhaseResult, context); |
160 | | - progressListener.notifyFetchResult(i); |
161 | | - } |
162 | | - // in any case we count down this result since we don't talk to this shard anymore |
163 | | - counter.countDown(); |
164 | | - } else { |
165 | | - executeFetch( |
166 | | - shardPhaseResult, |
167 | | - counter, |
168 | | - entry, |
169 | | - rankDocs, |
170 | | - (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[i] : null |
171 | | - ); |
172 | | - } |
| 131 | + innerRunFetch(scoreDocs, numShards, reducedQueryPhase); |
| 132 | + } |
| 133 | + } |
| 134 | + } |
| 135 | + |
| 136 | + private void innerRunFetch(ScoreDoc[] scoreDocs, int numShards, SearchPhaseController.ReducedQueryPhase reducedQueryPhase) { |
| 137 | + ArraySearchPhaseResults<FetchSearchResult> fetchResults = new ArraySearchPhaseResults<>(numShards); |
| 138 | + final List<Map<Integer, RankDoc>> rankDocsPerShard = false == shouldExplainRankScores(context.getRequest()) |
| 139 | + ? null |
| 140 | + : splitRankDocsPerShard(scoreDocs, numShards); |
| 141 | + final ScoreDoc[] lastEmittedDocPerShard = context.getRequest().scroll() != null |
| 142 | + ? SearchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards) |
| 143 | + : null; |
| 144 | + final List<Integer>[] docIdsToLoad = SearchPhaseController.fillDocIdsToLoad(numShards, scoreDocs); |
| 145 | + final CountedCollector<FetchSearchResult> counter = new CountedCollector<>( |
| 146 | + fetchResults, |
| 147 | + docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not |
| 148 | + () -> { |
| 149 | + try (fetchResults) { |
| 150 | + moveToNextPhase(fetchResults.getAtomicArray(), reducedQueryPhase); |
| 151 | + } |
| 152 | + }, |
| 153 | + context |
| 154 | + ); |
| 155 | + for (int i = 0; i < docIdsToLoad.length; i++) { |
| 156 | + List<Integer> entry = docIdsToLoad[i]; |
| 157 | + RankDocShardInfo rankDocs = rankDocsPerShard == null || rankDocsPerShard.get(i).isEmpty() |
| 158 | + ? null |
| 159 | + : new RankDocShardInfo(rankDocsPerShard.get(i)); |
| 160 | + SearchPhaseResult shardPhaseResult = searchPhaseShardResults.get(i); |
| 161 | + if (entry == null) { // no results for this shard ID |
| 162 | + if (shardPhaseResult != null) { |
| 163 | + // if we got some hits from this shard we have to release the context there |
| 164 | + // we do this as we go since it will free up resources and passing on the request on the |
| 165 | + // transport layer is cheap. |
| 166 | + releaseIrrelevantSearchContext(shardPhaseResult, context); |
| 167 | + progressListener.notifyFetchResult(i); |
173 | 168 | } |
| 169 | + // in any case we count down this result since we don't talk to this shard anymore |
| 170 | + counter.countDown(); |
| 171 | + } else { |
| 172 | + executeFetch( |
| 173 | + shardPhaseResult, |
| 174 | + counter, |
| 175 | + entry, |
| 176 | + rankDocs, |
| 177 | + (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[i] : null |
| 178 | + ); |
174 | 179 | } |
175 | 180 | } |
176 | 181 | } |
@@ -257,7 +262,6 @@ private void moveToNextPhase( |
257 | 262 | ) { |
258 | 263 | var resp = SearchPhaseController.merge(context.getRequest().scroll() != null, reducedQueryPhase, fetchResultsArr); |
259 | 264 | context.addReleasable(resp::decRef); |
260 | | - fetchResults.close(); |
261 | 265 | context.executeNextPhase(this, nextPhaseFactory.apply(resp, searchPhaseShardResults)); |
262 | 266 | } |
263 | 267 |
|
|
0 commit comments