Skip to content

Commit 82ff913

Browse files
Introduce BytesTransportResponse to enable lower heap overhead network responses (#127112)
This introduces a way of sending transport responses without materializing an actual response object. This gets a little more complicated for responses that can both go to the local direct response channel as well as to the wire, but for the per-node query response that isn't sent locally we can use it to save creating the response object. The version here only saves a small amount of heap yet (though it makes the GC's life a little easier) but this can be taken further step by step easily by also e.g. streaming aggregation results into the output stream instead of ever materializing them. Another candidate that could win big from this change is the fetch phase where we could effectively halve the peak heap use by serializing directly.
1 parent 6bb79f2 commit 82ff913

File tree

5 files changed

+163
-61
lines changed

5 files changed

+163
-61
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 70 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import org.elasticsearch.action.support.IndicesOptions;
2424
import org.elasticsearch.client.internal.Client;
2525
import org.elasticsearch.cluster.ClusterState;
26+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2627
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
28+
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2729
import org.elasticsearch.common.io.stream.StreamInput;
2830
import org.elasticsearch.common.io.stream.StreamOutput;
2931
import org.elasticsearch.common.io.stream.Writeable;
@@ -50,6 +52,7 @@
5052
import org.elasticsearch.tasks.TaskId;
5153
import org.elasticsearch.threadpool.ThreadPool;
5254
import org.elasticsearch.transport.AbstractTransportRequest;
55+
import org.elasticsearch.transport.BytesTransportResponse;
5356
import org.elasticsearch.transport.LeakTracker;
5457
import org.elasticsearch.transport.SendRequestTransportException;
5558
import org.elasticsearch.transport.Transport;
@@ -58,6 +61,7 @@
5861
import org.elasticsearch.transport.TransportException;
5962
import org.elasticsearch.transport.TransportResponse;
6063
import org.elasticsearch.transport.TransportResponseHandler;
64+
import org.elasticsearch.transport.TransportService;
6165

6266
import java.io.IOException;
6367
import java.util.ArrayList;
@@ -215,41 +219,22 @@ public static final class NodeQueryResponse extends TransportResponse {
215219
this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in);
216220
}
217221

218-
NodeQueryResponse(
219-
QueryPhaseResultConsumer.MergeResult mergeResult,
220-
Object[] results,
221-
SearchPhaseController.TopDocsStats topDocsStats
222-
) {
223-
this.results = results;
224-
for (Object result : results) {
225-
if (result instanceof QuerySearchResult r) {
226-
r.incRef();
227-
}
228-
}
229-
this.mergeResult = mergeResult;
230-
this.topDocsStats = topDocsStats;
231-
assert Arrays.stream(results).noneMatch(Objects::isNull) : Arrays.toString(results);
232-
}
233-
234222
// public for tests
235223
public Object[] getResults() {
236224
return results;
237225
}
238226

239227
@Override
240228
public void writeTo(StreamOutput out) throws IOException {
241-
out.writeArray((o, v) -> {
242-
if (v instanceof Exception e) {
243-
o.writeBoolean(false);
244-
o.writeException(e);
229+
out.writeVInt(results.length);
230+
for (Object result : results) {
231+
if (result instanceof Exception e) {
232+
writePerShardException(out, e);
245233
} else {
246-
o.writeBoolean(true);
247-
assert v instanceof QuerySearchResult : v;
248-
((QuerySearchResult) v).writeTo(o);
234+
writePerShardResult(out, (QuerySearchResult) result);
249235
}
250-
}, results);
251-
mergeResult.writeTo(out);
252-
topDocsStats.writeTo(out);
236+
}
237+
writeMergeResult(out, mergeResult, topDocsStats);
253238
}
254239

255240
@Override
@@ -280,6 +265,25 @@ public boolean decRef() {
280265
}
281266
return false;
282267
}
268+
269+
private static void writeMergeResult(
270+
StreamOutput out,
271+
QueryPhaseResultConsumer.MergeResult mergeResult,
272+
SearchPhaseController.TopDocsStats topDocsStats
273+
) throws IOException {
274+
mergeResult.writeTo(out);
275+
topDocsStats.writeTo(out);
276+
}
277+
278+
private static void writePerShardException(StreamOutput o, Exception e) throws IOException {
279+
o.writeBoolean(false);
280+
o.writeException(e);
281+
}
282+
283+
private static void writePerShardResult(StreamOutput out, SearchPhaseResult result) throws IOException {
284+
out.writeBoolean(true);
285+
result.writeTo(out);
286+
}
283287
}
284288

