1212import org .apache .logging .log4j .Level ;
1313import org .apache .logging .log4j .LogManager ;
1414import org .apache .logging .log4j .Logger ;
15- import org .apache .lucene .store .AlreadyClosedException ;
1615import org .apache .lucene .util .BytesRef ;
1716import org .elasticsearch .TransportVersion ;
1817import org .elasticsearch .TransportVersions ;
1918import org .elasticsearch .action .ActionListener ;
2019import org .elasticsearch .cluster .node .DiscoveryNode ;
20+ import org .elasticsearch .common .Strings ;
2121import org .elasticsearch .common .bytes .BytesReference ;
22+ import org .elasticsearch .common .bytes .CompositeBytesReference ;
23+ import org .elasticsearch .common .bytes .ReleasableBytesReference ;
24+ import org .elasticsearch .common .compress .CompressorFactory ;
25+ import org .elasticsearch .common .io .stream .OutputStreamStreamOutput ;
2226import org .elasticsearch .common .io .stream .RecyclerBytesStreamOutput ;
27+ import org .elasticsearch .common .io .stream .StreamOutput ;
28+ import org .elasticsearch .common .io .stream .Writeable ;
2329import org .elasticsearch .common .network .CloseableChannel ;
2430import org .elasticsearch .common .network .HandlingTimeTracker ;
2531import org .elasticsearch .common .recycler .Recycler ;
2632import org .elasticsearch .common .transport .NetworkExceptionHelper ;
33+ import org .elasticsearch .common .util .concurrent .ThreadContext ;
2734import org .elasticsearch .core .Nullable ;
35+ import org .elasticsearch .core .RefCounted ;
2836import org .elasticsearch .core .Releasable ;
2937import org .elasticsearch .core .Releasables ;
38+ import org .elasticsearch .core .Streams ;
3039import org .elasticsearch .core .TimeValue ;
3140import org .elasticsearch .threadpool .ThreadPool ;
3241
3342import java .io .IOException ;
43+ import java .util .function .Supplier ;
3444
3545import static org .elasticsearch .core .Strings .format ;
3646
37- final class OutboundHandler {
47+ public final class OutboundHandler {
3848
3949 private static final Logger logger = LogManager .getLogger (OutboundHandler .class );
4050
@@ -83,7 +93,7 @@ void setSlowLogThreshold(TimeValue slowLogThreshold) {
8393 * thread.
8494 */
8595 void sendBytes (TcpChannel channel , BytesReference bytes , ActionListener <Void > listener ) {
86- internalSend (channel , bytes , null , listener );
96+ internalSend (channel , bytes , () -> "raw bytes" , listener );
8797 }
8898
8999 /**
@@ -102,26 +112,17 @@ void sendRequest(
102112 final boolean isHandshake
103113 ) throws IOException , TransportException {
104114 assert assertValidTransportVersion (transportVersion );
105- final OutboundMessage .Request message = new OutboundMessage .Request (
106- threadPool .getThreadContext (),
107- request ,
108- transportVersion ,
115+ sendMessage (
116+ channel ,
109117 action ,
118+ request ,
110119 requestId ,
111120 isHandshake ,
112- compressionScheme
121+ compressionScheme ,
122+ transportVersion ,
123+ ResponseStatsConsumer .NONE ,
124+ () -> messageListener .onRequestSent (node , requestId , action , request , options )
113125 );
114- if (request .tryIncRef () == false ) {
115- assert false : "request [" + request + "] has been released already" ;
116- throw new AlreadyClosedException ("request [" + request + "] has been released already" );
117- }
118- sendMessage (channel , message , ResponseStatsConsumer .NONE , () -> {
119- try {
120- messageListener .onRequestSent (node , requestId , action , request , options );
121- } finally {
122- request .decRef ();
123- }
124- });
125126 }
126127
127128 /**
@@ -141,23 +142,19 @@ void sendResponse(
141142 final ResponseStatsConsumer responseStatsConsumer
142143 ) {
143144 assert assertValidTransportVersion (transportVersion );
144- OutboundMessage .Response message = new OutboundMessage .Response (
145- threadPool .getThreadContext (),
146- response ,
147- transportVersion ,
148- requestId ,
149- isHandshake ,
150- compressionScheme
151- );
152- response .mustIncRef ();
145+ assert response .hasReferences ();
153146 try {
154- sendMessage (channel , message , responseStatsConsumer , () -> {
155- try {
156- messageListener .onResponseSent (requestId , action );
157- } finally {
158- response .decRef ();
159- }
160- });
147+ sendMessage (
148+ channel ,
149+ null ,
150+ response ,
151+ requestId ,
152+ isHandshake ,
153+ compressionScheme ,
154+ transportVersion ,
155+ responseStatsConsumer ,
156+ () -> messageListener .onResponseSent (requestId , action )
157+ );
161158 } catch (Exception ex ) {
162159 if (isHandshake ) {
163160 logger .error (
@@ -187,16 +184,19 @@ void sendErrorResponse(
187184 final Exception error
188185 ) {
189186 assert assertValidTransportVersion (transportVersion );
190- OutboundMessage .Response message = new OutboundMessage .Response (
191- threadPool .getThreadContext (),
192- new RemoteTransportException (nodeName , channel .getLocalAddress (), action , error ),
193- transportVersion ,
194- requestId ,
195- false ,
196- null
197- );
187+ var msg = new RemoteTransportException (nodeName , channel .getLocalAddress (), action , error );
198188 try {
199- sendMessage (channel , message , responseStatsConsumer , () -> messageListener .onResponseSent (requestId , action , error ));
189+ sendMessage (
190+ channel ,
191+ null ,
192+ msg ,
193+ requestId ,
194+ false ,
195+ null ,
196+ transportVersion ,
197+ responseStatsConsumer ,
198+ () -> messageListener .onResponseSent (requestId , action , error )
199+ );
200200 } catch (Exception sendException ) {
201201 sendException .addSuppressed (error );
202202 logger .error (() -> format ("Failed to send error response on channel [%s], closing channel" , channel ), sendException );
@@ -206,42 +206,157 @@ void sendErrorResponse(
206206
207207 private void sendMessage (
208208 TcpChannel channel ,
209- OutboundMessage networkMessage ,
209+ @ Nullable String requestAction ,
210+ Writeable writeable ,
211+ long requestId ,
212+ boolean isHandshake ,
213+ Compression .Scheme compressionScheme ,
214+ TransportVersion version ,
210215 ResponseStatsConsumer responseStatsConsumer ,
211216 Releasable onAfter
212217 ) throws IOException {
213- final RecyclerBytesStreamOutput byteStreamOutput ;
214- boolean bufferSuccess = false ;
215- try {
216- byteStreamOutput = new RecyclerBytesStreamOutput (recycler );
217- bufferSuccess = true ;
218- } finally {
219- if (bufferSuccess == false ) {
220- Releasables .closeExpectNoException (onAfter );
221- }
222- }
223- final Releasable release = Releasables .wrap (byteStreamOutput , onAfter );
218+ compressionScheme = writeable instanceof BytesTransportRequest ? null : compressionScheme ;
224219 final BytesReference message ;
225220 boolean serializeSuccess = false ;
221+ final boolean isError = writeable instanceof RemoteTransportException ;
222+ final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput (recycler );
226223 try {
227- message = networkMessage .serialize (byteStreamOutput );
224+ message = serialize (
225+ requestAction ,
226+ requestId ,
227+ isHandshake ,
228+ version ,
229+ isError ,
230+ compressionScheme ,
231+ writeable ,
232+ threadPool .getThreadContext (),
233+ byteStreamOutput
234+ );
228235 serializeSuccess = true ;
229236 } catch (Exception e ) {
230- logger .warn (() -> "failed to serialize outbound message [" + networkMessage + "]" , e );
237+ logger .warn (() -> "failed to serialize outbound message [" + writeable + "]" , e );
231238 throw e ;
232239 } finally {
233240 if (serializeSuccess == false ) {
234- release .close ();
241+ Releasables .close (byteStreamOutput , onAfter );
235242 }
236243 }
237244 responseStatsConsumer .addResponseStats (message .length ());
238- internalSend (channel , message , networkMessage , ActionListener .running (release ::close ));
245+ final var responseType = writeable .getClass ();
246+ final boolean compress = compressionScheme != null ;
247+ internalSend (
248+ channel ,
249+ message ,
250+ requestAction == null
251+ ? () -> "Response{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}{" + responseType + "}"
252+ : () -> "Request{" + requestAction + "}{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}" ,
253+ ActionListener .releasing (
254+ message instanceof ReleasableBytesReference r
255+ ? Releasables .wrap (byteStreamOutput , onAfter , r )
256+ : Releasables .wrap (byteStreamOutput , onAfter )
257+ )
258+ );
259+ }
260+
261+ // public for tests
262+ public static BytesReference serialize (
263+ @ Nullable String requestAction ,
264+ long requestId ,
265+ boolean isHandshake ,
266+ TransportVersion version ,
267+ boolean isError ,
268+ Compression .Scheme compressionScheme ,
269+ Writeable writeable ,
270+ ThreadContext threadContext ,
271+ RecyclerBytesStreamOutput byteStreamOutput
272+ ) throws IOException {
273+ assert byteStreamOutput .position () == 0 ;
274+ byteStreamOutput .setTransportVersion (version );
275+ byteStreamOutput .skip (TcpHeader .HEADER_SIZE );
276+ threadContext .writeTo (byteStreamOutput );
277+ if (requestAction != null ) {
278+ if (version .before (TransportVersions .V_8_0_0 )) {
279+ // empty features array
280+ byteStreamOutput .writeStringArray (Strings .EMPTY_ARRAY );
281+ }
282+ byteStreamOutput .writeString (requestAction );
283+ }
284+
285+ final int variableHeaderLength = Math .toIntExact (byteStreamOutput .position () - TcpHeader .HEADER_SIZE );
286+ BytesReference message = serializeMessageBody (writeable , compressionScheme , version , byteStreamOutput );
287+ byte status = 0 ;
288+ if (requestAction == null ) {
289+ status = TransportStatus .setResponse (status );
290+ }
291+ if (isHandshake ) {
292+ status = TransportStatus .setHandshake (status );
293+ }
294+ if (isError ) {
295+ status = TransportStatus .setError (status );
296+ }
297+ if (compressionScheme != null ) {
298+ status = TransportStatus .setCompress (status );
299+ }
300+ byteStreamOutput .seek (0 );
301+ TcpHeader .writeHeader (byteStreamOutput , requestId , status , version , message .length () - TcpHeader .HEADER_SIZE , variableHeaderLength );
302+ return message ;
303+ }
304+
305+ private static BytesReference serializeMessageBody (
306+ Writeable writeable ,
307+ Compression .Scheme compressionScheme ,
308+ TransportVersion version ,
309+ RecyclerBytesStreamOutput byteStreamOutput
310+ ) throws IOException {
311+ // The compressible bytes stream will not close the underlying bytes stream
312+ final StreamOutput stream = compressionScheme != null ? wrapCompressed (compressionScheme , byteStreamOutput ) : byteStreamOutput ;
313+ final ReleasableBytesReference zeroCopyBuffer ;
314+ try {
315+ stream .setTransportVersion (version );
316+ if (writeable instanceof BytesTransportRequest bRequest ) {
317+ bRequest .writeThin (stream );
318+ zeroCopyBuffer = bRequest .bytes ;
319+ } else if (writeable instanceof RemoteTransportException remoteTransportException ) {
320+ stream .writeException (remoteTransportException );
321+ zeroCopyBuffer = ReleasableBytesReference .empty ();
322+ } else {
323+ writeable .writeTo (stream );
324+ zeroCopyBuffer = ReleasableBytesReference .empty ();
325+ }
326+ } finally {
327+ // We have to close here before accessing the bytes when using compression to ensure that some marker bytes (EOS marker)
328+ // are written.
329+ if (compressionScheme != null ) {
330+ stream .close ();
331+ }
332+ }
333+ final BytesReference msg = byteStreamOutput .bytes ();
334+ if (zeroCopyBuffer .length () == 0 ) {
335+ return msg ;
336+ }
337+ zeroCopyBuffer .mustIncRef ();
338+ return new ReleasableBytesReference (CompositeBytesReference .of (msg , zeroCopyBuffer ), (RefCounted ) zeroCopyBuffer );
339+ }
340+
341+ // compressed stream wrapped bytes must be no-close wrapped since we need to close the compressed wrapper below to release
342+ // resources and write EOS marker bytes but must not yet release the bytes themselves
343+ private static StreamOutput wrapCompressed (Compression .Scheme compressionScheme , RecyclerBytesStreamOutput bytesStream )
344+ throws IOException {
345+ if (compressionScheme == Compression .Scheme .DEFLATE ) {
346+ return new OutputStreamStreamOutput (
347+ CompressorFactory .COMPRESSOR .threadLocalOutputStream (org .elasticsearch .core .Streams .noCloseStream (bytesStream ))
348+ );
349+ } else if (compressionScheme == Compression .Scheme .LZ4 ) {
350+ return new OutputStreamStreamOutput (Compression .Scheme .lz4OutputStream (Streams .noCloseStream (bytesStream )));
351+ } else {
352+ throw new IllegalArgumentException ("Invalid compression scheme: " + compressionScheme );
353+ }
239354 }
240355
241356 private void internalSend (
242357 TcpChannel channel ,
243358 BytesReference reference ,
244- @ Nullable OutboundMessage message ,
359+ Supplier < String > messageDescription ,
245360 ActionListener <Void > listener
246361 ) {
247362 final long startTime = threadPool .rawRelativeTimeInMillis ();
@@ -281,7 +396,7 @@ private void maybeLogSlowMessage(boolean success) {
281396 logger .warn (
282397 "sending transport message [{}] of size [{}] on [{}] took [{}ms] which is above the warn "
283398 + "threshold of [{}ms] with success [{}]" ,
284- message ,
399+ messageDescription . get () ,
285400 messageSize ,
286401 channel ,
287402 took ,
0 commit comments