Skip to content

Commit 9596cdb

Browse files
original-brownbearomricohenn
authored andcommitted
Stop retaining transport responses past serialization (elastic#125163)
Remove the `OutboundMessage` class that needlessly holds on to the the response instances after they are not needed any longer. Inlining the logic should save considerably heap under pressure and enabled further optimisations.
1 parent a18a73b commit 9596cdb

File tree

9 files changed

+370
-490
lines changed

9 files changed

+370
-490
lines changed

server/src/main/java/org/elasticsearch/transport/NetworkMessage.java

Lines changed: 0 additions & 56 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 163 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,35 @@
1717
import org.elasticsearch.TransportVersions;
1818
import org.elasticsearch.action.ActionListener;
1919
import org.elasticsearch.cluster.node.DiscoveryNode;
20+
import org.elasticsearch.common.Strings;
2021
import org.elasticsearch.common.bytes.BytesReference;
22+
import org.elasticsearch.common.bytes.CompositeBytesReference;
2123
import org.elasticsearch.common.bytes.ReleasableBytesReference;
24+
import org.elasticsearch.common.compress.CompressorFactory;
25+
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
2226
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
27+
import org.elasticsearch.common.io.stream.StreamOutput;
28+
import org.elasticsearch.common.io.stream.Writeable;
2329
import org.elasticsearch.common.network.CloseableChannel;
2430
import org.elasticsearch.common.network.HandlingTimeTracker;
2531
import org.elasticsearch.common.recycler.Recycler;
2632
import org.elasticsearch.common.transport.NetworkExceptionHelper;
33+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2734
import org.elasticsearch.core.Nullable;
35+
import org.elasticsearch.core.RefCounted;
2836
import org.elasticsearch.core.Releasable;
2937
import org.elasticsearch.core.Releasables;
38+
import org.elasticsearch.core.Streams;
3039
import org.elasticsearch.core.TimeValue;
3140
import org.elasticsearch.core.UpdateForV10;
3241
import org.elasticsearch.threadpool.ThreadPool;
3342

3443
import java.io.IOException;
44+
import java.util.function.Supplier;
3545

3646
import static org.elasticsearch.core.Strings.format;
3747

38-
final class OutboundHandler {
48+
public final class OutboundHandler {
3949

4050
private static final Logger logger = LogManager.getLogger(OutboundHandler.class);
4151

@@ -85,7 +95,7 @@ void setSlowLogThreshold(TimeValue slowLogThreshold) {
8595
* thread.
8696
*/
8797
void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
88-
internalSend(channel, bytes, null, listener);
98+
internalSend(channel, bytes, () -> "raw bytes", listener);
8999
}
90100

91101
/**
@@ -104,18 +114,14 @@ void sendRequest(
104114
final boolean isHandshake
105115
) throws IOException, TransportException {
106116
assert assertValidTransportVersion(transportVersion);
107-
final OutboundMessage.Request message = new OutboundMessage.Request(
108-
threadPool.getThreadContext(),
109-
request,
110-
transportVersion,
117+
sendMessage(
118+
channel,
111119
action,
120+
request,
112121
requestId,
113122
isHandshake,
114-
compressionScheme
115-
);
116-
sendMessage(
117-
channel,
118-
message,
123+
compressionScheme,
124+
transportVersion,
119125
ResponseStatsConsumer.NONE,
120126
() -> messageListener.onRequestSent(node, requestId, action, request, options)
121127
);
@@ -138,17 +144,19 @@ void sendResponse(
138144
final ResponseStatsConsumer responseStatsConsumer
139145
) {
140146
assert assertValidTransportVersion(transportVersion);
141-
OutboundMessage.Response message = new OutboundMessage.Response(
142-
threadPool.getThreadContext(),
143-
response,
144-
transportVersion,
145-
requestId,
146-
isHandshake,
147-
compressionScheme
148-
);
149147
assert response.hasReferences();
150148
try {
151-
sendMessage(channel, message, responseStatsConsumer, () -> messageListener.onResponseSent(requestId, action));
149+
sendMessage(
150+
channel,
151+
null,
152+
response,
153+
requestId,
154+
isHandshake,
155+
compressionScheme,
156+
transportVersion,
157+
responseStatsConsumer,
158+
() -> messageListener.onResponseSent(requestId, action)
159+
);
152160
} catch (Exception ex) {
153161
if (isHandshake) {
154162
logger.error(
@@ -178,16 +186,19 @@ void sendErrorResponse(
178186
final Exception error
179187
) {
180188
assert assertValidTransportVersion(transportVersion);
181-
OutboundMessage.Response message = new OutboundMessage.Response(
182-
threadPool.getThreadContext(),
183-
new RemoteTransportException(nodeName, channel.getLocalAddress(), action, error),
184-
transportVersion,
185-
requestId,
186-
false,
187-
null
188-
);
189+
var msg = new RemoteTransportException(nodeName, channel.getLocalAddress(), action, error);
189190
try {
190-
sendMessage(channel, message, responseStatsConsumer, () -> messageListener.onResponseSent(requestId, action, error));
191+
sendMessage(
192+
channel,
193+
null,
194+
msg,
195+
requestId,
196+
false,
197+
null,
198+
transportVersion,
199+
responseStatsConsumer,
200+
() -> messageListener.onResponseSent(requestId, action, error)
201+
);
191202
} catch (Exception sendException) {
192203
sendException.addSuppressed(error);
193204
logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException);
@@ -197,38 +208,50 @@ void sendErrorResponse(
197208

198209
private void sendMessage(
199210
TcpChannel channel,
200-
OutboundMessage networkMessage,
211+
@Nullable String requestAction,
212+
Writeable writeable,
213+
long requestId,
214+
boolean isHandshake,
215+
Compression.Scheme compressionScheme,
216+
TransportVersion version,
201217
ResponseStatsConsumer responseStatsConsumer,
202218
Releasable onAfter
203219
) throws IOException {
204-
final RecyclerBytesStreamOutput byteStreamOutput;
205-
boolean bufferSuccess = false;
206-
try {
207-
byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
208-
bufferSuccess = true;
209-
} finally {
210-
if (bufferSuccess == false) {
211-
Releasables.closeExpectNoException(onAfter);
212-
}
213-
}
220+
compressionScheme = writeable instanceof BytesTransportRequest ? null : compressionScheme;
214221
final BytesReference message;
215222
boolean serializeSuccess = false;
223+
final boolean isError = writeable instanceof RemoteTransportException;
224+
final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
216225
try {
217-
message = networkMessage.serialize(byteStreamOutput);
226+
message = serialize(
227+
requestAction,
228+
requestId,
229+
isHandshake,
230+
version,
231+
isError,
232+
compressionScheme,
233+
writeable,
234+
threadPool.getThreadContext(),
235+
byteStreamOutput
236+
);
218237
serializeSuccess = true;
219238
} catch (Exception e) {
220-
logger.warn(() -> "failed to serialize outbound message [" + networkMessage + "]", e);
239+
logger.warn(() -> "failed to serialize outbound message [" + writeable + "]", e);
221240
throw e;
222241
} finally {
223242
if (serializeSuccess == false) {
224243
Releasables.close(byteStreamOutput, onAfter);
225244
}
226245
}
227246
responseStatsConsumer.addResponseStats(message.length());
247+
final var responseType = writeable.getClass();
248+
final boolean compress = compressionScheme != null;
228249
internalSend(
229250
channel,
230251
message,
231-
networkMessage,
252+
requestAction == null
253+
? () -> "Response{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}{" + responseType + "}"
254+
: () -> "Request{" + requestAction + "}{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}",
232255
ActionListener.releasing(
233256
message instanceof ReleasableBytesReference r
234257
? Releasables.wrap(byteStreamOutput, onAfter, r)
@@ -237,10 +260,105 @@ private void sendMessage(
237260
);
238261
}
239262

263+
// public for tests
264+
public static BytesReference serialize(
265+
@Nullable String requestAction,
266+
long requestId,
267+
boolean isHandshake,
268+
TransportVersion version,
269+
boolean isError,
270+
Compression.Scheme compressionScheme,
271+
Writeable writeable,
272+
ThreadContext threadContext,
273+
RecyclerBytesStreamOutput byteStreamOutput
274+
) throws IOException {
275+
assert byteStreamOutput.position() == 0;
276+
byteStreamOutput.setTransportVersion(version);
277+
byteStreamOutput.skip(TcpHeader.HEADER_SIZE);
278+
threadContext.writeTo(byteStreamOutput);
279+
if (requestAction != null) {
280+
if (version.before(TransportVersions.V_8_0_0)) {
281+
// empty features array
282+
byteStreamOutput.writeStringArray(Strings.EMPTY_ARRAY);
283+
}
284+
byteStreamOutput.writeString(requestAction);
285+
}
286+
287+
final int variableHeaderLength = Math.toIntExact(byteStreamOutput.position() - TcpHeader.HEADER_SIZE);
288+
BytesReference message = serializeMessageBody(writeable, compressionScheme, version, byteStreamOutput);
289+
byte status = 0;
290+
if (requestAction == null) {
291+
status = TransportStatus.setResponse(status);
292+
}
293+
if (isHandshake) {
294+
status = TransportStatus.setHandshake(status);
295+
}
296+
if (isError) {
297+
status = TransportStatus.setError(status);
298+
}
299+
if (compressionScheme != null) {
300+
status = TransportStatus.setCompress(status);
301+
}
302+
byteStreamOutput.seek(0);
303+
TcpHeader.writeHeader(byteStreamOutput, requestId, status, version, message.length() - TcpHeader.HEADER_SIZE, variableHeaderLength);
304+
return message;
305+
}
306+
307+
private static BytesReference serializeMessageBody(
308+
Writeable writeable,
309+
Compression.Scheme compressionScheme,
310+
TransportVersion version,
311+
RecyclerBytesStreamOutput byteStreamOutput
312+
) throws IOException {
313+
// The compressible bytes stream will not close the underlying bytes stream
314+
final StreamOutput stream = compressionScheme != null ? wrapCompressed(compressionScheme, byteStreamOutput) : byteStreamOutput;
315+
final ReleasableBytesReference zeroCopyBuffer;
316+
try {
317+
stream.setTransportVersion(version);
318+
if (writeable instanceof BytesTransportRequest bRequest) {
319+
bRequest.writeThin(stream);
320+
zeroCopyBuffer = bRequest.bytes;
321+
} else if (writeable instanceof RemoteTransportException remoteTransportException) {
322+
stream.writeException(remoteTransportException);
323+
zeroCopyBuffer = ReleasableBytesReference.empty();
324+
} else {
325+
writeable.writeTo(stream);
326+
zeroCopyBuffer = ReleasableBytesReference.empty();
327+
}
328+
} finally {
329+
// We have to close here before accessing the bytes when using compression to ensure that some marker bytes (EOS marker)
330+
// are written.
331+
if (compressionScheme != null) {
332+
stream.close();
333+
}
334+
}
335+
final BytesReference msg = byteStreamOutput.bytes();
336+
if (zeroCopyBuffer.length() == 0) {
337+
return msg;
338+
}
339+
zeroCopyBuffer.mustIncRef();
340+
return new ReleasableBytesReference(CompositeBytesReference.of(msg, zeroCopyBuffer), (RefCounted) zeroCopyBuffer);
341+
}
342+
343+
// compressed stream wrapped bytes must be no-close wrapped since we need to close the compressed wrapper below to release
344+
// resources and write EOS marker bytes but must not yet release the bytes themselves
345+
private static StreamOutput wrapCompressed(Compression.Scheme compressionScheme, RecyclerBytesStreamOutput bytesStream)
346+
throws IOException {
347+
if (compressionScheme == Compression.Scheme.DEFLATE) {
348+
return new OutputStreamStreamOutput(
349+
CompressorFactory.COMPRESSOR.threadLocalOutputStream(org.elasticsearch.core.Streams.noCloseStream(bytesStream))
350+
);
351+
} else if (compressionScheme == Compression.Scheme.LZ4) {
352+
return new OutputStreamStreamOutput(Compression.Scheme.lz4OutputStream(Streams.noCloseStream(bytesStream)));
353+
} else {
354+
throw new IllegalArgumentException("Invalid compression scheme: " + compressionScheme);
355+
}
356+
}
357+
240358
private void internalSend(
241359
TcpChannel channel,
242360
BytesReference reference,
243-
@Nullable OutboundMessage message,
361+
Supplier<String> messageDescription,
244362
ActionListener<Void> listener
245363
) {
246364
final long startTime = threadPool.rawRelativeTimeInMillis();
@@ -280,7 +398,7 @@ private void maybeLogSlowMessage(boolean success) {
280398
logger.warn(
281399
"sending transport message [{}] of size [{}] on [{}] took [{}ms] which is above the warn "
282400
+ "threshold of [{}ms] with success [{}]",
283-
message,
401+
messageDescription.get(),
284402
messageSize,
285403
channel,
286404
took,

0 commit comments

Comments
 (0)