3131import org .elasticsearch .common .recycler .Recycler ;
3232import org .elasticsearch .common .transport .NetworkExceptionHelper ;
3333import org .elasticsearch .common .util .concurrent .ThreadContext ;
34- import org .elasticsearch .core .Nullable ;
3534import org .elasticsearch .core .RefCounted ;
3635import org .elasticsearch .core .Releasable ;
3736import org .elasticsearch .core .Releasables ;
@@ -116,6 +115,7 @@ void sendRequest(
116115 assert assertValidTransportVersion (transportVersion );
117116 sendMessage (
118117 channel ,
118+ MessageDirection .REQUEST ,
119119 action ,
120120 request ,
121121 requestId ,
@@ -148,7 +148,8 @@ void sendResponse(
148148 try {
149149 sendMessage (
150150 channel ,
151- null ,
151+ MessageDirection .RESPONSE ,
152+ action ,
152153 response ,
153154 requestId ,
154155 isHandshake ,
@@ -190,7 +191,8 @@ void sendErrorResponse(
190191 try {
191192 sendMessage (
192193 channel ,
193- null ,
194+ MessageDirection .RESPONSE_ERROR ,
195+ action ,
194196 msg ,
195197 requestId ,
196198 false ,
@@ -206,29 +208,36 @@ void sendErrorResponse(
206208 }
207209 }
208210
211+ public enum MessageDirection {
212+ REQUEST ,
213+ RESPONSE ,
214+ RESPONSE_ERROR
215+ }
216+
209217 private void sendMessage (
210218 TcpChannel channel ,
211- @ Nullable String requestAction ,
219+ MessageDirection messageDirection ,
220+ String action ,
212221 Writeable writeable ,
213222 long requestId ,
214223 boolean isHandshake ,
215- Compression .Scheme compressionScheme ,
224+ Compression .Scheme possibleCompressionScheme ,
216225 TransportVersion version ,
217226 ResponseStatsConsumer responseStatsConsumer ,
218227 Releasable onAfter
219228 ) throws IOException {
220- compressionScheme = writeable instanceof BytesTransportRequest ? null : compressionScheme ;
229+ assert action != null ;
230+ final var compressionScheme = writeable instanceof BytesTransportRequest ? null : possibleCompressionScheme ;
221231 final BytesReference message ;
222232 boolean serializeSuccess = false ;
223- final boolean isError = writeable instanceof RemoteTransportException ;
224233 final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput (recycler );
225234 try {
226235 message = serialize (
227- requestAction ,
236+ messageDirection ,
237+ action ,
228238 requestId ,
229239 isHandshake ,
230240 version ,
231- isError ,
232241 compressionScheme ,
233242 writeable ,
234243 threadPool .getThreadContext (),
@@ -244,14 +253,23 @@ private void sendMessage(
244253 }
245254 }
246255 responseStatsConsumer .addResponseStats (message .length ());
247- final var responseType = writeable .getClass ();
248- final boolean compress = compressionScheme != null ;
256+ final var messageType = writeable .getClass ();
249257 internalSend (
250258 channel ,
251259 message ,
252- requestAction == null
253- ? () -> "Response{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}{" + responseType + "}"
254- : () -> "Request{" + requestAction + "}{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}" ,
260+ () -> (messageDirection == MessageDirection .REQUEST ? "Request{" : "Response{" )
261+ + action
262+ + "}{id="
263+ + requestId
264+ + "}{err="
265+ + (messageDirection == MessageDirection .RESPONSE_ERROR )
266+ + "}{cs="
267+ + compressionScheme
268+ + "}{hs="
269+ + isHandshake
270+ + "}{t="
271+ + messageType
272+ + "}" ,
255273 ActionListener .releasing (
256274 message instanceof ReleasableBytesReference r
257275 ? Releasables .wrap (byteStreamOutput , onAfter , r )
@@ -262,38 +280,39 @@ private void sendMessage(
262280
263281 // public for tests
264282 public static BytesReference serialize (
265- @ Nullable String requestAction ,
283+ MessageDirection messageDirection ,
284+ String action ,
266285 long requestId ,
267286 boolean isHandshake ,
268287 TransportVersion version ,
269- boolean isError ,
270288 Compression .Scheme compressionScheme ,
271289 Writeable writeable ,
272290 ThreadContext threadContext ,
273291 RecyclerBytesStreamOutput byteStreamOutput
274292 ) throws IOException {
293+ assert action != null ;
275294 assert byteStreamOutput .position () == 0 ;
276295 byteStreamOutput .setTransportVersion (version );
277296 byteStreamOutput .skip (TcpHeader .HEADER_SIZE );
278297 threadContext .writeTo (byteStreamOutput );
279- if (requestAction != null ) {
298+ if (messageDirection == MessageDirection . REQUEST ) {
280299 if (version .before (TransportVersions .V_8_0_0 )) {
281300 // empty features array
282301 byteStreamOutput .writeStringArray (Strings .EMPTY_ARRAY );
283302 }
284- byteStreamOutput .writeString (requestAction );
303+ byteStreamOutput .writeString (action );
285304 }
286305
287306 final int variableHeaderLength = Math .toIntExact (byteStreamOutput .position () - TcpHeader .HEADER_SIZE );
288307 BytesReference message = serializeMessageBody (writeable , compressionScheme , version , byteStreamOutput );
289308 byte status = 0 ;
290- if (requestAction == null ) {
309+ if (messageDirection != MessageDirection . REQUEST ) {
291310 status = TransportStatus .setResponse (status );
292311 }
293312 if (isHandshake ) {
294313 status = TransportStatus .setHandshake (status );
295314 }
296- if (isError ) {
315+ if (messageDirection == MessageDirection . RESPONSE_ERROR ) {
297316 status = TransportStatus .setError (status );
298317 }
299318 if (compressionScheme != null ) {
@@ -316,6 +335,8 @@ private static BytesReference serializeMessageBody(
316335 try {
317336 stream .setTransportVersion (version );
318337 if (writeable instanceof BytesTransportRequest bRequest ) {
338+ assert stream == byteStreamOutput ;
339+ assert compressionScheme == null ;
319340 bRequest .writeThin (stream );
320341 zeroCopyBuffer = bRequest .bytes ;
321342 } else if (writeable instanceof RemoteTransportException remoteTransportException ) {
0 commit comments