
Commit 87438ae

Introduce BytesTransportResponse to enable zero-copy network writes
This introduces a way of sending transport responses without materializing an actual response object. Things get a little more complicated for responses that can go both to the local direct response channel and to the wire, but for the per-node query response, which is never sent locally, we can use it to avoid creating the response object. The version here saves only a small amount of heap so far (though it makes the GC's life a little easier), but it can easily be taken further step by step, e.g. by also streaming aggregation results into the output stream instead of ever materializing them. Another candidate that could win big from this change is the fetch phase, where serializing directly could effectively halve peak heap use.
1 parent bc0e1d0 commit 87438ae
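
As a minimal illustration of the pattern the commit message describes (a hedged, condensed sketch rather than the actual code: it assumes a transportService, a channel and a List<QuerySearchResult> results in scope; the real integration is in QueryPerNodeState#onShardDone below), the responding side serializes its shard results straight into a recycled network buffer and ships the raw bytes instead of building a response object first:

    // Hedged sketch of the zero-copy response pattern (assumed local variables, see lead-in).
    RecyclerBytesStreamOutput out = transportService.newNetworkBytesStream();
    boolean success = false;
    try {
        out.setTransportVersion(channel.getVersion());
        out.writeVInt(results.size());
        for (QuerySearchResult result : results) {
            result.writeTo(out); // serialize each shard result directly into the wire buffer
        }
        // Hand the buffer off; releasing the response later also releases the recycled pages.
        ActionListener.respondAndRelease(
            new ChannelActionListener<>(channel),
            new BytesTransportResponse(new ReleasableBytesReference(out.bytes(), out))
        );
        success = true;
    } finally {
        if (success == false) {
            out.close(); // return the recycled pages if serialization failed before the hand-off
        }
    }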

5 files changed, +136 -51 lines

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 43 additions & 41 deletions
@@ -23,7 +23,9 @@
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -50,6 +52,7 @@
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.AbstractTransportRequest;
+import org.elasticsearch.transport.BytesTransportResponse;
 import org.elasticsearch.transport.LeakTracker;
 import org.elasticsearch.transport.SendRequestTransportException;
 import org.elasticsearch.transport.Transport;
@@ -58,6 +61,7 @@
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportResponseHandler;
+import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -215,22 +219,6 @@ public static final class NodeQueryResponse extends TransportResponse {
             this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in);
         }
 
-        NodeQueryResponse(
-            QueryPhaseResultConsumer.MergeResult mergeResult,
-            Object[] results,
-            SearchPhaseController.TopDocsStats topDocsStats
-        ) {
-            this.results = results;
-            for (Object result : results) {
-                if (result instanceof QuerySearchResult r) {
-                    r.incRef();
-                }
-            }
-            this.mergeResult = mergeResult;
-            this.topDocsStats = topDocsStats;
-            assert Arrays.stream(results).noneMatch(Objects::isNull) : Arrays.toString(results);
-        }
-
         // public for tests
         public Object[] getResults() {
             return results;
@@ -552,7 +540,7 @@ static void registerNodeSearchAction(
     ) {
         var transportService = searchTransportService.transportService();
         var threadPool = transportService.getThreadPool();
-        final Dependencies dependencies = new Dependencies(searchService, threadPool.executor(ThreadPool.Names.SEARCH));
+        final Dependencies dependencies = new Dependencies(searchService, transportService, threadPool.executor(ThreadPool.Names.SEARCH));
         // Even though not all searches run on the search pool, we use the search pool size as the upper limit of shards to execute in
         // parallel to keep the implementation simple instead of working out the exact pool(s) a query will use up-front.
         final int searchPoolMax = threadPool.info(ThreadPool.Names.SEARCH).getMax();
@@ -715,7 +703,7 @@ public void onFailure(Exception e) {
         }
     }
 
-    private record Dependencies(SearchService searchService, Executor executor) {}
+    private record Dependencies(SearchService searchService, TransportService transportService, Executor executor) {}
 
     private static final class QueryPerNodeState {

@@ -760,6 +748,8 @@ void onShardDone() {
             if (countDown.countDown() == false) {
                 return;
             }
+            RecyclerBytesStreamOutput out = null;
+            boolean success = false;
             var channelListener = new ChannelActionListener<>(channel);
             try (queryPhaseResultConsumer) {
                 var failure = queryPhaseResultConsumer.failure.get();
@@ -788,34 +778,46 @@ void onShardDone() {
                         relevantShardIndices.set(localIndex);
                     }
                 }
-                final Object[] results = new Object[queryPhaseResultConsumer.getNumShards()];
-                for (int i = 0; i < results.length; i++) {
-                    var result = queryPhaseResultConsumer.results.get(i);
-                    if (result == null) {
-                        results[i] = failures.get(i);
-                    } else {
-                        // free context id and remove it from the result right away in case we don't need it anymore
-                        if (result instanceof QuerySearchResult q
-                            && q.getContextId() != null
-                            && relevantShardIndices.get(q.getShardIndex()) == false
-                            && q.hasSuggestHits() == false
-                            && q.getRankShardResult() == null
-                            && searchRequest.searchRequest.scroll() == null
-                            && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
-                            if (dependencies.searchService.freeReaderContext(q.getContextId())) {
-                                q.clearContextId();
+                final int resultCount = queryPhaseResultConsumer.getNumShards();
+                out = dependencies.transportService.newNetworkBytesStream();
+                out.setTransportVersion(channel.getVersion());
+                try {
+                    out.writeVInt(resultCount);
+                    for (int i = 0; i < resultCount; i++) {
+                        var result = queryPhaseResultConsumer.results.get(i);
+                        if (result == null) {
+                            out.writeBoolean(false);
+                            out.writeException(failures.remove(i));
+                        } else {
+                            // free context id and remove it from the result right away in case we don't need it anymore
+                            if (result instanceof QuerySearchResult q
+                                && q.getContextId() != null
+                                && relevantShardIndices.get(q.getShardIndex()) == false
+                                && q.hasSuggestHits() == false
+                                && q.getRankShardResult() == null
+                                && searchRequest.searchRequest.scroll() == null
+                                && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
+                                if (dependencies.searchService.freeReaderContext(q.getContextId())) {
+                                    q.clearContextId();
+                                }
                             }
+                            out.writeBoolean(true);
+                            result.writeTo(out);
                         }
-                        results[i] = result;
                     }
-                    assert results[i] != null;
+                    mergeResult.writeTo(out);
+                    queryPhaseResultConsumer.topDocsStats.writeTo(out);
+                    success = true;
+                } catch (IOException e) {
+                    handleMergeFailure(e, channelListener);
+                    return;
+                }
+            } finally {
+                if (success == false && out != null) {
+                    out.close();
                 }
-
-                ActionListener.respondAndRelease(
-                    channelListener,
-                    new NodeQueryResponse(mergeResult, results, queryPhaseResultConsumer.topDocsStats)
-                );
             }
+            ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(new ReleasableBytesReference(out.bytes(), out)));
         }
 
         private void handleMergeFailure(Exception e, ChannelActionListener<TransportResponse> channelListener) {
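
For reference, the bytes written in onShardDone above carry the same information the removed NodeQueryResponse constructor used to hold as objects. A hedged sketch of that layout from the reader's point of view (the QuerySearchResult and MergeResult stream readers named here are assumptions for illustration; only TopDocsStats.readFrom appears in this diff):

    // Hedged sketch: how a receiver would consume the layout written by onShardDone.
    static void readNodeQueryResponse(StreamInput in) throws IOException {
        int resultCount = in.readVInt();                 // number of per-shard entries
        Object[] results = new Object[resultCount];
        for (int i = 0; i < resultCount; i++) {
            if (in.readBoolean()) {
                results[i] = new QuerySearchResult(in);  // successful shard result (assumed reader)
            } else {
                results[i] = in.readException();         // per-shard failure
            }
        }
        var mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in);  // assumed reader
        var topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in);   // as read in NodeQueryResponse(StreamInput)
    }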

server/src/main/java/org/elasticsearch/transport/BytesTransportMessage.java

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.transport;
+
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+public interface BytesTransportMessage {
+
+    ReleasableBytesReference bytes();
+
+    /**
+     * Writes the data in a "thin" manner, without the actual bytes, assumes
+     * the actual bytes will be appended right after this content.
+     */
+    void writeThin(StreamOutput out) throws IOException;
+}
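
The contract here, as a hedged sketch (the concrete consumer is the OutboundHandler change at the end of this commit): the sender writes only the thin metadata through writeThin and then appends bytes() verbatim, so the pre-serialized payload never goes through another serialization pass. Roughly:

    // Illustrative only: composing the thin header with the pre-serialized payload, zero-copy.
    static BytesReference composeZeroCopy(RecyclerBytesStreamOutput headerStream, BytesTransportMessage msg) throws IOException {
        msg.writeThin(headerStream);                                          // metadata only, no payload bytes
        return CompositeBytesReference.of(headerStream.bytes(), msg.bytes()); // concatenated view, no copying
    }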

server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java

Lines changed: 4 additions & 7 deletions
@@ -10,7 +10,6 @@
 package org.elasticsearch.transport;
 
 import org.elasticsearch.TransportVersion;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -21,7 +20,7 @@
  * A specialized, bytes only request, that can potentially be optimized on the network
  * layer, specifically for the same large buffer send to several nodes.
  */
-public class BytesTransportRequest extends AbstractTransportRequest {
+public class BytesTransportRequest extends AbstractTransportRequest implements BytesTransportMessage {
 
     final ReleasableBytesReference bytes;
     private final TransportVersion version;
@@ -41,14 +40,12 @@ public TransportVersion version() {
         return this.version;
     }
 
-    public BytesReference bytes() {
+    @Override
+    public ReleasableBytesReference bytes() {
         return this.bytes;
     }
 
-    /**
-     * Writes the data in a "thin" manner, without the actual bytes, assumes
-     * the actual bytes will be appended right after this content.
-     */
+    @Override
     public void writeThin(StreamOutput out) throws IOException {
         super.writeTo(out);
         out.writeVInt(bytes.length());

server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.transport;
+
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+/**
+ * A specialized, bytes only response, that can potentially be optimized on the network layer.
+ */
+public class BytesTransportResponse extends TransportResponse implements BytesTransportMessage {
+
+    private final ReleasableBytesReference bytes;
+
+    public BytesTransportResponse(ReleasableBytesReference bytes) {
+        this.bytes = bytes;
+    }
+
+    @Override
+    public ReleasableBytesReference bytes() {
+        return this.bytes;
+    }
+
+    @Override
+    public void writeThin(StreamOutput out) throws IOException {}
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        bytes.writeTo(out);
+    }
+
+    @Override
+    public void incRef() {
+        bytes.incRef();
+    }
+
+    @Override
+    public boolean tryIncRef() {
+        return bytes.tryIncRef();
+    }
+
+    @Override
+    public boolean decRef() {
+        return bytes.decRef();
+    }
+
+    @Override
+    public boolean hasReferences() {
+        return bytes.hasReferences();
+    }
+}
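
Since all of the ref-counting above delegates to the wrapped ReleasableBytesReference, releasing the response also returns the recycled buffer pages backing it. A hedged usage sketch, mirroring how the search action above builds the response:

    // Hedged sketch: the response's lifecycle is the lifecycle of the bytes it wraps.
    RecyclerBytesStreamOutput out = transportService.newNetworkBytesStream(); // as in the search action above
    out.writeVInt(42);                                                        // any pre-serialized payload
    var response = new BytesTransportResponse(new ReleasableBytesReference(out.bytes(), out));
    assert response.hasReferences();
    response.decRef();                     // last reference: closes `out` and recycles its pages
    assert response.hasReferences() == false;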

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 3 additions & 3 deletions
@@ -227,7 +227,7 @@ private void sendMessage(
         Releasable onAfter
     ) throws IOException {
         assert action != null;
-        final var compressionScheme = writeable instanceof BytesTransportRequest ? null : possibleCompressionScheme;
+        final var compressionScheme = writeable instanceof BytesTransportMessage ? null : possibleCompressionScheme;
         final BytesReference message;
         boolean serializeSuccess = false;
         final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
@@ -334,11 +334,11 @@ private static BytesReference serializeMessageBody(
         final ReleasableBytesReference zeroCopyBuffer;
         try {
             stream.setTransportVersion(version);
-            if (writeable instanceof BytesTransportRequest bRequest) {
+            if (writeable instanceof BytesTransportMessage bRequest) {
                 assert stream == byteStreamOutput;
                 assert compressionScheme == null;
                 bRequest.writeThin(stream);
-                zeroCopyBuffer = bRequest.bytes;
+                zeroCopyBuffer = bRequest.bytes();
             } else if (writeable instanceof RemoteTransportException remoteTransportException) {
                 stream.writeException(remoteTransportException);
                 zeroCopyBuffer = ReleasableBytesReference.empty();
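
With the instanceof checks widened from BytesTransportRequest to BytesTransportMessage, both pre-serialized requests and the new pre-serialized responses take the same uncompressed, zero-copy branch. A hedged sketch of that equivalence (payloadBytes and transportVersion are assumed variables, and the BytesTransportRequest constructor shape is assumed for illustration):

    // Hedged sketch: both message kinds now satisfy the interface the zero-copy branch keys off.
    Writeable request = new BytesTransportRequest(payloadBytes, transportVersion); // assumed constructor shape
    Writeable response = new BytesTransportResponse(payloadBytes);
    assert request instanceof BytesTransportMessage;
    assert response instanceof BytesTransportMessage;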
