From 26b9464960ee24039f9603b4dfc5afb6ecf2447c Mon Sep 17 00:00:00 2001 From: danielhuang Date: Thu, 31 Jul 2025 15:45:22 +0800 Subject: [PATCH 1/5] Fix transport logger trace level log NPE cause node-left issue. --- .../java/org/elasticsearch/transport/TransportLogger.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportLogger.java b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java index 6cad97acab7ba..a375146f96ece 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportLogger.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java @@ -30,7 +30,7 @@ static void logInboundMessage(TcpChannel channel, BytesReference message) { try { String logMessage = format(channel, message, "READ"); logger.trace(logMessage); - } catch (IOException e) { + } catch (Exception e) { logger.warn("an exception occurred formatting a READ trace message", e); } } @@ -41,7 +41,7 @@ static void logInboundMessage(TcpChannel channel, InboundMessage message) { try { String logMessage = format(channel, message, "READ"); logger.trace(logMessage); - } catch (IOException e) { + } catch (Exception e) { logger.warn("an exception occurred formatting a READ trace message", e); } } @@ -57,7 +57,7 @@ static void logOutboundMessage(TcpChannel channel, BytesReference message) { BytesReference withoutHeader = message.slice(HEADER_SIZE, message.length() - HEADER_SIZE); String logMessage = format(channel, withoutHeader, "WRITE"); logger.trace(logMessage); - } catch (IOException e) { + } catch (Exception e) { logger.warn("an exception occurred formatting a WRITE trace message", e); } } From 557c6f0b85472d2e2c42ad8351d57a4983dee9f0 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Thu, 31 Jul 2025 16:31:27 +0800 Subject: [PATCH 2/5] Remove unsed `openOrGetStreamInput()` and CompressorFactory` --- .../transport/TransportLogger.java | 52 +++++-------------- 1 file changed, 14 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportLogger.java b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java index a375146f96ece..17338289a15fa 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportLogger.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java @@ -13,7 +13,6 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.IOUtils; @@ -111,55 +110,32 @@ private static String format(TcpChannel channel, BytesReference message, String return sb.toString(); } - private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException { + private static String format(TcpChannel channel, InboundMessage message, String event) { final StringBuilder sb = new StringBuilder(); sb.append(channel); if (message.isPing()) { sb.append(" [ping]").append(' ').append(event).append(": ").append(6).append('B'); } else { - boolean success = false; Header header = message.getHeader(); int networkMessageSize = header.getNetworkMessageSize(); int messageLengthWithHeader = HEADER_SIZE + networkMessageSize; - StreamInput streamInput = message.openOrGetStreamInput(); - try { - final long requestId = header.getRequestId(); - final boolean isRequest = header.isRequest(); - final String type = isRequest ? "request" : "response"; - final String version = header.getVersion().toString(); - sb.append(" [length: ").append(messageLengthWithHeader); - sb.append(", request id: ").append(requestId); - sb.append(", type: ").append(type); - sb.append(", version: ").append(version); + final long requestId = header.getRequestId(); + final boolean isRequest = header.isRequest(); + final String type = isRequest ? "request" : "response"; + final String version = header.getVersion().toString(); + sb.append(" [length: ").append(messageLengthWithHeader); + sb.append(", request id: ").append(requestId); + sb.append(", type: ").append(type); + sb.append(", version: ").append(version); - // TODO: Maybe Fix for BWC - if (header.needsToReadVariableHeader() == false && isRequest) { - sb.append(", action: ").append(header.getActionName()); - } - sb.append(']'); - sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B'); - success = true; - } finally { - if (success) { - IOUtils.close(streamInput); - } else { - IOUtils.closeWhileHandlingException(streamInput); - } + // TODO: Maybe Fix for BWC + if (header.needsToReadVariableHeader() == false && isRequest) { + sb.append(", action: ").append(header.getActionName()); } + sb.append(']'); + sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B'); } return sb.toString(); } - - private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException { - if (TransportStatus.isCompress(status) && streamInput.available() > 0) { - try { - return CompressorFactory.COMPRESSOR.threadLocalStreamInput(streamInput); - } catch (IllegalArgumentException e) { - throw new IllegalStateException("stream marked as compressed, but is missing deflate header"); - } - } else { - return streamInput; - } - } } From a3e9bc7e58d7ebb64d04e2b8e779841176686868 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Thu, 31 Jul 2025 20:06:12 +0800 Subject: [PATCH 3/5] Add test case --- .../transport/TransportLoggerTests.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java index a361f08009955..4fb841c74f124 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java @@ -66,6 +66,34 @@ public void testLoggingHandler() throws IOException { } } + public void testLoggingHandlerWithExceptionMessage() { + final String readPattern = ".*\\[length: \\d+" + + ", request id: \\d+" + + ", type: request" + + ", version: .*" + + " READ: \\d+B"; + + final MockLog.LoggingExpectation readExpectation = new MockLog.PatternSeenEventExpectation( + "spatial stats request", + TransportLogger.class.getCanonicalName(), + Level.TRACE, + readPattern + ); + + InboundMessage inboundMessage = new InboundMessage(new Header( + 0, + 0, + TransportStatus.setRequest((byte) 0), + TransportVersion.current() + ), new ActionNotFoundTransportException("cluster:monitor/xpack/spatial/stats")); + + try (var mockLog = MockLog.capture(TransportLogger.class)) { + mockLog.addExpectation(readExpectation); + TransportLogger.logInboundMessage(mock(TcpChannel.class), inboundMessage); + mockLog.assertAllExpectationsMatched(); + } + } + private BytesReference buildRequest() throws IOException { BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE); Compression.Scheme compress = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null); From d1824fb075b7f11817241c0e8eaf1dfdd7ad10ff Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 31 Jul 2025 13:13:06 +0100 Subject: [PATCH 4/5] Add changelog --- docs/changelog/132243.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/132243.yaml diff --git a/docs/changelog/132243.yaml b/docs/changelog/132243.yaml new file mode 100644 index 0000000000000..9b83f92532c57 --- /dev/null +++ b/docs/changelog/132243.yaml @@ -0,0 +1,5 @@ +pr: 132243 +summary: Fix `NullPointerException` in transport trace logger +area: Network +type: bug +issues: [] From c7ce931c4e54d1c5f2830d4a964198a1447a4f7e Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 15 Aug 2025 10:11:44 +0000 Subject: [PATCH 5/5] [CI] Auto commit changes from spotless --- .../transport/TransportLoggerTests.java | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java index 4fb841c74f124..55bd3c805c7c8 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java @@ -67,11 +67,7 @@ public void testLoggingHandler() throws IOException { } public void testLoggingHandlerWithExceptionMessage() { - final String readPattern = ".*\\[length: \\d+" - + ", request id: \\d+" - + ", type: request" - + ", version: .*" - + " READ: \\d+B"; + final String readPattern = ".*\\[length: \\d+" + ", request id: \\d+" + ", type: request" + ", version: .*" + " READ: \\d+B"; final MockLog.LoggingExpectation readExpectation = new MockLog.PatternSeenEventExpectation( "spatial stats request", @@ -80,12 +76,10 @@ public void testLoggingHandlerWithExceptionMessage() { readPattern ); - InboundMessage inboundMessage = new InboundMessage(new Header( - 0, - 0, - TransportStatus.setRequest((byte) 0), - TransportVersion.current() - ), new ActionNotFoundTransportException("cluster:monitor/xpack/spatial/stats")); + InboundMessage inboundMessage = new InboundMessage( + new Header(0, 0, TransportStatus.setRequest((byte) 0), TransportVersion.current()), + new ActionNotFoundTransportException("cluster:monitor/xpack/spatial/stats") + ); try (var mockLog = MockLog.capture(TransportLogger.class)) { mockLog.addExpectation(readExpectation);