diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 06e8d3177a4ca..764ca018490f5 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -57,7 +57,6 @@ static TransportVersion def(int id) { public static final TransportVersion V_7_3_0 = def(7_03_00_99); public static final TransportVersion V_7_3_2 = def(7_03_02_99); public static final TransportVersion V_7_4_0 = def(7_04_00_99); - public static final TransportVersion V_7_6_0 = def(7_06_00_99); public static final TransportVersion V_7_8_0 = def(7_08_00_99); public static final TransportVersion V_7_8_1 = def(7_08_01_99); public static final TransportVersion V_7_9_0 = def(7_09_00_99); diff --git a/server/src/main/java/org/elasticsearch/transport/InboundDecoder.java b/server/src/main/java/org/elasticsearch/transport/InboundDecoder.java index fdcbd6912bc4c..a22b0d0229ed0 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundDecoder.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundDecoder.java @@ -167,23 +167,17 @@ private static int headerBytesToRead(BytesReference reference, ByteSizeValue max return 0; } - TransportVersion remoteVersion = TransportVersion.fromId(reference.getInt(TcpHeader.VERSION_POSITION)); - int fixedHeaderSize = TcpHeader.headerSize(remoteVersion); - if (fixedHeaderSize > reference.length()) { + if (reference.length() <= TcpHeader.HEADER_SIZE) { return 0; - } else if (remoteVersion.before(TcpHeader.VERSION_WITH_HEADER_SIZE)) { - return fixedHeaderSize; } else { int variableHeaderSize = reference.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION); if (variableHeaderSize < 0) { throw new StreamCorruptedException("invalid negative variable header size: " + variableHeaderSize); } - if (variableHeaderSize > maxHeaderSize.getBytes() - fixedHeaderSize) { - throw new StreamCorruptedException( - "header size [" + (fixedHeaderSize + variableHeaderSize) + "] exceeds limit of [" + maxHeaderSize + "]" - ); + int totalHeaderSize = TcpHeader.HEADER_SIZE + variableHeaderSize; + if (totalHeaderSize > maxHeaderSize.getBytes()) { + throw new StreamCorruptedException("header size [" + totalHeaderSize + "] exceeds limit of [" + maxHeaderSize + "]"); } - int totalHeaderSize = fixedHeaderSize + variableHeaderSize; if (totalHeaderSize > reference.length()) { return 0; } else { @@ -211,11 +205,9 @@ private static Header readHeader(int networkMessageSize, BytesReference bytesRef checkVersionCompatibility(header.getVersion()); } - if (header.getVersion().onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) { - // Skip since we already have ensured enough data available - streamInput.readInt(); - header.finishParsingHeader(streamInput); - } + // Skip since we already have ensured enough data available + streamInput.readInt(); + header.finishParsingHeader(streamInput); return header; } } diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java b/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java index c3b405aebe2a2..798385edefd6f 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java @@ -43,26 +43,20 @@ abstract class OutboundMessage extends NetworkMessage { BytesReference serialize(RecyclerBytesStreamOutput bytesStream) throws IOException { bytesStream.setTransportVersion(version); - bytesStream.skip(TcpHeader.headerSize(version)); + bytesStream.skip(TcpHeader.HEADER_SIZE); // The compressible bytes stream will not close the underlying bytes stream BytesReference reference; - int variableHeaderLength = -1; final long preHeaderPosition = bytesStream.position(); - if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) { - writeVariableHeader(bytesStream); - variableHeaderLength = Math.toIntExact(bytesStream.position() - preHeaderPosition); - } + writeVariableHeader(bytesStream); + int variableHeaderLength = Math.toIntExact(bytesStream.position() - preHeaderPosition); final boolean compress = TransportStatus.isCompress(status); final StreamOutput stream = compress ? wrapCompressed(bytesStream) : bytesStream; final ReleasableBytesReference zeroCopyBuffer; try { stream.setTransportVersion(version); - if (variableHeaderLength == -1) { - writeVariableHeader(stream); - } if (message instanceof BytesTransportRequest bRequest) { bRequest.writeThin(stream); zeroCopyBuffer = bRequest.bytes; @@ -89,7 +83,7 @@ BytesReference serialize(RecyclerBytesStreamOutput bytesStream) throws IOExcepti } bytesStream.seek(0); - final int contentSize = reference.length() - TcpHeader.headerSize(version); + final int contentSize = reference.length() - TcpHeader.HEADER_SIZE; TcpHeader.writeHeader(bytesStream, requestId, status, version, contentSize, variableHeaderLength); return reference; } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpHeader.java b/server/src/main/java/org/elasticsearch/transport/TcpHeader.java index 22f5a69aa08f6..d6948b50790da 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpHeader.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpHeader.java @@ -10,15 +10,12 @@ package org.elasticsearch.transport; import org.elasticsearch.TransportVersion; -import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; public class TcpHeader { - public static final TransportVersion VERSION_WITH_HEADER_SIZE = TransportVersions.V_7_6_0; - public static final int MARKER_BYTES_SIZE = 2; public static final int MESSAGE_LENGTH_SIZE = 4; @@ -37,19 +34,9 @@ public class TcpHeader { public static final int VARIABLE_HEADER_SIZE_POSITION = VERSION_POSITION + VERSION_ID_SIZE; - private static final int PRE_76_HEADER_SIZE = VERSION_POSITION + VERSION_ID_SIZE; - - public static final int BYTES_REQUIRED_FOR_VERSION = PRE_76_HEADER_SIZE; + public static final int BYTES_REQUIRED_FOR_VERSION = VERSION_POSITION + VERSION_ID_SIZE; - private static final int HEADER_SIZE = PRE_76_HEADER_SIZE + VARIABLE_HEADER_SIZE; - - public static int headerSize(TransportVersion version) { - if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) { - return HEADER_SIZE; - } else { - return PRE_76_HEADER_SIZE; - } - } + public static final int HEADER_SIZE = BYTES_REQUIRED_FOR_VERSION + VARIABLE_HEADER_SIZE; private static final byte[] PREFIX = { (byte) 'E', (byte) 'S' }; @@ -63,17 +50,10 @@ public static void writeHeader( ) throws IOException { output.writeBytes(PREFIX); // write the size, the size indicates the remaining message size, not including the size int - if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) { - output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE + VARIABLE_HEADER_SIZE); - } else { - output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE); - } + output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE + VARIABLE_HEADER_SIZE); output.writeLong(requestId); output.writeByte(status); output.writeInt(version.id()); - if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) { - assert variableHeaderSize != -1 : "Variable header size not set"; - output.writeInt(variableHeaderSize); - } + output.writeInt(variableHeaderSize); } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportLogger.java b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java index 00cfd1f7f63d4..6cad97acab7ba 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportLogger.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java @@ -85,12 +85,7 @@ private static String format(TcpChannel channel, BytesReference message, String sb.append(", type: ").append(type); sb.append(", version: ").append(version); - if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) { - sb.append(", header size: ").append(streamInput.readInt()).append('B'); - } else { - streamInput = decompressingStream(status, streamInput); - assert InboundHandler.assertRemoteVersion(streamInput, version); - } + sb.append(", header size: ").append(streamInput.readInt()).append('B'); // read and discard headers ThreadContext.readHeadersFromStream(streamInput); diff --git a/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java b/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java index 97ca7d2ecd98b..be51cecc2cf9a 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java @@ -78,9 +78,7 @@ public void testDecode() throws IOException { try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) { final BytesReference totalBytes = message.serialize(os); - int totalHeaderSize = TcpHeader.headerSize(TransportVersion.current()) + totalBytes.getInt( - TcpHeader.VARIABLE_HEADER_SIZE_POSITION - ); + int totalHeaderSize = TcpHeader.HEADER_SIZE + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION); final BytesReference messageBytes = totalBytes.slice(totalHeaderSize, totalBytes.length() - totalHeaderSize); InboundDecoder decoder = new InboundDecoder(recycler); @@ -151,13 +149,12 @@ private void doHandshakeCompatibilityTest(TransportVersion transportVersion, Com try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) { final BytesReference bytes = message.serialize(os); - int totalHeaderSize = TcpHeader.headerSize(transportVersion); InboundDecoder decoder = new InboundDecoder(recycler); final ArrayList fragments = new ArrayList<>(); final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes); int bytesConsumed = decoder.decode(releasable1, fragments::add); - assertThat(bytesConsumed, greaterThan(totalHeaderSize)); + assertThat(bytesConsumed, greaterThan(TcpHeader.HEADER_SIZE)); assertTrue(releasable1.hasReferences()); final Header header = (Header) fragments.get(0); @@ -213,9 +210,7 @@ public void testClientChannelTypeFailsDecodingRequests() throws Exception { try (InboundDecoder decoder = new InboundDecoder(recycler, randomFrom(ChannelType.SERVER, ChannelType.MIX))) { final ArrayList fragments = new ArrayList<>(); int bytesConsumed = decoder.decode(wrapAsReleasable(bytes), fragments::add); - int totalHeaderSize = TcpHeader.headerSize(version) + (version.onOrAfter(TransportVersions.V_7_6_0) - ? bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION) - : 0); + int totalHeaderSize = TcpHeader.HEADER_SIZE + bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION); assertEquals(totalHeaderSize, bytesConsumed); final Header header = (Header) fragments.get(0); assertEquals(requestId, header.getRequestId()); @@ -259,9 +254,7 @@ public void testServerChannelTypeFailsDecodingResponses() throws Exception { try (InboundDecoder decoder = new InboundDecoder(recycler, randomFrom(ChannelType.CLIENT, ChannelType.MIX))) { final ArrayList fragments = new ArrayList<>(); int bytesConsumed = decoder.decode(wrapAsReleasable(bytes), fragments::add); - int totalHeaderSize = TcpHeader.headerSize(version) + (version.onOrAfter(TransportVersions.V_7_6_0) - ? bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION) - : 0); + int totalHeaderSize = TcpHeader.HEADER_SIZE + bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION); assertEquals(totalHeaderSize, bytesConsumed); final Header header = (Header) fragments.get(0); assertEquals(requestId, header.getRequestId()); @@ -304,9 +297,7 @@ public void testCompressedDecode() throws IOException { final BytesStreamOutput out = new BytesStreamOutput(); transportMessage.writeTo(out); final BytesReference uncompressedBytes = out.bytes(); - int totalHeaderSize = TcpHeader.headerSize(TransportVersion.current()) + totalBytes.getInt( - TcpHeader.VARIABLE_HEADER_SIZE_POSITION - ); + int totalHeaderSize = TcpHeader.HEADER_SIZE + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION); InboundDecoder decoder = new InboundDecoder(recycler); final ArrayList fragments = new ArrayList<>(); diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index 3ec248e0d8d9a..0dc72cb9ce252 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -128,7 +128,6 @@ public void testPing() throws Exception { public void testRequestAndResponse() throws Exception { String action = "test-request"; - int headerSize = TcpHeader.headerSize(TransportVersion.current()); boolean isError = randomBoolean(); AtomicReference requestCaptor = new AtomicReference<>(); AtomicReference responseCaptor = new AtomicReference<>(); @@ -183,7 +182,7 @@ public TestResponse read(StreamInput in) throws IOException { BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE); BytesReference fullRequestBytes = request.serialize(new RecyclerBytesStreamOutput(recycler)); - BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize); + BytesReference requestContent = fullRequestBytes.slice(TcpHeader.HEADER_SIZE, fullRequestBytes.length() - TcpHeader.HEADER_SIZE); Header requestHeader = new Header( fullRequestBytes.length() - 6, requestId, @@ -208,7 +207,7 @@ public TestResponse read(StreamInput in) throws IOException { } BytesReference fullResponseBytes = channel.getMessageCaptor().get(); - BytesReference responseContent = fullResponseBytes.slice(headerSize, fullResponseBytes.length() - headerSize); + BytesReference responseContent = fullResponseBytes.slice(TcpHeader.HEADER_SIZE, fullResponseBytes.length() - TcpHeader.HEADER_SIZE); Header responseHeader = new Header(fullRequestBytes.length() - 6, requestId, responseStatus, TransportVersion.current()); InboundMessage responseMessage = new InboundMessage(responseHeader, ReleasableBytesReference.wrap(responseContent), () -> {}); responseHeader.finishParsingHeader(responseMessage.openOrGetStreamInput()); diff --git a/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java b/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java index d0c6cd8b00ff5..b21299219f2bd 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java @@ -274,9 +274,8 @@ public void testEnsureBodyIsNotPrematurelyReleased() throws IOException { } final BytesReference reference = message.serialize(streamOutput); - final int fixedHeaderSize = TcpHeader.headerSize(TransportVersion.current()); - final int variableHeaderSize = reference.getInt(fixedHeaderSize - 4); - final int totalHeaderSize = fixedHeaderSize + variableHeaderSize; + final int variableHeaderSize = reference.getInt(TcpHeader.HEADER_SIZE - 4); + final int totalHeaderSize = TcpHeader.HEADER_SIZE + variableHeaderSize; final AtomicBoolean bodyReleased = new AtomicBoolean(false); for (int i = 0; i < totalHeaderSize - 1; ++i) { try (ReleasableBytesReference slice = ReleasableBytesReference.wrap(reference.slice(i, 1))) { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HeaderSizeLimitTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HeaderSizeLimitTests.java index ba7c2e3844521..3cd6fbfe3c413 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HeaderSizeLimitTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HeaderSizeLimitTests.java @@ -138,7 +138,7 @@ public void terminateThreadPool() { public void testThatAcceptableHeaderSizeGoesThroughTheRemoteClusterPort() throws Exception { int messageLength = randomIntBetween(128, 256); long requestId = randomLongBetween(1L, 1000L); - int acceptableHeaderSize = randomIntBetween(0, maxHeaderSize - TcpHeader.headerSize(TransportVersion.current())); + int acceptableHeaderSize = randomIntBetween(0, maxHeaderSize - TcpHeader.HEADER_SIZE); try ( ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput( messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE, @@ -163,8 +163,8 @@ public void testThatLargerHeaderSizeClosesTheRemoteClusterPort() throws Exceptio int messageLength = randomIntBetween(128, 256); long requestId = randomLongBetween(1L, 1000L); int largeHeaderSize = randomIntBetween( - maxHeaderSize - TcpHeader.headerSize(TransportVersion.current()) + 1, - messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE - TcpHeader.headerSize(TransportVersion.current()) + maxHeaderSize - TcpHeader.HEADER_SIZE + 1, + messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE - TcpHeader.HEADER_SIZE ); try ( ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput( @@ -190,8 +190,8 @@ public void testThatLargerHeaderSizeIsAcceptableForDefaultTransportPort() throws int messageLength = randomIntBetween(128, 256); long requestId = randomLongBetween(1L, 1000L); int largeHeaderSize = randomIntBetween( - maxHeaderSize - TcpHeader.headerSize(TransportVersion.current()) + 1, - messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE - TcpHeader.headerSize(TransportVersion.current()) + maxHeaderSize - TcpHeader.HEADER_SIZE + 1, + messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE - TcpHeader.HEADER_SIZE ); try ( ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(