Skip to content

Commit 6925bc9

Browse files
authored
Remove code to handle pre-7.6 TCP headers (#123899)
And 7.6 transport version
1 parent 792ab39 commit 6925bc9

File tree

9 files changed

+30
-81
lines changed

9 files changed

+30
-81
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ static TransportVersion def(int id) {
5757
public static final TransportVersion V_7_3_0 = def(7_03_00_99);
5858
public static final TransportVersion V_7_3_2 = def(7_03_02_99);
5959
public static final TransportVersion V_7_4_0 = def(7_04_00_99);
60-
public static final TransportVersion V_7_6_0 = def(7_06_00_99);
6160
public static final TransportVersion V_7_8_0 = def(7_08_00_99);
6261
public static final TransportVersion V_7_8_1 = def(7_08_01_99);
6362
public static final TransportVersion V_7_9_0 = def(7_09_00_99);

server/src/main/java/org/elasticsearch/transport/InboundDecoder.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -167,23 +167,17 @@ private static int headerBytesToRead(BytesReference reference, ByteSizeValue max
167167
return 0;
168168
}
169169

170-
TransportVersion remoteVersion = TransportVersion.fromId(reference.getInt(TcpHeader.VERSION_POSITION));
171-
int fixedHeaderSize = TcpHeader.headerSize(remoteVersion);
172-
if (fixedHeaderSize > reference.length()) {
170+
if (reference.length() <= TcpHeader.HEADER_SIZE) {
173171
return 0;
174-
} else if (remoteVersion.before(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
175-
return fixedHeaderSize;
176172
} else {
177173
int variableHeaderSize = reference.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
178174
if (variableHeaderSize < 0) {
179175
throw new StreamCorruptedException("invalid negative variable header size: " + variableHeaderSize);
180176
}
181-
if (variableHeaderSize > maxHeaderSize.getBytes() - fixedHeaderSize) {
182-
throw new StreamCorruptedException(
183-
"header size [" + (fixedHeaderSize + variableHeaderSize) + "] exceeds limit of [" + maxHeaderSize + "]"
184-
);
177+
int totalHeaderSize = TcpHeader.HEADER_SIZE + variableHeaderSize;
178+
if (totalHeaderSize > maxHeaderSize.getBytes()) {
179+
throw new StreamCorruptedException("header size [" + totalHeaderSize + "] exceeds limit of [" + maxHeaderSize + "]");
185180
}
186-
int totalHeaderSize = fixedHeaderSize + variableHeaderSize;
187181
if (totalHeaderSize > reference.length()) {
188182
return 0;
189183
} else {
@@ -211,11 +205,9 @@ private static Header readHeader(int networkMessageSize, BytesReference bytesRef
211205
checkVersionCompatibility(header.getVersion());
212206
}
213207

214-
if (header.getVersion().onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
215-
// Skip since we already have ensured enough data available
216-
streamInput.readInt();
217-
header.finishParsingHeader(streamInput);
218-
}
208+
// Skip since we already have ensured enough data available
209+
streamInput.readInt();
210+
header.finishParsingHeader(streamInput);
219211
return header;
220212
}
221213
}

server/src/main/java/org/elasticsearch/transport/OutboundMessage.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,26 +43,20 @@ abstract class OutboundMessage extends NetworkMessage {
4343

4444
BytesReference serialize(RecyclerBytesStreamOutput bytesStream) throws IOException {
4545
bytesStream.setTransportVersion(version);
46-
bytesStream.skip(TcpHeader.headerSize(version));
46+
bytesStream.skip(TcpHeader.HEADER_SIZE);
4747

4848
// The compressible bytes stream will not close the underlying bytes stream
4949
BytesReference reference;
50-
int variableHeaderLength = -1;
5150
final long preHeaderPosition = bytesStream.position();
5251

53-
if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
54-
writeVariableHeader(bytesStream);
55-
variableHeaderLength = Math.toIntExact(bytesStream.position() - preHeaderPosition);
56-
}
52+
writeVariableHeader(bytesStream);
53+
int variableHeaderLength = Math.toIntExact(bytesStream.position() - preHeaderPosition);
5754

5855
final boolean compress = TransportStatus.isCompress(status);
5956
final StreamOutput stream = compress ? wrapCompressed(bytesStream) : bytesStream;
6057
final ReleasableBytesReference zeroCopyBuffer;
6158
try {
6259
stream.setTransportVersion(version);
63-
if (variableHeaderLength == -1) {
64-
writeVariableHeader(stream);
65-
}
6660
if (message instanceof BytesTransportRequest bRequest) {
6761
bRequest.writeThin(stream);
6862
zeroCopyBuffer = bRequest.bytes;
@@ -89,7 +83,7 @@ BytesReference serialize(RecyclerBytesStreamOutput bytesStream) throws IOExcepti
8983
}
9084

9185
bytesStream.seek(0);
92-
final int contentSize = reference.length() - TcpHeader.headerSize(version);
86+
final int contentSize = reference.length() - TcpHeader.HEADER_SIZE;
9387
TcpHeader.writeHeader(bytesStream, requestId, status, version, contentSize, variableHeaderLength);
9488
return reference;
9589
}

server/src/main/java/org/elasticsearch/transport/TcpHeader.java

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,12 @@
1010
package org.elasticsearch.transport;
1111

1212
import org.elasticsearch.TransportVersion;
13-
import org.elasticsearch.TransportVersions;
1413
import org.elasticsearch.common.io.stream.StreamOutput;
1514

1615
import java.io.IOException;
1716

1817
public class TcpHeader {
1918

20-
public static final TransportVersion VERSION_WITH_HEADER_SIZE = TransportVersions.V_7_6_0;
21-
2219
public static final int MARKER_BYTES_SIZE = 2;
2320

2421
public static final int MESSAGE_LENGTH_SIZE = 4;
@@ -37,19 +34,9 @@ public class TcpHeader {
3734

3835
public static final int VARIABLE_HEADER_SIZE_POSITION = VERSION_POSITION + VERSION_ID_SIZE;
3936

40-
private static final int PRE_76_HEADER_SIZE = VERSION_POSITION + VERSION_ID_SIZE;
41-
42-
public static final int BYTES_REQUIRED_FOR_VERSION = PRE_76_HEADER_SIZE;
37+
public static final int BYTES_REQUIRED_FOR_VERSION = VERSION_POSITION + VERSION_ID_SIZE;
4338

44-
private static final int HEADER_SIZE = PRE_76_HEADER_SIZE + VARIABLE_HEADER_SIZE;
45-
46-
public static int headerSize(TransportVersion version) {
47-
if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
48-
return HEADER_SIZE;
49-
} else {
50-
return PRE_76_HEADER_SIZE;
51-
}
52-
}
39+
public static final int HEADER_SIZE = BYTES_REQUIRED_FOR_VERSION + VARIABLE_HEADER_SIZE;
5340

5441
private static final byte[] PREFIX = { (byte) 'E', (byte) 'S' };
5542

@@ -63,17 +50,10 @@ public static void writeHeader(
6350
) throws IOException {
6451
output.writeBytes(PREFIX);
6552
// write the size, the size indicates the remaining message size, not including the size int
66-
if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
67-
output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE + VARIABLE_HEADER_SIZE);
68-
} else {
69-
output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE);
70-
}
53+
output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE + VARIABLE_HEADER_SIZE);
7154
output.writeLong(requestId);
7255
output.writeByte(status);
7356
output.writeInt(version.id());
74-
if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
75-
assert variableHeaderSize != -1 : "Variable header size not set";
76-
output.writeInt(variableHeaderSize);
77-
}
57+
output.writeInt(variableHeaderSize);
7858
}
7959
}

server/src/main/java/org/elasticsearch/transport/TransportLogger.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,7 @@ private static String format(TcpChannel channel, BytesReference message, String
8585
sb.append(", type: ").append(type);
8686
sb.append(", version: ").append(version);
8787

88-
if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
89-
sb.append(", header size: ").append(streamInput.readInt()).append('B');
90-
} else {
91-
streamInput = decompressingStream(status, streamInput);
92-
assert InboundHandler.assertRemoteVersion(streamInput, version);
93-
}
88+
sb.append(", header size: ").append(streamInput.readInt()).append('B');
9489

9590
// read and discard headers
9691
ThreadContext.readHeadersFromStream(streamInput);

server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,7 @@ public void testDecode() throws IOException {
7878

7979
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
8080
final BytesReference totalBytes = message.serialize(os);
81-
int totalHeaderSize = TcpHeader.headerSize(TransportVersion.current()) + totalBytes.getInt(
82-
TcpHeader.VARIABLE_HEADER_SIZE_POSITION
83-
);
81+
int totalHeaderSize = TcpHeader.HEADER_SIZE + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
8482
final BytesReference messageBytes = totalBytes.slice(totalHeaderSize, totalBytes.length() - totalHeaderSize);
8583

8684
InboundDecoder decoder = new InboundDecoder(recycler);
@@ -151,13 +149,12 @@ private void doHandshakeCompatibilityTest(TransportVersion transportVersion, Com
151149

152150
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
153151
final BytesReference bytes = message.serialize(os);
154-
int totalHeaderSize = TcpHeader.headerSize(transportVersion);
155152

156153
InboundDecoder decoder = new InboundDecoder(recycler);
157154
final ArrayList<Object> fragments = new ArrayList<>();
158155
final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes);
159156
int bytesConsumed = decoder.decode(releasable1, fragments::add);
160-
assertThat(bytesConsumed, greaterThan(totalHeaderSize));
157+
assertThat(bytesConsumed, greaterThan(TcpHeader.HEADER_SIZE));
161158
assertTrue(releasable1.hasReferences());
162159

163160
final Header header = (Header) fragments.get(0);
@@ -213,9 +210,7 @@ public void testClientChannelTypeFailsDecodingRequests() throws Exception {
213210
try (InboundDecoder decoder = new InboundDecoder(recycler, randomFrom(ChannelType.SERVER, ChannelType.MIX))) {
214211
final ArrayList<Object> fragments = new ArrayList<>();
215212
int bytesConsumed = decoder.decode(wrapAsReleasable(bytes), fragments::add);
216-
int totalHeaderSize = TcpHeader.headerSize(version) + (version.onOrAfter(TransportVersions.V_7_6_0)
217-
? bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION)
218-
: 0);
213+
int totalHeaderSize = TcpHeader.HEADER_SIZE + bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
219214
assertEquals(totalHeaderSize, bytesConsumed);
220215
final Header header = (Header) fragments.get(0);
221216
assertEquals(requestId, header.getRequestId());
@@ -259,9 +254,7 @@ public void testServerChannelTypeFailsDecodingResponses() throws Exception {
259254
try (InboundDecoder decoder = new InboundDecoder(recycler, randomFrom(ChannelType.CLIENT, ChannelType.MIX))) {
260255
final ArrayList<Object> fragments = new ArrayList<>();
261256
int bytesConsumed = decoder.decode(wrapAsReleasable(bytes), fragments::add);
262-
int totalHeaderSize = TcpHeader.headerSize(version) + (version.onOrAfter(TransportVersions.V_7_6_0)
263-
? bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION)
264-
: 0);
257+
int totalHeaderSize = TcpHeader.HEADER_SIZE + bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
265258
assertEquals(totalHeaderSize, bytesConsumed);
266259
final Header header = (Header) fragments.get(0);
267260
assertEquals(requestId, header.getRequestId());
@@ -304,9 +297,7 @@ public void testCompressedDecode() throws IOException {
304297
final BytesStreamOutput out = new BytesStreamOutput();
305298
transportMessage.writeTo(out);
306299
final BytesReference uncompressedBytes = out.bytes();
307-
int totalHeaderSize = TcpHeader.headerSize(TransportVersion.current()) + totalBytes.getInt(
308-
TcpHeader.VARIABLE_HEADER_SIZE_POSITION
309-
);
300+
int totalHeaderSize = TcpHeader.HEADER_SIZE + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
310301

311302
InboundDecoder decoder = new InboundDecoder(recycler);
312303
final ArrayList<Object> fragments = new ArrayList<>();

server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ public void testPing() throws Exception {
128128

129129
public void testRequestAndResponse() throws Exception {
130130
String action = "test-request";
131-
int headerSize = TcpHeader.headerSize(TransportVersion.current());
132131
boolean isError = randomBoolean();
133132
AtomicReference<TestRequest> requestCaptor = new AtomicReference<>();
134133
AtomicReference<TestResponse> responseCaptor = new AtomicReference<>();
@@ -183,7 +182,7 @@ public TestResponse read(StreamInput in) throws IOException {
183182

184183
BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE);
185184
BytesReference fullRequestBytes = request.serialize(new RecyclerBytesStreamOutput(recycler));
186-
BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize);
185+
BytesReference requestContent = fullRequestBytes.slice(TcpHeader.HEADER_SIZE, fullRequestBytes.length() - TcpHeader.HEADER_SIZE);
187186
Header requestHeader = new Header(
188187
fullRequestBytes.length() - 6,
189188
requestId,
@@ -208,7 +207,7 @@ public TestResponse read(StreamInput in) throws IOException {
208207
}
209208

210209
BytesReference fullResponseBytes = channel.getMessageCaptor().get();
211-
BytesReference responseContent = fullResponseBytes.slice(headerSize, fullResponseBytes.length() - headerSize);
210+
BytesReference responseContent = fullResponseBytes.slice(TcpHeader.HEADER_SIZE, fullResponseBytes.length() - TcpHeader.HEADER_SIZE);
212211
Header responseHeader = new Header(fullRequestBytes.length() - 6, requestId, responseStatus, TransportVersion.current());
213212
InboundMessage responseMessage = new InboundMessage(responseHeader, ReleasableBytesReference.wrap(responseContent), () -> {});
214213
responseHeader.finishParsingHeader(responseMessage.openOrGetStreamInput());

server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,9 +274,8 @@ public void testEnsureBodyIsNotPrematurelyReleased() throws IOException {
274274
}
275275

276276
final BytesReference reference = message.serialize(streamOutput);
277-
final int fixedHeaderSize = TcpHeader.headerSize(TransportVersion.current());
278-
final int variableHeaderSize = reference.getInt(fixedHeaderSize - 4);
279-
final int totalHeaderSize = fixedHeaderSize + variableHeaderSize;
277+
final int variableHeaderSize = reference.getInt(TcpHeader.HEADER_SIZE - 4);
278+
final int totalHeaderSize = TcpHeader.HEADER_SIZE + variableHeaderSize;
280279
final AtomicBoolean bodyReleased = new AtomicBoolean(false);
281280
for (int i = 0; i < totalHeaderSize - 1; ++i) {
282281
try (ReleasableBytesReference slice = ReleasableBytesReference.wrap(reference.slice(i, 1))) {

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HeaderSizeLimitTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public void terminateThreadPool() {
138138
public void testThatAcceptableHeaderSizeGoesThroughTheRemoteClusterPort() throws Exception {
139139
int messageLength = randomIntBetween(128, 256);
140140
long requestId = randomLongBetween(1L, 1000L);
141-
int acceptableHeaderSize = randomIntBetween(0, maxHeaderSize - TcpHeader.headerSize(TransportVersion.current()));
141+
int acceptableHeaderSize = randomIntBetween(0, maxHeaderSize - TcpHeader.HEADER_SIZE);
142142
try (
143143
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(
144144
messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE,
@@ -163,8 +163,8 @@ public void testThatLargerHeaderSizeClosesTheRemoteClusterPort() throws Exceptio
163163
int messageLength = randomIntBetween(128, 256);
164164
long requestId = randomLongBetween(1L, 1000L);
165165
int largeHeaderSize = randomIntBetween(
166-
maxHeaderSize - TcpHeader.headerSize(TransportVersion.current()) + 1,
167-
messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE - TcpHeader.headerSize(TransportVersion.current())
166+
maxHeaderSize - TcpHeader.HEADER_SIZE + 1,
167+
messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE - TcpHeader.HEADER_SIZE
168168
);
169169
try (
170170
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(
@@ -190,8 +190,8 @@ public void testThatLargerHeaderSizeIsAcceptableForDefaultTransportPort() throws
190190
int messageLength = randomIntBetween(128, 256);
191191
long requestId = randomLongBetween(1L, 1000L);
192192
int largeHeaderSize = randomIntBetween(
193-
maxHeaderSize - TcpHeader.headerSize(TransportVersion.current()) + 1,
194-
messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE - TcpHeader.headerSize(TransportVersion.current())
193+
maxHeaderSize - TcpHeader.HEADER_SIZE + 1,
194+
messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE - TcpHeader.HEADER_SIZE
195195
);
196196
try (
197197
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(

0 commit comments

Comments
 (0)