1717import org .elasticsearch .TransportVersions ;
1818import org .elasticsearch .action .ActionListener ;
1919import org .elasticsearch .cluster .node .DiscoveryNode ;
20+ import org .elasticsearch .common .Strings ;
2021import org .elasticsearch .common .bytes .BytesReference ;
22+ import org .elasticsearch .common .bytes .CompositeBytesReference ;
2123import org .elasticsearch .common .bytes .ReleasableBytesReference ;
24+ import org .elasticsearch .common .compress .CompressorFactory ;
25+ import org .elasticsearch .common .io .stream .OutputStreamStreamOutput ;
2226import org .elasticsearch .common .io .stream .RecyclerBytesStreamOutput ;
27+ import org .elasticsearch .common .io .stream .StreamOutput ;
28+ import org .elasticsearch .common .io .stream .Writeable ;
2329import org .elasticsearch .common .network .CloseableChannel ;
2430import org .elasticsearch .common .network .HandlingTimeTracker ;
2531import org .elasticsearch .common .recycler .Recycler ;
2632import org .elasticsearch .common .transport .NetworkExceptionHelper ;
33+ import org .elasticsearch .common .util .concurrent .ThreadContext ;
2734import org .elasticsearch .core .Nullable ;
35+ import org .elasticsearch .core .RefCounted ;
2836import org .elasticsearch .core .Releasable ;
2937import org .elasticsearch .core .Releasables ;
38+ import org .elasticsearch .core .Streams ;
3039import org .elasticsearch .core .TimeValue ;
3140import org .elasticsearch .core .UpdateForV10 ;
3241import org .elasticsearch .threadpool .ThreadPool ;
3342
3443import java .io .IOException ;
44+ import java .util .function .Supplier ;
3545
3646import static org .elasticsearch .core .Strings .format ;
3747
38- final class OutboundHandler {
48+ public final class OutboundHandler {
3949
4050 private static final Logger logger = LogManager .getLogger (OutboundHandler .class );
4151
@@ -85,7 +95,7 @@ void setSlowLogThreshold(TimeValue slowLogThreshold) {
8595 * thread.
8696 */
8797 void sendBytes (TcpChannel channel , BytesReference bytes , ActionListener <Void > listener ) {
88- internalSend (channel , bytes , null , listener );
98+ internalSend (channel , bytes , () -> "raw bytes" , listener );
8999 }
90100
91101 /**
@@ -104,18 +114,14 @@ void sendRequest(
104114 final boolean isHandshake
105115 ) throws IOException , TransportException {
106116 assert assertValidTransportVersion (transportVersion );
107- final OutboundMessage .Request message = new OutboundMessage .Request (
108- threadPool .getThreadContext (),
109- request ,
110- transportVersion ,
117+ sendMessage (
118+ channel ,
111119 action ,
120+ request ,
112121 requestId ,
113122 isHandshake ,
114- compressionScheme
115- );
116- sendMessage (
117- channel ,
118- message ,
123+ compressionScheme ,
124+ transportVersion ,
119125 ResponseStatsConsumer .NONE ,
120126 () -> messageListener .onRequestSent (node , requestId , action , request , options )
121127 );
@@ -138,17 +144,19 @@ void sendResponse(
138144 final ResponseStatsConsumer responseStatsConsumer
139145 ) {
140146 assert assertValidTransportVersion (transportVersion );
141- OutboundMessage .Response message = new OutboundMessage .Response (
142- threadPool .getThreadContext (),
143- response ,
144- transportVersion ,
145- requestId ,
146- isHandshake ,
147- compressionScheme
148- );
149147 assert response .hasReferences ();
150148 try {
151- sendMessage (channel , message , responseStatsConsumer , () -> messageListener .onResponseSent (requestId , action ));
149+ sendMessage (
150+ channel ,
151+ null ,
152+ response ,
153+ requestId ,
154+ isHandshake ,
155+ compressionScheme ,
156+ transportVersion ,
157+ responseStatsConsumer ,
158+ () -> messageListener .onResponseSent (requestId , action )
159+ );
152160 } catch (Exception ex ) {
153161 if (isHandshake ) {
154162 logger .error (
@@ -178,16 +186,19 @@ void sendErrorResponse(
178186 final Exception error
179187 ) {
180188 assert assertValidTransportVersion (transportVersion );
181- OutboundMessage .Response message = new OutboundMessage .Response (
182- threadPool .getThreadContext (),
183- new RemoteTransportException (nodeName , channel .getLocalAddress (), action , error ),
184- transportVersion ,
185- requestId ,
186- false ,
187- null
188- );
189+ var msg = new RemoteTransportException (nodeName , channel .getLocalAddress (), action , error );
189190 try {
190- sendMessage (channel , message , responseStatsConsumer , () -> messageListener .onResponseSent (requestId , action , error ));
191+ sendMessage (
192+ channel ,
193+ null ,
194+ msg ,
195+ requestId ,
196+ false ,
197+ null ,
198+ transportVersion ,
199+ responseStatsConsumer ,
200+ () -> messageListener .onResponseSent (requestId , action , error )
201+ );
191202 } catch (Exception sendException ) {
192203 sendException .addSuppressed (error );
193204 logger .error (() -> format ("Failed to send error response on channel [%s], closing channel" , channel ), sendException );
@@ -197,38 +208,50 @@ void sendErrorResponse(
197208
198209 private void sendMessage (
199210 TcpChannel channel ,
200- OutboundMessage networkMessage ,
211+ @ Nullable String requestAction ,
212+ Writeable writeable ,
213+ long requestId ,
214+ boolean isHandshake ,
215+ Compression .Scheme compressionScheme ,
216+ TransportVersion version ,
201217 ResponseStatsConsumer responseStatsConsumer ,
202218 Releasable onAfter
203219 ) throws IOException {
204- final RecyclerBytesStreamOutput byteStreamOutput ;
205- boolean bufferSuccess = false ;
206- try {
207- byteStreamOutput = new RecyclerBytesStreamOutput (recycler );
208- bufferSuccess = true ;
209- } finally {
210- if (bufferSuccess == false ) {
211- Releasables .closeExpectNoException (onAfter );
212- }
213- }
220+ compressionScheme = writeable instanceof BytesTransportRequest ? null : compressionScheme ;
214221 final BytesReference message ;
215222 boolean serializeSuccess = false ;
223+ final boolean isError = writeable instanceof RemoteTransportException ;
224+ final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput (recycler );
216225 try {
217- message = networkMessage .serialize (byteStreamOutput );
226+ message = serialize (
227+ requestAction ,
228+ requestId ,
229+ isHandshake ,
230+ version ,
231+ isError ,
232+ compressionScheme ,
233+ writeable ,
234+ threadPool .getThreadContext (),
235+ byteStreamOutput
236+ );
218237 serializeSuccess = true ;
219238 } catch (Exception e ) {
220- logger .warn (() -> "failed to serialize outbound message [" + networkMessage + "]" , e );
239+ logger .warn (() -> "failed to serialize outbound message [" + writeable + "]" , e );
221240 throw e ;
222241 } finally {
223242 if (serializeSuccess == false ) {
224243 Releasables .close (byteStreamOutput , onAfter );
225244 }
226245 }
227246 responseStatsConsumer .addResponseStats (message .length ());
247+ final var responseType = writeable .getClass ();
248+ final boolean compress = compressionScheme != null ;
228249 internalSend (
229250 channel ,
230251 message ,
231- networkMessage ,
252+ requestAction == null
253+ ? () -> "Response{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}{" + responseType + "}"
254+ : () -> "Request{" + requestAction + "}{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}" ,
232255 ActionListener .releasing (
233256 message instanceof ReleasableBytesReference r
234257 ? Releasables .wrap (byteStreamOutput , onAfter , r )
@@ -237,10 +260,105 @@ private void sendMessage(
237260 );
238261 }
239262
263+ // public for tests
264+ public static BytesReference serialize (
265+ @ Nullable String requestAction ,
266+ long requestId ,
267+ boolean isHandshake ,
268+ TransportVersion version ,
269+ boolean isError ,
270+ Compression .Scheme compressionScheme ,
271+ Writeable writeable ,
272+ ThreadContext threadContext ,
273+ RecyclerBytesStreamOutput byteStreamOutput
274+ ) throws IOException {
275+ assert byteStreamOutput .position () == 0 ;
276+ byteStreamOutput .setTransportVersion (version );
277+ byteStreamOutput .skip (TcpHeader .HEADER_SIZE );
278+ threadContext .writeTo (byteStreamOutput );
279+ if (requestAction != null ) {
280+ if (version .before (TransportVersions .V_8_0_0 )) {
281+ // empty features array
282+ byteStreamOutput .writeStringArray (Strings .EMPTY_ARRAY );
283+ }
284+ byteStreamOutput .writeString (requestAction );
285+ }
286+
287+ final int variableHeaderLength = Math .toIntExact (byteStreamOutput .position () - TcpHeader .HEADER_SIZE );
288+ BytesReference message = serializeMessageBody (writeable , compressionScheme , version , byteStreamOutput );
289+ byte status = 0 ;
290+ if (requestAction == null ) {
291+ status = TransportStatus .setResponse (status );
292+ }
293+ if (isHandshake ) {
294+ status = TransportStatus .setHandshake (status );
295+ }
296+ if (isError ) {
297+ status = TransportStatus .setError (status );
298+ }
299+ if (compressionScheme != null ) {
300+ status = TransportStatus .setCompress (status );
301+ }
302+ byteStreamOutput .seek (0 );
303+ TcpHeader .writeHeader (byteStreamOutput , requestId , status , version , message .length () - TcpHeader .HEADER_SIZE , variableHeaderLength );
304+ return message ;
305+ }
306+
307+ private static BytesReference serializeMessageBody (
308+ Writeable writeable ,
309+ Compression .Scheme compressionScheme ,
310+ TransportVersion version ,
311+ RecyclerBytesStreamOutput byteStreamOutput
312+ ) throws IOException {
313+ // The compressible bytes stream will not close the underlying bytes stream
314+ final StreamOutput stream = compressionScheme != null ? wrapCompressed (compressionScheme , byteStreamOutput ) : byteStreamOutput ;
315+ final ReleasableBytesReference zeroCopyBuffer ;
316+ try {
317+ stream .setTransportVersion (version );
318+ if (writeable instanceof BytesTransportRequest bRequest ) {
319+ bRequest .writeThin (stream );
320+ zeroCopyBuffer = bRequest .bytes ;
321+ } else if (writeable instanceof RemoteTransportException remoteTransportException ) {
322+ stream .writeException (remoteTransportException );
323+ zeroCopyBuffer = ReleasableBytesReference .empty ();
324+ } else {
325+ writeable .writeTo (stream );
326+ zeroCopyBuffer = ReleasableBytesReference .empty ();
327+ }
328+ } finally {
329+ // We have to close here before accessing the bytes when using compression to ensure that some marker bytes (EOS marker)
330+ // are written.
331+ if (compressionScheme != null ) {
332+ stream .close ();
333+ }
334+ }
335+ final BytesReference msg = byteStreamOutput .bytes ();
336+ if (zeroCopyBuffer .length () == 0 ) {
337+ return msg ;
338+ }
339+ zeroCopyBuffer .mustIncRef ();
340+ return new ReleasableBytesReference (CompositeBytesReference .of (msg , zeroCopyBuffer ), (RefCounted ) zeroCopyBuffer );
341+ }
342+
343+ // compressed stream wrapped bytes must be no-close wrapped since we need to close the compressed wrapper below to release
344+ // resources and write EOS marker bytes but must not yet release the bytes themselves
345+ private static StreamOutput wrapCompressed (Compression .Scheme compressionScheme , RecyclerBytesStreamOutput bytesStream )
346+ throws IOException {
347+ if (compressionScheme == Compression .Scheme .DEFLATE ) {
348+ return new OutputStreamStreamOutput (
349+ CompressorFactory .COMPRESSOR .threadLocalOutputStream (org .elasticsearch .core .Streams .noCloseStream (bytesStream ))
350+ );
351+ } else if (compressionScheme == Compression .Scheme .LZ4 ) {
352+ return new OutputStreamStreamOutput (Compression .Scheme .lz4OutputStream (Streams .noCloseStream (bytesStream )));
353+ } else {
354+ throw new IllegalArgumentException ("Invalid compression scheme: " + compressionScheme );
355+ }
356+ }
357+
240358 private void internalSend (
241359 TcpChannel channel ,
242360 BytesReference reference ,
243- @ Nullable OutboundMessage message ,
361+ Supplier < String > messageDescription ,
244362 ActionListener <Void > listener
245363 ) {
246364 final long startTime = threadPool .rawRelativeTimeInMillis ();
@@ -280,7 +398,7 @@ private void maybeLogSlowMessage(boolean success) {
280398 logger .warn (
281399 "sending transport message [{}] of size [{}] on [{}] took [{}ms] which is above the warn "
282400 + "threshold of [{}ms] with success [{}]" ,
283- message ,
401+ messageDescription . get () ,
284402 messageSize ,
285403 channel ,
286404 took ,
0 commit comments