285289
/**
@@ -552,7 +556,7 @@ static void registerNodeSearchAction(
552556
) {
553557
var transportService = searchTransportService.transportService();
554558
var threadPool = transportService.getThreadPool();
555-
final Dependencies dependencies = new Dependencies(searchService, threadPool.executor(ThreadPool.Names.SEARCH));
559+
final Dependencies dependencies = new Dependencies(searchService, transportService, threadPool.executor(ThreadPool.Names.SEARCH));
556560
// Even though not all searches run on the search pool, we use the search pool size as the upper limit of shards to execute in
557561
// parallel to keep the implementation simple instead of working out the exact pool(s) a query will use up-front.
558562
final int searchPoolMax = threadPool.info(ThreadPool.Names.SEARCH).getMax();
@@ -715,7 +719,7 @@ public void onFailure(Exception e) {
715719
}
716720
}
717721

718-
private record Dependencies(SearchService searchService, Executor executor) {}
722+
private record Dependencies(SearchService searchService, TransportService transportService, Executor executor) {}
719723

720724
private static final class QueryPerNodeState {
721725

@@ -760,6 +764,8 @@ void onShardDone() {
760764
if (countDown.countDown() == false) {
761765
return;
762766
}
767+
RecyclerBytesStreamOutput out = null;
768+
boolean success = false;
763769
var channelListener = new ChannelActionListener<>(channel);
764770
try (queryPhaseResultConsumer) {
765771
var failure = queryPhaseResultConsumer.failure.get();
@@ -788,33 +794,46 @@ void onShardDone() {
788794
relevantShardIndices.set(localIndex);
789795
}
790796
}
791-
final Object[] results = new Object[queryPhaseResultConsumer.getNumShards()];
792-
for (int i = 0; i < results.length; i++) {
793-
var result = queryPhaseResultConsumer.results.get(i);
794-
if (result == null) {
795-
results[i] = failures.get(i);
796-
} else {
797-
// free context id and remove it from the result right away in case we don't need it anymore
798-
if (result instanceof QuerySearchResult q
799-
&& q.getContextId() != null
800-
&& relevantShardIndices.get(q.getShardIndex()) == false
801-
&& q.hasSuggestHits() == false
802-
&& q.getRankShardResult() == null
803-
&& searchRequest.searchRequest.scroll() == null
804-
&& isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
805-
if (dependencies.searchService.freeReaderContext(q.getContextId())) {
806-
q.clearContextId();
807-
}
797+
final int resultCount = queryPhaseResultConsumer.getNumShards();
798+
out = dependencies.transportService.newNetworkBytesStream();
799+
out.setTransportVersion(channel.getVersion());
800+
try {
801+
out.writeVInt(resultCount);
802+
for (int i = 0; i < resultCount; i++) {
803+
var result = queryPhaseResultConsumer.results.get(i);
804+
if (result == null) {
805+
NodeQueryResponse.writePerShardException(out, failures.remove(i));
806+
} else {
807+
// free context id and remove it from the result right away in case we don't need it anymore
808+
maybeFreeContext(result, relevantShardIndices);
809+
NodeQueryResponse.writePerShardResult(out, result);
808810
}
809-
results[i] = result;
810811
}
811-
assert results[i] != null;
812+
NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats);
813+
success = true;
814+
} catch (IOException e) {
815+
handleMergeFailure(e, channelListener);
816+
return;
812817
}
818+
} finally {
819+
if (success == false && out != null) {
820+
out.close();
821+
}
822+
}
823+
ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(new ReleasableBytesReference(out.bytes(), out)));
824+
}
813825

814-
ActionListener.respondAndRelease(
815-
channelListener,
816-
new NodeQueryResponse(mergeResult, results, queryPhaseResultConsumer.topDocsStats)
817-
);
826+
private void maybeFreeContext(SearchPhaseResult result, BitSet relevantShardIndices) {
827+
if (result instanceof QuerySearchResult q
828+
&& q.getContextId() != null
829+
&& relevantShardIndices.get(q.getShardIndex()) == false
830+
&& q.hasSuggestHits() == false
831+
&& q.getRankShardResult() == null
832+
&& searchRequest.searchRequest.scroll() == null
833+
&& isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
834+
if (dependencies.searchService.freeReaderContext(q.getContextId())) {
835+
q.clearContextId();
836+
}
818837
}
819838
}
820839

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
14+
15+
import java.io.IOException;
16+
17+
public interface BytesTransportMessage {
18+
19+
ReleasableBytesReference bytes();
20+
21+
/**
22+
* Writes the data in a "thin" manner, without the actual bytes, assumes
23+
* the actual bytes will be appended right after this content.
24+
*/
25+
void writeThin(StreamOutput out) throws IOException;
26+
}

