|
23 | 23 | import org.elasticsearch.action.support.IndicesOptions; |
24 | 24 | import org.elasticsearch.client.internal.Client; |
25 | 25 | import org.elasticsearch.cluster.ClusterState; |
| 26 | +import org.elasticsearch.common.bytes.ReleasableBytesReference; |
26 | 27 | import org.elasticsearch.common.io.stream.NamedWriteableRegistry; |
| 28 | +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; |
27 | 29 | import org.elasticsearch.common.io.stream.StreamInput; |
28 | 30 | import org.elasticsearch.common.io.stream.StreamOutput; |
29 | 31 | import org.elasticsearch.common.io.stream.Writeable; |
|
50 | 52 | import org.elasticsearch.tasks.TaskId; |
51 | 53 | import org.elasticsearch.threadpool.ThreadPool; |
52 | 54 | import org.elasticsearch.transport.AbstractTransportRequest; |
| 55 | +import org.elasticsearch.transport.BytesTransportResponse; |
53 | 56 | import org.elasticsearch.transport.LeakTracker; |
54 | 57 | import org.elasticsearch.transport.SendRequestTransportException; |
55 | 58 | import org.elasticsearch.transport.Transport; |
|
58 | 61 | import org.elasticsearch.transport.TransportException; |
59 | 62 | import org.elasticsearch.transport.TransportResponse; |
60 | 63 | import org.elasticsearch.transport.TransportResponseHandler; |
| 64 | +import org.elasticsearch.transport.TransportService; |
61 | 65 |
|
62 | 66 | import java.io.IOException; |
63 | 67 | import java.util.ArrayList; |
@@ -215,41 +219,22 @@ public static final class NodeQueryResponse extends TransportResponse { |
215 | 219 | this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); |
216 | 220 | } |
217 | 221 |
|
218 | | - NodeQueryResponse( |
219 | | - QueryPhaseResultConsumer.MergeResult mergeResult, |
220 | | - Object[] results, |
221 | | - SearchPhaseController.TopDocsStats topDocsStats |
222 | | - ) { |
223 | | - this.results = results; |
224 | | - for (Object result : results) { |
225 | | - if (result instanceof QuerySearchResult r) { |
226 | | - r.incRef(); |
227 | | - } |
228 | | - } |
229 | | - this.mergeResult = mergeResult; |
230 | | - this.topDocsStats = topDocsStats; |
231 | | - assert Arrays.stream(results).noneMatch(Objects::isNull) : Arrays.toString(results); |
232 | | - } |
233 | | - |
234 | 222 | // public for tests |
235 | 223 | public Object[] getResults() { |
236 | 224 | return results; |
237 | 225 | } |
238 | 226 |
|
239 | 227 | @Override |
240 | 228 | public void writeTo(StreamOutput out) throws IOException { |
241 | | - out.writeArray((o, v) -> { |
242 | | - if (v instanceof Exception e) { |
243 | | - o.writeBoolean(false); |
244 | | - o.writeException(e); |
| 229 | + out.writeVInt(results.length); |
| 230 | + for (Object result : results) { |
| 231 | + if (result instanceof Exception e) { |
| 232 | + writePerShardException(out, e); |
245 | 233 | } else { |
246 | | - o.writeBoolean(true); |
247 | | - assert v instanceof QuerySearchResult : v; |
248 | | - ((QuerySearchResult) v).writeTo(o); |
| 234 | + writePerShardResult(out, (QuerySearchResult) result); |
249 | 235 | } |
250 | | - }, results); |
251 | | - mergeResult.writeTo(out); |
252 | | - topDocsStats.writeTo(out); |
| 236 | + } |
| 237 | + writeMergeResult(out, mergeResult, topDocsStats); |
253 | 238 | } |
254 | 239 |
|
255 | 240 | @Override |
@@ -280,6 +265,25 @@ public boolean decRef() { |
280 | 265 | } |
281 | 266 | return false; |
282 | 267 | } |
| 268 | + |
| 269 | + private static void writeMergeResult( |
| 270 | + StreamOutput out, |
| 271 | + QueryPhaseResultConsumer.MergeResult mergeResult, |
| 272 | + SearchPhaseController.TopDocsStats topDocsStats |
| 273 | + ) throws IOException { |
| 274 | + mergeResult.writeTo(out); |
| 275 | + topDocsStats.writeTo(out); |
| 276 | + } |
| 277 | + |
| 278 | + private static void writePerShardException(StreamOutput o, Exception e) throws IOException { |
| 279 | + o.writeBoolean(false); |
| 280 | + o.writeException(e); |
| 281 | + } |
| 282 | + |
| 283 | + private static void writePerShardResult(StreamOutput out, SearchPhaseResult result) throws IOException { |
| 284 | + out.writeBoolean(true); |
| 285 | + result.writeTo(out); |
| 286 | + } |
283 | 287 | } |
284 | 288 |
|
285 | 289 | /** |
@@ -552,7 +556,7 @@ static void registerNodeSearchAction( |
552 | 556 | ) { |
553 | 557 | var transportService = searchTransportService.transportService(); |
554 | 558 | var threadPool = transportService.getThreadPool(); |
555 | | - final Dependencies dependencies = new Dependencies(searchService, threadPool.executor(ThreadPool.Names.SEARCH)); |
| 559 | + final Dependencies dependencies = new Dependencies(searchService, transportService, threadPool.executor(ThreadPool.Names.SEARCH)); |
556 | 560 | // Even though not all searches run on the search pool, we use the search pool size as the upper limit of shards to execute in |
557 | 561 | // parallel to keep the implementation simple instead of working out the exact pool(s) a query will use up-front. |
558 | 562 | final int searchPoolMax = threadPool.info(ThreadPool.Names.SEARCH).getMax(); |
@@ -715,7 +719,7 @@ public void onFailure(Exception e) { |
715 | 719 | } |
716 | 720 | } |
717 | 721 |
|
718 | | - private record Dependencies(SearchService searchService, Executor executor) {} |
| 722 | + private record Dependencies(SearchService searchService, TransportService transportService, Executor executor) {} |
719 | 723 |
|
720 | 724 | private static final class QueryPerNodeState { |
721 | 725 |
|
@@ -760,6 +764,8 @@ void onShardDone() { |
760 | 764 | if (countDown.countDown() == false) { |
761 | 765 | return; |
762 | 766 | } |
| 767 | + RecyclerBytesStreamOutput out = null; |
| 768 | + boolean success = false; |
763 | 769 | var channelListener = new ChannelActionListener<>(channel); |
764 | 770 | try (queryPhaseResultConsumer) { |
765 | 771 | var failure = queryPhaseResultConsumer.failure.get(); |
@@ -788,33 +794,46 @@ void onShardDone() { |
788 | 794 | relevantShardIndices.set(localIndex); |
789 | 795 | } |
790 | 796 | } |
791 | | - final Object[] results = new Object[queryPhaseResultConsumer.getNumShards()]; |
792 | | - for (int i = 0; i < results.length; i++) { |
793 | | - var result = queryPhaseResultConsumer.results.get(i); |
794 | | - if (result == null) { |
795 | | - results[i] = failures.get(i); |
796 | | - } else { |
797 | | - // free context id and remove it from the result right away in case we don't need it anymore |
798 | | - if (result instanceof QuerySearchResult q |
799 | | - && q.getContextId() != null |
800 | | - && relevantShardIndices.get(q.getShardIndex()) == false |
801 | | - && q.hasSuggestHits() == false |
802 | | - && q.getRankShardResult() == null |
803 | | - && searchRequest.searchRequest.scroll() == null |
804 | | - && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) { |
805 | | - if (dependencies.searchService.freeReaderContext(q.getContextId())) { |
806 | | - q.clearContextId(); |
807 | | - } |
| 797 | + final int resultCount = queryPhaseResultConsumer.getNumShards(); |
| 798 | + out = dependencies.transportService.newNetworkBytesStream(); |
| 799 | + out.setTransportVersion(channel.getVersion()); |
| 800 | + try { |
| 801 | + out.writeVInt(resultCount); |
| 802 | + for (int i = 0; i < resultCount; i++) { |
| 803 | + var result = queryPhaseResultConsumer.results.get(i); |
| 804 | + if (result == null) { |
| 805 | + NodeQueryResponse.writePerShardException(out, failures.remove(i)); |
| 806 | + } else { |
| 807 | + // free context id and remove it from the result right away in case we don't need it anymore |
| 808 | + maybeFreeContext(result, relevantShardIndices); |
| 809 | + NodeQueryResponse.writePerShardResult(out, result); |
808 | 810 | } |
809 | | - results[i] = result; |
810 | 811 | } |
811 | | - assert results[i] != null; |
| 812 | + NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); |
| 813 | + success = true; |
| 814 | + } catch (IOException e) { |
| 815 | + handleMergeFailure(e, channelListener); |
| 816 | + return; |
812 | 817 | } |
| 818 | + } finally { |
| 819 | + if (success == false && out != null) { |
| 820 | + out.close(); |
| 821 | + } |
| 822 | + } |
| 823 | + ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(new ReleasableBytesReference(out.bytes(), out))); |
| 824 | + } |
813 | 825 |
|
814 | | - ActionListener.respondAndRelease( |
815 | | - channelListener, |
816 | | - new NodeQueryResponse(mergeResult, results, queryPhaseResultConsumer.topDocsStats) |
817 | | - ); |
| 826 | + private void maybeFreeContext(SearchPhaseResult result, BitSet relevantShardIndices) { |
| 827 | + if (result instanceof QuerySearchResult q |
| 828 | + && q.getContextId() != null |
| 829 | + && relevantShardIndices.get(q.getShardIndex()) == false |
| 830 | + && q.hasSuggestHits() == false |
| 831 | + && q.getRankShardResult() == null |
| 832 | + && searchRequest.searchRequest.scroll() == null |
| 833 | + && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) { |
| 834 | + if (dependencies.searchService.freeReaderContext(q.getContextId())) { |
| 835 | + q.clearContextId(); |
| 836 | + } |
818 | 837 | } |
819 | 838 | } |
820 | 839 |
|
|
0 commit comments