Skip to content

Commit 09bc343

Browse files
authored
Migrate to v9 transport handshake (#121646)
This commit moves to sending out a v9-format handshake (with apparent transport version `8_800_00_0`) and drops support for handshakes from v7 nodes.
1 parent e4fd6c0 commit 09bc343

File tree

5 files changed

+40
-297
lines changed

5 files changed

+40
-297
lines changed

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 7 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -44,49 +44,17 @@ final class TransportHandshaker {
4444
* ignores the body of the request. After the handshake, the OutboundHandler uses the min(local,remote) protocol version for all later
4545
* messages.
4646
*
47-
* This version supports three handshake protocols, v6080099, v7170099 and v8800000, which respectively have the same message structure
48-
* as the transport protocols of v6.8.0, v7.17.0, and v8.18.0. This node only sends v7170099 requests, but it can send a valid response
49-
* to any v6080099 or v8800000 requests that it receives.
47+
* This version supports two handshake protocols, v7170099 and v8800000, which respectively have the same message structure as the
48+
* transport protocols of v7.17.0, and v8.18.0. This node only sends v8800000 requests, but it can send a valid response to any v7170099
49+
* requests that it receives.
5050
*
5151
* Note that these are not really TransportVersion constants as used elsewhere in ES, they're independent things that just happen to be
52-
* stored in the same location in the message header and which roughly match the same ID numbering scheme. Older versions of ES did
53-
* rely on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer
52+
* stored in the same location in the message header and which roughly match the same ID numbering scheme. Older versions of ES did rely
53+
* on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer
5454
* true.
5555
*
5656
* Here are some example messages, broken down to show their structure. See TransportHandshakerRawMessageTests for supporting tests.
5757
*
58-
* ## v6080099 Request:
59-
*
60-
* 45 53 -- 'ES' marker
61-
* 00 00 00 34 -- total message length
62-
* 00 00 00 00 00 00 00 01 -- request ID
63-
* 08 -- status flags (0b1000 == handshake request)
64-
* 00 5c c6 63 -- handshake protocol version (0x5cc663 == 6080099)
65-
* 00 -- no request headers [1]
66-
* 00 -- no response headers [1]
67-
* 01 -- one feature [2]
68-
* 06 -- feature name length
69-
* 78 2d 70 61 63 6b -- feature name 'x-pack'
70-
* 16 -- action string size
71-
* 69 6e 74 65 72 6e 61 6c }
72-
* 3a 74 63 70 2f 68 61 6e }- ASCII representation of HANDSHAKE_ACTION_NAME
73-
* 64 73 68 61 6b 65 }
74-
* 00 -- no parent task ID [3]
75-
* 04 -- payload length
76-
* 8b d5 b5 03 -- max acceptable protocol version (vInt: 00000011 10110101 11010101 10001011 == 7170699)
77-
*
78-
* ## v6080099 Response:
79-
*
80-
* 45 53 -- 'ES' marker
81-
* 00 00 00 13 -- total message length
82-
* 00 00 00 00 00 00 00 01 -- request ID (copied from request)
83-
* 09 -- status flags (0b1001 == handshake response)
84-
* 00 5c c6 63 -- handshake protocol version (0x5cc663 == 6080099, copied from request)
85-
* 00 -- no request headers [1]
86-
* 00 -- no response headers [1]
87-
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
88-
*
89-
*
9058
* ## v7170099 Requests:
9159
*
9260
* 45 53 -- 'ES' marker
@@ -158,14 +126,9 @@ final class TransportHandshaker {
158126
* [3] Parent task ID should be empty; see org.elasticsearch.tasks.TaskId.writeTo for its structure.
159127
*/
160128

161-
static final TransportVersion V7_HANDSHAKE_VERSION = TransportVersion.fromId(6_08_00_99);
162129
static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion.fromId(7_17_00_99);
163130
static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion.fromId(8_800_00_0);
164-
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(
165-
V7_HANDSHAKE_VERSION,
166-
V8_HANDSHAKE_VERSION,
167-
V9_HANDSHAKE_VERSION
168-
);
131+
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(V8_HANDSHAKE_VERSION, V9_HANDSHAKE_VERSION);
169132

170133
static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";
171134
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
@@ -203,7 +166,7 @@ void sendHandshake(
203166
);
204167
boolean success = false;
205168
try {
206-
handshakeRequestSender.sendRequest(node, channel, requestId, V8_HANDSHAKE_VERSION);
169+
handshakeRequestSender.sendRequest(node, channel, requestId, V9_HANDSHAKE_VERSION);
207170

208171
threadPool.schedule(
209172
() -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]")),

server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java

Lines changed: 0 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.common.settings.Settings;
1919
import org.elasticsearch.common.util.MockPageCacheRecycler;
2020
import org.elasticsearch.common.util.concurrent.ThreadContext;
21-
import org.elasticsearch.core.UpdateForV9;
2221
import org.elasticsearch.test.ESTestCase;
2322
import org.elasticsearch.test.TransportVersionUtils;
2423
import org.elasticsearch.transport.InboundDecoder.ChannelType;
@@ -126,105 +125,6 @@ public void testDecode() throws IOException {
126125

127126
}
128127

129-
@UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) // can delete test in v9
130-
public void testDecodePreHeaderSizeVariableInt() throws IOException {
131-
Compression.Scheme compressionScheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.DEFLATE, null);
132-
String action = "test-request";
133-
long requestId = randomNonNegativeLong();
134-
final TransportVersion preHeaderVariableInt = TransportHandshaker.V7_HANDSHAKE_VERSION;
135-
final String contentValue = randomAlphaOfLength(100);
136-
// 8.0 is only compatible with handshakes on a pre-variable int version
137-
final OutboundMessage message = new OutboundMessage.Request(
138-
threadContext,
139-
new TestRequest(contentValue),
140-
preHeaderVariableInt,
141-
action,
142-
requestId,
143-
true,
144-
compressionScheme
145-
);
146-
147-
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
148-
final BytesReference totalBytes = message.serialize(os);
149-
int partialHeaderSize = TcpHeader.headerSize(preHeaderVariableInt);
150-
151-
InboundDecoder decoder = new InboundDecoder(recycler);
152-
final ArrayList<Object> fragments = new ArrayList<>();
153-
final ReleasableBytesReference releasable1 = wrapAsReleasable(totalBytes);
154-
int bytesConsumed = decoder.decode(releasable1, fragments::add);
155-
assertEquals(partialHeaderSize, bytesConsumed);
156-
assertTrue(releasable1.hasReferences());
157-
158-
final Header header = (Header) fragments.get(0);
159-
assertEquals(requestId, header.getRequestId());
160-
assertEquals(preHeaderVariableInt, header.getVersion());
161-
if (compressionScheme == null) {
162-
assertFalse(header.isCompressed());
163-
} else {
164-
assertTrue(header.isCompressed());
165-
}
166-
assertTrue(header.isHandshake());
167-
assertTrue(header.isRequest());
168-
assertTrue(header.needsToReadVariableHeader());
169-
fragments.clear();
170-
171-
final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed);
172-
final ReleasableBytesReference releasable2 = wrapAsReleasable(bytes2);
173-
int bytesConsumed2 = decoder.decode(releasable2, fragments::add);
174-
if (compressionScheme == null) {
175-
assertEquals(2, fragments.size());
176-
} else {
177-
assertEquals(3, fragments.size());
178-
final Object body = fragments.get(1);
179-
assertThat(body, instanceOf(ReleasableBytesReference.class));
180-
((ReleasableBytesReference) body).close();
181-
}
182-
assertEquals(InboundDecoder.END_CONTENT, fragments.get(fragments.size() - 1));
183-
assertEquals(totalBytes.length() - bytesConsumed, bytesConsumed2);
184-
}
185-
}
186-
187-
public void testDecodeHandshakeV7Compatibility() throws IOException {
188-
String action = "test-request";
189-
long requestId = randomNonNegativeLong();
190-
final String headerKey = randomAlphaOfLength(10);
191-
final String headerValue = randomAlphaOfLength(20);
192-
threadContext.putHeader(headerKey, headerValue);
193-
TransportVersion handshakeCompat = TransportHandshaker.V7_HANDSHAKE_VERSION;
194-
OutboundMessage message = new OutboundMessage.Request(
195-
threadContext,
196-
new TestRequest(randomAlphaOfLength(100)),
197-
handshakeCompat,
198-
action,
199-
requestId,
200-
true,
201-
null
202-
);
203-
204-
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
205-
final BytesReference bytes = message.serialize(os);
206-
int totalHeaderSize = TcpHeader.headerSize(handshakeCompat);
207-
208-
InboundDecoder decoder = new InboundDecoder(recycler);
209-
final ArrayList<Object> fragments = new ArrayList<>();
210-
final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes);
211-
int bytesConsumed = decoder.decode(releasable1, fragments::add);
212-
assertEquals(totalHeaderSize, bytesConsumed);
213-
assertTrue(releasable1.hasReferences());
214-
215-
final Header header = (Header) fragments.get(0);
216-
assertEquals(requestId, header.getRequestId());
217-
assertEquals(handshakeCompat, header.getVersion());
218-
assertFalse(header.isCompressed());
219-
assertTrue(header.isHandshake());
220-
assertTrue(header.isRequest());
221-
// TODO: On 9.0 this will be true because all compatible versions with contain the variable header int
222-
assertTrue(header.needsToReadVariableHeader());
223-
fragments.clear();
224-
}
225-
226-
}
227-
228128
public void testDecodeHandshakeV8Compatibility() throws IOException {
229129
doHandshakeCompatibilityTest(TransportHandshaker.V8_HANDSHAKE_VERSION, null);
230130
doHandshakeCompatibilityTest(TransportHandshaker.V8_HANDSHAKE_VERSION, Compression.Scheme.DEFLATE);
@@ -453,46 +353,6 @@ public void testCompressedDecode() throws IOException {
453353

454354
}
455355

456-
public void testCompressedDecodeHandshakeCompatibility() throws IOException {
457-
String action = "test-request";
458-
long requestId = randomNonNegativeLong();
459-
final String headerKey = randomAlphaOfLength(10);
460-
final String headerValue = randomAlphaOfLength(20);
461-
threadContext.putHeader(headerKey, headerValue);
462-
TransportVersion handshakeCompat = TransportHandshaker.V7_HANDSHAKE_VERSION;
463-
OutboundMessage message = new OutboundMessage.Request(
464-
threadContext,
465-
new TestRequest(randomAlphaOfLength(100)),
466-
handshakeCompat,
467-
action,
468-
requestId,
469-
true,
470-
Compression.Scheme.DEFLATE
471-
);
472-
473-
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
474-
final BytesReference bytes = message.serialize(os);
475-
int totalHeaderSize = TcpHeader.headerSize(handshakeCompat);
476-
477-
InboundDecoder decoder = new InboundDecoder(recycler);
478-
final ArrayList<Object> fragments = new ArrayList<>();
479-
final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes);
480-
int bytesConsumed = decoder.decode(releasable1, fragments::add);
481-
assertEquals(totalHeaderSize, bytesConsumed);
482-
assertTrue(releasable1.hasReferences());
483-
484-
final Header header = (Header) fragments.get(0);
485-
assertEquals(requestId, header.getRequestId());
486-
assertEquals(handshakeCompat, header.getVersion());
487-
assertTrue(header.isCompressed());
488-
assertTrue(header.isHandshake());
489-
assertTrue(header.isRequest());
490-
// TODO: On 9.0 this will be true because all compatible versions with contain the variable header int
491-
assertTrue(header.needsToReadVariableHeader());
492-
fragments.clear();
493-
}
494-
}
495-
496356
public void testVersionIncompatibilityDecodeException() throws IOException {
497357
String action = "test-request";
498358
long requestId = randomNonNegativeLong();

server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java

Lines changed: 4 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
2121
import org.elasticsearch.common.transport.TransportAddress;
2222
import org.elasticsearch.core.UpdateForV10;
23-
import org.elasticsearch.core.UpdateForV9;
2423
import org.elasticsearch.test.ESSingleNodeTestCase;
2524
import org.elasticsearch.test.TransportVersionUtils;
2625

@@ -38,56 +37,6 @@
3837

3938
public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase {
4039

41-
@UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) // remove support for v7 handshakes in v9
42-
public void testV7Handshake() throws Exception {
43-
final BytesRef handshakeRequestBytes;
44-
final var requestId = randomNonNegativeLong();
45-
try (var outputStream = new BytesStreamOutput()) {
46-
outputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
47-
outputStream.writeLong(requestId);
48-
outputStream.writeByte(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)));
49-
outputStream.writeInt(TransportHandshaker.V7_HANDSHAKE_VERSION.id());
50-
outputStream.writeByte((byte) 0); // no request headers;
51-
outputStream.writeByte((byte) 0); // no response headers;
52-
outputStream.writeStringArray(new String[] { "x-pack" }); // one feature
53-
outputStream.writeString("internal:tcp/handshake");
54-
outputStream.writeByte((byte) 0); // no parent task ID;
55-
56-
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
57-
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
58-
outputStream.writeByte((byte) 4); // payload length
59-
outputStream.writeVInt(requestNodeTransportVersionId);
60-
61-
handshakeRequestBytes = outputStream.bytes().toBytesRef();
62-
}
63-
64-
final BytesRef handshakeResponseBytes;
65-
try (var socket = openTransportConnection()) {
66-
var streamOutput = new OutputStreamStreamOutput(socket.getOutputStream());
67-
streamOutput.write("ES".getBytes(StandardCharsets.US_ASCII));
68-
streamOutput.writeInt(handshakeRequestBytes.length);
69-
streamOutput.writeBytes(handshakeRequestBytes.bytes, handshakeRequestBytes.offset, handshakeRequestBytes.length);
70-
streamOutput.flush();
71-
72-
var streamInput = new InputStreamStreamInput(socket.getInputStream());
73-
assertEquals((byte) 'E', streamInput.readByte());
74-
assertEquals((byte) 'S', streamInput.readByte());
75-
var responseLength = streamInput.readInt();
76-
handshakeResponseBytes = streamInput.readBytesRef(responseLength);
77-
}
78-
79-
try (var inputStream = new BytesArray(handshakeResponseBytes).streamInput()) {
80-
assertEquals(requestId, inputStream.readLong());
81-
assertEquals(TransportStatus.setResponse(TransportStatus.setHandshake((byte) 0)), inputStream.readByte());
82-
assertEquals(TransportHandshaker.V7_HANDSHAKE_VERSION.id(), inputStream.readInt());
83-
assertEquals((byte) 0, inputStream.readByte()); // no request headers
84-
assertEquals((byte) 0, inputStream.readByte()); // no response headers
85-
inputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
86-
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
87-
assertEquals(-1, inputStream.read());
88-
}
89-
}
90-
9140
@UpdateForV10(owner = UpdateForV10.Owner.CORE_INFRA) // remove support for v8 handshakes in v10
9241
public void testV8Handshake() throws Exception {
9342
final BytesRef handshakeRequestBytes;
@@ -223,11 +172,10 @@ public void testOutboundHandshake() throws Exception {
223172
try (var inputStream = new BytesArray(handshakeRequestBytes).streamInput()) {
224173
assertThat(inputStream.readLong(), greaterThan(0L));
225174
assertEquals(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), inputStream.readByte());
226-
assertEquals(TransportHandshaker.V8_HANDSHAKE_VERSION.id(), inputStream.readInt());
227-
assertEquals(0x1a, inputStream.readInt()); // length of variable-length header, always 0x1a
175+
assertEquals(TransportHandshaker.V9_HANDSHAKE_VERSION.id(), inputStream.readInt());
176+
assertEquals(0x19, inputStream.readInt()); // length of variable-length header, always 0x19
228177
assertEquals((byte) 0, inputStream.readByte()); // no request headers
229178
assertEquals((byte) 0, inputStream.readByte()); // no response headers
230-
assertEquals((byte) 0, inputStream.readByte()); // no features
231179
assertEquals("internal:tcp/handshake", inputStream.readString());
232180
assertEquals((byte) 0, inputStream.readByte()); // no parent task
233181
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
@@ -236,8 +184,9 @@ public void testOutboundHandshake() throws Exception {
236184
}
237185

238186
try (var inputStream = new BytesArray(payloadBytes).streamInput()) {
239-
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
187+
inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
240188
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
189+
assertEquals(Build.current().version(), inputStream.readString());
241190
assertEquals(-1, inputStream.read());
242191
}
243192
}

0 commit comments

Comments
 (0)