server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.elasticsearch.transport;
1111

1212
import org.elasticsearch.TransportVersion;
13-
import org.elasticsearch.common.bytes.BytesReference;
1413
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1514
import org.elasticsearch.common.io.stream.StreamInput;
1615
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -21,7 +20,7 @@
2120
* A specialized, bytes only request, that can potentially be optimized on the network
2221
* layer, specifically for the same large buffer send to several nodes.
2322
*/
24-
public class BytesTransportRequest extends AbstractTransportRequest {
23+
public class BytesTransportRequest extends AbstractTransportRequest implements BytesTransportMessage {
2524

2625
final ReleasableBytesReference bytes;
2726
private final TransportVersion version;
@@ -41,14 +40,12 @@ public TransportVersion version() {
4140
return this.version;
4241
}
4342

44-
public BytesReference bytes() {
43+
@Override
44+
public ReleasableBytesReference bytes() {
4545
return this.bytes;
4646
}
4747

48-
/**
49-
* Writes the data in a "thin" manner, without the actual bytes, assumes
50-
* the actual bytes will be appended right after this content.
51-
*/
48+
@Override
5249
public void writeThin(StreamOutput out) throws IOException {
5350
super.writeTo(out);
5451
out.writeVInt(bytes.length());
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
14+
15+
import java.io.IOException;
16+
17+
/**
18+
* A specialized, bytes only response, that can potentially be optimized on the network layer.
19+
*/
20+
public class BytesTransportResponse extends TransportResponse implements BytesTransportMessage {
21+
22+
private final ReleasableBytesReference bytes;
23+
24+
public BytesTransportResponse(ReleasableBytesReference bytes) {
25+
this.bytes = bytes;
26+
}
27+
28+
@Override
29+
public ReleasableBytesReference bytes() {
30+
return this.bytes;
31+
}
32+
33+
@Override
34+
public void writeThin(StreamOutput out) throws IOException {}
35+
36+
@Override
37+
public void writeTo(StreamOutput out) throws IOException {
38+
bytes.writeTo(out);
39+
}
40+
41+
@Override
42+
public void incRef() {
43+
bytes.incRef();
44+
}
45+
46+
@Override
47+
public boolean tryIncRef() {
48+
return bytes.tryIncRef();
49+
}
50+
51+
@Override
52+
public boolean decRef() {
53+
return bytes.decRef();
54+
}
55+
56+
@Override
57+
public boolean hasReferences() {
58+
return bytes.hasReferences();
59+
}
60+
}

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ private void sendMessage(
227227
Releasable onAfter
228228
) throws IOException {
229229
assert action != null;
230-
final var compressionScheme = writeable instanceof BytesTransportRequest ? null : possibleCompressionScheme;
230+
final var compressionScheme = writeable instanceof BytesTransportMessage ? null : possibleCompressionScheme;
231231
final BytesReference message;
232232
boolean serializeSuccess = false;
233233
final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
@@ -334,11 +334,11 @@ private static BytesReference serializeMessageBody(
334334
final ReleasableBytesReference zeroCopyBuffer;
335335
try {
336336
stream.setTransportVersion(version);
337-
if (writeable instanceof BytesTransportRequest bRequest) {
337+
if (writeable instanceof BytesTransportMessage bRequest) {
338338
assert stream == byteStreamOutput;
339339
assert compressionScheme == null;
340340
bRequest.writeThin(stream);
341-
zeroCopyBuffer = bRequest.bytes;
341+
zeroCopyBuffer = bRequest.bytes();
342342
} else if (writeable instanceof RemoteTransportException remoteTransportException) {
343343
stream.writeException(remoteTransportException);
344344
zeroCopyBuffer = ReleasableBytesReference.empty();

0 commit comments

Comments
 (0)