Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.IOUtils;
Expand All @@ -30,7 +29,7 @@ static void logInboundMessage(TcpChannel channel, BytesReference message) {
try {
String logMessage = format(channel, message, "READ");
logger.trace(logMessage);
} catch (IOException e) {
} catch (Exception e) {
logger.warn("an exception occurred formatting a READ trace message", e);
}
}
Expand All @@ -41,7 +40,7 @@ static void logInboundMessage(TcpChannel channel, InboundMessage message) {
try {
String logMessage = format(channel, message, "READ");
logger.trace(logMessage);
} catch (IOException e) {
} catch (Exception e) {
logger.warn("an exception occurred formatting a READ trace message", e);
}
}
Expand All @@ -57,7 +56,7 @@ static void logOutboundMessage(TcpChannel channel, BytesReference message) {
BytesReference withoutHeader = message.slice(HEADER_SIZE, message.length() - HEADER_SIZE);
String logMessage = format(channel, withoutHeader, "WRITE");
logger.trace(logMessage);
} catch (IOException e) {
} catch (Exception e) {
logger.warn("an exception occurred formatting a WRITE trace message", e);
}
}
Expand Down Expand Up @@ -111,55 +110,32 @@ private static String format(TcpChannel channel, BytesReference message, String
return sb.toString();
}

private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException {
private static String format(TcpChannel channel, InboundMessage message, String event) {
final StringBuilder sb = new StringBuilder();
sb.append(channel);

if (message.isPing()) {
sb.append(" [ping]").append(' ').append(event).append(": ").append(6).append('B');
} else {
boolean success = false;
Header header = message.getHeader();
int networkMessageSize = header.getNetworkMessageSize();
int messageLengthWithHeader = HEADER_SIZE + networkMessageSize;
StreamInput streamInput = message.openOrGetStreamInput();
try {
final long requestId = header.getRequestId();
final boolean isRequest = header.isRequest();
final String type = isRequest ? "request" : "response";
final String version = header.getVersion().toString();
sb.append(" [length: ").append(messageLengthWithHeader);
sb.append(", request id: ").append(requestId);
sb.append(", type: ").append(type);
sb.append(", version: ").append(version);
final long requestId = header.getRequestId();
final boolean isRequest = header.isRequest();
final String type = isRequest ? "request" : "response";
final String version = header.getVersion().toString();
sb.append(" [length: ").append(messageLengthWithHeader);
sb.append(", request id: ").append(requestId);
sb.append(", type: ").append(type);
sb.append(", version: ").append(version);

// TODO: Maybe Fix for BWC
if (header.needsToReadVariableHeader() == false && isRequest) {
sb.append(", action: ").append(header.getActionName());
}
sb.append(']');
sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
success = true;
} finally {
if (success) {
IOUtils.close(streamInput);
} else {
IOUtils.closeWhileHandlingException(streamInput);
}
// TODO: Maybe Fix for BWC
if (header.needsToReadVariableHeader() == false && isRequest) {
sb.append(", action: ").append(header.getActionName());
}
sb.append(']');
sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
}
return sb.toString();
}

private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException {
if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
try {
return CompressorFactory.COMPRESSOR.threadLocalStreamInput(streamInput);
} catch (IllegalArgumentException e) {
throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
}
} else {
return streamInput;
}
}
}