Skip to content

Commit a8c9d2f

Browse files
Stop retaining transport messages after serialization (#122981)
We should not retain messages after serialization. We currently, have no 0-copy logic in place outside of some special cases that are handled separately. As a result of not doing zero-copy logic, there is no need to retain messages until they have been flushed to the wire in full. This change reduces the lifetime of things like `SearchHit` significantly, reducing the data-node side impact of fetching large documents/large aggregations/top-hits to name a few. The fact that this change technically passes released request + response instances to the `messageListener` seems irrelevant since the listeners are only used in tests anyway. If anything, we should look to refactor this logic to avoid holding on to the request/response objects needlessly.
1 parent 091ea9a commit a8c9d2f

File tree

2 files changed

+27
-28
lines changed

2 files changed

+27
-28
lines changed

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@
1212
import org.apache.logging.log4j.Level;
1313
import org.apache.logging.log4j.LogManager;
1414
import org.apache.logging.log4j.Logger;
15-
import org.apache.lucene.store.AlreadyClosedException;
1615
import org.apache.lucene.util.BytesRef;
1716
import org.elasticsearch.TransportVersion;
1817
import org.elasticsearch.TransportVersions;
1918
import org.elasticsearch.action.ActionListener;
2019
import org.elasticsearch.cluster.node.DiscoveryNode;
2120
import org.elasticsearch.common.bytes.BytesReference;
21+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2222
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2323
import org.elasticsearch.common.network.CloseableChannel;
2424
import org.elasticsearch.common.network.HandlingTimeTracker;
@@ -113,17 +113,12 @@ void sendRequest(
113113
isHandshake,
114114
compressionScheme
115115
);
116-
if (request.tryIncRef() == false) {
117-
assert false : "request [" + request + "] has been released already";
118-
throw new AlreadyClosedException("request [" + request + "] has been released already");
119-
}
120-
sendMessage(channel, message, ResponseStatsConsumer.NONE, () -> {
121-
try {
122-
messageListener.onRequestSent(node, requestId, action, request, options);
123-
} finally {
124-
request.decRef();
125-
}
126-
});
116+
sendMessage(
117+
channel,
118+
message,
119+
ResponseStatsConsumer.NONE,
120+
() -> messageListener.onRequestSent(node, requestId, action, request, options)
121+
);
127122
}
128123

129124
/**
@@ -151,15 +146,9 @@ void sendResponse(
151146
isHandshake,
152147
compressionScheme
153148
);
154-
response.mustIncRef();
149+
assert response.hasReferences();
155150
try {
156-
sendMessage(channel, message, responseStatsConsumer, () -> {
157-
try {
158-
messageListener.onResponseSent(requestId, action, response);
159-
} finally {
160-
response.decRef();
161-
}
162-
});
151+
sendMessage(channel, message, responseStatsConsumer, () -> messageListener.onResponseSent(requestId, action, response));
163152
} catch (Exception ex) {
164153
if (isHandshake) {
165154
logger.error(
@@ -222,7 +211,6 @@ private void sendMessage(
222211
Releasables.closeExpectNoException(onAfter);
223212
}
224213
}
225-
final Releasable release = Releasables.wrap(byteStreamOutput, onAfter);
226214
final BytesReference message;
227215
boolean serializeSuccess = false;
228216
try {
@@ -233,11 +221,20 @@ private void sendMessage(
233221
throw e;
234222
} finally {
235223
if (serializeSuccess == false) {
236-
release.close();
224+
Releasables.close(byteStreamOutput, onAfter);
237225
}
238226
}
239227
responseStatsConsumer.addResponseStats(message.length());
240-
internalSend(channel, message, networkMessage, ActionListener.running(release::close));
228+
internalSend(
229+
channel,
230+
message,
231+
networkMessage,
232+
ActionListener.releasing(
233+
message instanceof ReleasableBytesReference r
234+
? Releasables.wrap(byteStreamOutput, onAfter, r)
235+
: Releasables.wrap(byteStreamOutput, onAfter)
236+
)
237+
);
241238
}
242239

243240
private void internalSend(

server/src/main/java/org/elasticsearch/transport/OutboundMessage.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,16 @@
1111
import org.elasticsearch.TransportVersion;
1212
import org.elasticsearch.TransportVersions;
1313
import org.elasticsearch.common.Strings;
14-
import org.elasticsearch.common.bytes.BytesArray;
1514
import org.elasticsearch.common.bytes.BytesReference;
1615
import org.elasticsearch.common.bytes.CompositeBytesReference;
16+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1717
import org.elasticsearch.common.compress.CompressorFactory;
1818
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
1919
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2020
import org.elasticsearch.common.io.stream.StreamOutput;
2121
import org.elasticsearch.common.io.stream.Writeable;
2222
import org.elasticsearch.common.util.concurrent.ThreadContext;
23+
import org.elasticsearch.core.RefCounted;
2324
import org.elasticsearch.core.Streams;
2425

2526
import java.io.IOException;
@@ -56,7 +57,7 @@ BytesReference serialize(RecyclerBytesStreamOutput bytesStream) throws IOExcepti
5657

5758
final boolean compress = TransportStatus.isCompress(status);
5859
final StreamOutput stream = compress ? wrapCompressed(bytesStream) : bytesStream;
59-
final BytesReference zeroCopyBuffer;
60+
final ReleasableBytesReference zeroCopyBuffer;
6061
try {
6162
stream.setTransportVersion(version);
6263
if (variableHeaderLength == -1) {
@@ -67,10 +68,10 @@ BytesReference serialize(RecyclerBytesStreamOutput bytesStream) throws IOExcepti
6768
zeroCopyBuffer = bRequest.bytes;
6869
} else if (message instanceof RemoteTransportException) {
6970
stream.writeException((RemoteTransportException) message);
70-
zeroCopyBuffer = BytesArray.EMPTY;
71+
zeroCopyBuffer = ReleasableBytesReference.empty();
7172
} else {
7273
message.writeTo(stream);
73-
zeroCopyBuffer = BytesArray.EMPTY;
74+
zeroCopyBuffer = ReleasableBytesReference.empty();
7475
}
7576
} finally {
7677
// We have to close here before accessing the bytes when using compression to ensure that some marker bytes (EOS marker)
@@ -83,7 +84,8 @@ BytesReference serialize(RecyclerBytesStreamOutput bytesStream) throws IOExcepti
8384
if (zeroCopyBuffer.length() == 0) {
8485
reference = message;
8586
} else {
86-
reference = CompositeBytesReference.of(message, zeroCopyBuffer);
87+
zeroCopyBuffer.mustIncRef();
88+
reference = new ReleasableBytesReference(CompositeBytesReference.of(message, zeroCopyBuffer), (RefCounted) zeroCopyBuffer);
8789
}
8890

8991
bytesStream.seek(0);

0 commit comments

Comments
 (0)