Skip to content

Commit 61f51d5

Browse files
Dry up serialization
1 parent ebf398f commit 61f51d5

File tree

1 file changed

+44
-27
lines changed

1 file changed

+44
-27
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -226,18 +226,15 @@ public Object[] getResults() {
226226

227227
@Override
228228
public void writeTo(StreamOutput out) throws IOException {
229-
out.writeArray((o, v) -> {
230-
if (v instanceof Exception e) {
231-
o.writeBoolean(false);
232-
o.writeException(e);
229+
out.writeVInt(results.length);
230+
for (Object result : results) {
231+
if (result instanceof Exception e) {
232+
writePerShardException(out, e);
233233
} else {
234-
o.writeBoolean(true);
235-
assert v instanceof QuerySearchResult : v;
236-
((QuerySearchResult) v).writeTo(o);
234+
writePerShardResult(out, (QuerySearchResult) result);
237235
}
238-
}, results);
239-
mergeResult.writeTo(out);
240-
topDocsStats.writeTo(out);
236+
}
237+
writeMergeResult(out, mergeResult, topDocsStats);
241238
}
242239

243240
@Override
@@ -268,6 +265,25 @@ public boolean decRef() {
268265
}
269266
return false;
270267
}
268+
269+
private static void writeMergeResult(
270+
StreamOutput out,
271+
QueryPhaseResultConsumer.MergeResult mergeResult,
272+
SearchPhaseController.TopDocsStats topDocsStats
273+
) throws IOException {
274+
mergeResult.writeTo(out);
275+
topDocsStats.writeTo(out);
276+
}
277+
278+
private static void writePerShardException(StreamOutput o, Exception e) throws IOException {
279+
o.writeBoolean(false);
280+
o.writeException(e);
281+
}
282+
283+
private static void writePerShardResult(StreamOutput out, SearchPhaseResult result) throws IOException {
284+
out.writeBoolean(true);
285+
result.writeTo(out);
286+
}
271287
}
272288

273289
/**
@@ -786,27 +802,14 @@ void onShardDone() {
786802
for (int i = 0; i < resultCount; i++) {
787803
var result = queryPhaseResultConsumer.results.get(i);
788804
if (result == null) {
789-
out.writeBoolean(false);
790-
out.writeException(failures.remove(i));
805+
NodeQueryResponse.writePerShardException(out, failures.remove(i));
791806
} else {
792807
// free context id and remove it from the result right away in case we don't need it anymore
793-
if (result instanceof QuerySearchResult q
794-
&& q.getContextId() != null
795-
&& relevantShardIndices.get(q.getShardIndex()) == false
796-
&& q.hasSuggestHits() == false
797-
&& q.getRankShardResult() == null
798-
&& searchRequest.searchRequest.scroll() == null
799-
&& isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
800-
if (dependencies.searchService.freeReaderContext(q.getContextId())) {
801-
q.clearContextId();
802-
}
803-
}
804-
out.writeBoolean(true);
805-
result.writeTo(out);
808+
maybeFreeContext(result, relevantShardIndices);
809+
NodeQueryResponse.writePerShardResult(out, result);
806810
}
807811
}
808-
mergeResult.writeTo(out);
809-
queryPhaseResultConsumer.topDocsStats.writeTo(out);
812+
NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats);
810813
success = true;
811814
} catch (IOException e) {
812815
handleMergeFailure(e, channelListener);
@@ -820,6 +823,20 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
820823
ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(new ReleasableBytesReference(out.bytes(), out)));
821824
}
822825

826+
private void maybeFreeContext(SearchPhaseResult result, BitSet relevantShardIndices) {
827+
if (result instanceof QuerySearchResult q
828+
&& q.getContextId() != null
829+
&& relevantShardIndices.get(q.getShardIndex()) == false
830+
&& q.hasSuggestHits() == false
831+
&& q.getRankShardResult() == null
832+
&& searchRequest.searchRequest.scroll() == null
833+
&& isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
834+
if (dependencies.searchService.freeReaderContext(q.getContextId())) {
835+
q.clearContextId();
836+
}
837+
}
838+
}
839+
823840
private void handleMergeFailure(Exception e, ChannelActionListener<TransportResponse> channelListener) {
824841
queryPhaseResultConsumer.getSuccessfulResults()
825842
.forEach(searchPhaseResult -> releaseLocalContext(dependencies.searchService, searchRequest, searchPhaseResult));

0 commit comments

Comments
 (0)