Skip to content

Commit 7577951

Browse files
Fix NullPointerException in transport trace logger (#132243)
When trace-level logging is enabled, a node might disconnect from cluster due to an NPE that causes the transport connection closed between the data node and the master node. InboundMessages printed by `TransportLogger` might throw an NPE in the format function because `content` might be NULL if another node sends an abnormal exception response. Also there's no good reason to close the connection because of a logging exception, so with this commit we catch all exceptions (rather than just `IOException`)
1 parent e9f3239 commit 7577951

File tree

3 files changed

+44
-41
lines changed

3 files changed

+44
-41
lines changed

docs/changelog/132243.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 132243
2+
summary: Fix `NullPointerException` in transport trace logger
3+
area: Network
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/transport/TransportLogger.java

Lines changed: 17 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.TransportVersion;
1414
import org.elasticsearch.TransportVersions;
1515
import org.elasticsearch.common.bytes.BytesReference;
16-
import org.elasticsearch.common.compress.CompressorFactory;
1716
import org.elasticsearch.common.io.stream.StreamInput;
1817
import org.elasticsearch.common.util.concurrent.ThreadContext;
1918
import org.elasticsearch.core.IOUtils;
@@ -30,7 +29,7 @@ static void logInboundMessage(TcpChannel channel, BytesReference message) {
3029
try {
3130
String logMessage = format(channel, message, "READ");
3231
logger.trace(logMessage);
33-
} catch (IOException e) {
32+
} catch (Exception e) {
3433
logger.warn("an exception occurred formatting a READ trace message", e);
3534
}
3635
}
@@ -41,7 +40,7 @@ static void logInboundMessage(TcpChannel channel, InboundMessage message) {
4140
try {
4241
String logMessage = format(channel, message, "READ");
4342
logger.trace(logMessage);
44-
} catch (IOException e) {
43+
} catch (Exception e) {
4544
logger.warn("an exception occurred formatting a READ trace message", e);
4645
}
4746
}
@@ -57,7 +56,7 @@ static void logOutboundMessage(TcpChannel channel, BytesReference message) {
5756
BytesReference withoutHeader = message.slice(HEADER_SIZE, message.length() - HEADER_SIZE);
5857
String logMessage = format(channel, withoutHeader, "WRITE");
5958
logger.trace(logMessage);
60-
} catch (IOException e) {
59+
} catch (Exception e) {
6160
logger.warn("an exception occurred formatting a WRITE trace message", e);
6261
}
6362
}
@@ -111,55 +110,32 @@ private static String format(TcpChannel channel, BytesReference message, String
111110
return sb.toString();
112111
}
113112

114-
private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException {
113+
private static String format(TcpChannel channel, InboundMessage message, String event) {
115114
final StringBuilder sb = new StringBuilder();
116115
sb.append(channel);
117116

118117
if (message.isPing()) {
119118
sb.append(" [ping]").append(' ').append(event).append(": ").append(6).append('B');
120119
} else {
121-
boolean success = false;
122120
Header header = message.getHeader();
123121
int networkMessageSize = header.getNetworkMessageSize();
124122
int messageLengthWithHeader = HEADER_SIZE + networkMessageSize;
125-
StreamInput streamInput = message.openOrGetStreamInput();
126-
try {
127-
final long requestId = header.getRequestId();
128-
final boolean isRequest = header.isRequest();
129-
final String type = isRequest ? "request" : "response";
130-
final String version = header.getVersion().toString();
131-
sb.append(" [length: ").append(messageLengthWithHeader);
132-
sb.append(", request id: ").append(requestId);
133-
sb.append(", type: ").append(type);
134-
sb.append(", version: ").append(version);
123+
final long requestId = header.getRequestId();
124+
final boolean isRequest = header.isRequest();
125+
final String type = isRequest ? "request" : "response";
126+
final String version = header.getVersion().toString();
127+
sb.append(" [length: ").append(messageLengthWithHeader);
128+
sb.append(", request id: ").append(requestId);
129+
sb.append(", type: ").append(type);
130+
sb.append(", version: ").append(version);
135131

136-
// TODO: Maybe Fix for BWC
137-
if (header.needsToReadVariableHeader() == false && isRequest) {
138-
sb.append(", action: ").append(header.getActionName());
139-
}
140-
sb.append(']');
141-
sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
142-
success = true;
143-
} finally {
144-
if (success) {
145-
IOUtils.close(streamInput);
146-
} else {
147-
IOUtils.closeWhileHandlingException(streamInput);
148-
}
132+
// TODO: Maybe Fix for BWC
133+
if (header.needsToReadVariableHeader() == false && isRequest) {
134+
sb.append(", action: ").append(header.getActionName());
149135
}
136+
sb.append(']');
137+
sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
150138
}
151139
return sb.toString();
152140
}
153-
154-
private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException {
155-
if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
156-
try {
157-
return CompressorFactory.COMPRESSOR.threadLocalStreamInput(streamInput);
158-
} catch (IllegalArgumentException e) {
159-
throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
160-
}
161-
} else {
162-
return streamInput;
163-
}
164-
}
165141
}

server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,28 @@ public void testLoggingHandler() throws IOException {
6666
}
6767
}
6868

69+
public void testLoggingHandlerWithExceptionMessage() {
70+
final String readPattern = ".*\\[length: \\d+" + ", request id: \\d+" + ", type: request" + ", version: .*" + " READ: \\d+B";
71+
72+
final MockLog.LoggingExpectation readExpectation = new MockLog.PatternSeenEventExpectation(
73+
"spatial stats request",
74+
TransportLogger.class.getCanonicalName(),
75+
Level.TRACE,
76+
readPattern
77+
);
78+
79+
InboundMessage inboundMessage = new InboundMessage(
80+
new Header(0, 0, TransportStatus.setRequest((byte) 0), TransportVersion.current()),
81+
new ActionNotFoundTransportException("cluster:monitor/xpack/spatial/stats")
82+
);
83+
84+
try (var mockLog = MockLog.capture(TransportLogger.class)) {
85+
mockLog.addExpectation(readExpectation);
86+
TransportLogger.logInboundMessage(mock(TcpChannel.class), inboundMessage);
87+
mockLog.assertAllExpectationsMatched();
88+
}
89+
}
90+
6991
private BytesReference buildRequest() throws IOException {
7092
BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE);
7193
Compression.Scheme compress = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null);

0 commit comments

Comments
 (0)