@@ -86,6 +86,11 @@ void setSlowLogThreshold(TimeValue slowLogThreshold) {
8686 this .slowLogThresholdMs = slowLogThreshold .getMillis ();
8787 }
8888
89+ /**
90+ * @param message the transport message received, guaranteed to be closed by this method if it returns without exception.
91+ * Callers must ensure that {@code message} is closed if this method throws an exception but must not release
92+ * the message themselves otherwise
93+ */
8994 void inboundMessage (TcpChannel channel , InboundMessage message ) throws Exception {
9095 final long startTime = threadPool .rawRelativeTimeInMillis ();
9196 channel .getChannelStats ().markAccessed (startTime );
@@ -101,6 +106,11 @@ void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception
101106 // Empty stream constant to avoid instantiating a new stream for empty messages.
102107 private static final StreamInput EMPTY_STREAM_INPUT = new ByteBufferStreamInput (ByteBuffer .wrap (BytesRef .EMPTY_BYTES ));
103108
109+ /**
110+ * @param message the transport message received, guaranteed to be closed by this method if it returns without exception.
111+ * Callers must ensure that {@code message} is closed if this method throws an exception but must not release
112+ * the message themselves otherwise
113+ */
104114 private void messageReceived (TcpChannel channel , InboundMessage message , long startTime ) throws IOException {
105115 final InetSocketAddress remoteAddress = channel .getRemoteAddress ();
106116 final Header header = message .getHeader ();
@@ -136,6 +146,11 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
136146 }
137147 }
138148
149+ /**
150+ * @param message the transport message received, guaranteed to be closed by this method if it returns without exception.
151+ * Callers must ensure that {@code message} is closed if this method throws an exception but must not release
152+ * the message themselves otherwise
153+ */
139154 private void executeResponseHandler (
140155 InboundMessage message ,
141156 TransportResponseHandler <?> responseHandler ,
@@ -221,6 +236,11 @@ private void verifyResponseReadFully(Header header, TransportResponseHandler<?>
221236 }
222237 }
223238
239+ /**
240+ * @param message the transport message received, guaranteed to be closed by this method if it returns without exception.
241+ * Callers must ensure that {@code message} is closed if this method throws an exception but must not release
242+ * the message themselves otherwise
243+ */
224244 private <T extends TransportRequest > void handleRequest (TcpChannel channel , InboundMessage message ) throws IOException {
225245 final Header header = message .getHeader ();
226246 if (header .isHandshake ()) {
@@ -333,6 +353,9 @@ public void onAfter() {
333353 }
334354 }
335355
356+ /**
357+ * @param message guaranteed to get closed by this method
358+ */
336359 private void handleHandshakeRequest (TcpChannel channel , InboundMessage message ) throws IOException {
337360 var header = message .getHeader ();
338361 assert header .actionName .equals (TransportHandshaker .HANDSHAKE_ACTION_NAME );
@@ -373,27 +396,30 @@ private static void sendErrorResponse(String actionName, TransportChannel transp
373396 }
374397 }
375398
399+ /**
400+ * @param message guaranteed to get closed by this method
401+ */
376402 private <T extends TransportResponse > void handleResponse (
377403 InetSocketAddress remoteAddress ,
378404 final StreamInput stream ,
379405 final TransportResponseHandler <T > handler ,
380- final InboundMessage inboundMessage
406+ final InboundMessage message
381407 ) {
382408 final var executor = handler .executor ();
383409 if (executor == EsExecutors .DIRECT_EXECUTOR_SERVICE ) {
384410 // no need to provide a buffer release here, we never escape the buffer when handling directly
385- doHandleResponse (handler , remoteAddress , stream , inboundMessage );
411+ doHandleResponse (handler , remoteAddress , stream , message );
386412 } else {
387413 // release buffer once we deserialize the message, but have a fail-safe in #onAfter below in case that didn't work out
388414 executor .execute (new ForkingResponseHandlerRunnable (handler , null ) {
389415 @ Override
390416 protected void doRun () {
391- doHandleResponse (handler , remoteAddress , stream , inboundMessage );
417+ doHandleResponse (handler , remoteAddress , stream , message );
392418 }
393419
394420 @ Override
395421 public void onAfter () {
396- inboundMessage .close ();
422+ message .close ();
397423 }
398424 });
399425 }
@@ -404,7 +430,7 @@ public void onAfter() {
404430 * @param handler response handler
405431 * @param remoteAddress remote address that the message was sent from
406432 * @param stream bytes stream for reading the message
407- * @param inboundMessage inbound message
433+ * @param inboundMessage inbound message, guaranteed to get closed by this method
408434 * @param <T> response message type
409435 */
410436 private <T extends TransportResponse > void doHandleResponse (
@@ -436,6 +462,9 @@ private <T extends TransportResponse> void doHandleResponse(
436462 }
437463 }
438464
465+ /**
466+ * @param message guaranteed to get closed by this method
467+ */
439468 private void handlerResponseError (StreamInput stream , InboundMessage message , final TransportResponseHandler <?> handler ) {
440469 Exception error ;
441470 try (message ) {
0 commit comments