Skip to content

Commit 825b5de

Browse files
authored
Introduce new handshake version for v9 (#120109)
This commit introduces `V9_HANDSHAKE_VERSION` which is the transport version that will eventually be for handshakes sent by v9 nodes. It does not adjust the handshake version yet, because we must first backport this to the v8.18 branch so that v8.18 nodes can understand the new v9 handshakes.
1 parent 30948ac commit 825b5de

File tree

2 files changed

+71
-18
lines changed

2 files changed

+71
-18
lines changed

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,14 @@ final class TransportHandshaker {
4141
* ignores the body of the request. After the handshake, the OutboundHandler uses the min(local,remote) protocol version for all later
4242
* messages.
4343
*
44-
* This version supports two handshake protocols, v6080099 and v7170099, which respectively have the same message structure as the
45-
* transport protocols of v6.8.0 and v7.17.0. This node only sends v7170099 requests, but it can send a valid response to any v6080099
46-
* requests that it receives.
44+
* This version supports three handshake protocols, v6080099, v7170099 and v8800000, which respectively have the same message structure
45+
* as the transport protocols of v6.8.0, v7.17.0, and v8.18.0. This node only sends v7170099 requests, but it can send a valid response
46+
* to any v6080099 or v8800000 requests that it receives.
47+
*
48+
* Note that these are not really TransportVersion constants as used elsewhere in ES, they're independent things that just happen to be
49+
* stored in the same location in the message header and which roughly match the same ID numbering scheme. Older versions of ES did
50+
* rely on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer
51+
* true.
4752
*
4853
* Here are some example messages, broken down to show their structure:
4954
*
@@ -79,7 +84,7 @@ final class TransportHandshaker {
7984
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
8085
*
8186
*
82-
* ## v7170099 Request:
87+
* ## v7170099 and v8800000 Requests:
8388
*
8489
* 45 53 -- 'ES' marker
8590
* 00 00 00 31 -- total message length
@@ -98,7 +103,7 @@ final class TransportHandshaker {
98103
* 04 -- payload length
99104
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
100105
*
101-
* ## v7170099 Response:
106+
* ## v7170099 and v8800000 Responses:
102107
*
103108
* 45 53 -- 'ES' marker
104109
* 00 00 00 17 -- total message length
@@ -118,7 +123,12 @@ final class TransportHandshaker {
118123

119124
static final TransportVersion EARLIEST_HANDSHAKE_VERSION = TransportVersion.fromId(6080099);
120125
static final TransportVersion REQUEST_HANDSHAKE_VERSION = TransportVersions.MINIMUM_COMPATIBLE;
121-
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(EARLIEST_HANDSHAKE_VERSION, REQUEST_HANDSHAKE_VERSION);
126+
static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion.fromId(8_800_00_0);
127+
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(
128+
EARLIEST_HANDSHAKE_VERSION,
129+
REQUEST_HANDSHAKE_VERSION,
130+
V9_HANDSHAKE_VERSION
131+
);
122132

123133
static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";
124134
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();

server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import static org.elasticsearch.common.bytes.ReleasableBytesReferenceStreamInputTests.wrapAsReleasable;
2929
import static org.hamcrest.Matchers.containsString;
30+
import static org.hamcrest.Matchers.greaterThan;
3031
import static org.hamcrest.Matchers.hasItems;
3132
import static org.hamcrest.Matchers.instanceOf;
3233

@@ -182,7 +183,7 @@ public void testDecodePreHeaderSizeVariableInt() throws IOException {
182183
}
183184
}
184185

185-
public void testDecodeHandshakeCompatibility() throws IOException {
186+
public void testDecodeHandshakeV7Compatibility() throws IOException {
186187
String action = "test-request";
187188
long requestId = randomNonNegativeLong();
188189
final String headerKey = randomAlphaOfLength(10);
@@ -223,6 +224,55 @@ public void testDecodeHandshakeCompatibility() throws IOException {
223224

224225
}
225226

227+
public void testDecodeHandshakeV8Compatibility() throws IOException {
228+
doHandshakeCompatibilityTest(TransportHandshaker.REQUEST_HANDSHAKE_VERSION, null);
229+
doHandshakeCompatibilityTest(TransportHandshaker.REQUEST_HANDSHAKE_VERSION, Compression.Scheme.DEFLATE);
230+
}
231+
232+
public void testDecodeHandshakeV9Compatibility() throws IOException {
233+
doHandshakeCompatibilityTest(TransportHandshaker.V9_HANDSHAKE_VERSION, null);
234+
doHandshakeCompatibilityTest(TransportHandshaker.V9_HANDSHAKE_VERSION, Compression.Scheme.DEFLATE);
235+
}
236+
237+
private void doHandshakeCompatibilityTest(TransportVersion transportVersion, Compression.Scheme compressionScheme) throws IOException {
238+
String action = "test-request";
239+
long requestId = randomNonNegativeLong();
240+
final String headerKey = randomAlphaOfLength(10);
241+
final String headerValue = randomAlphaOfLength(20);
242+
threadContext.putHeader(headerKey, headerValue);
243+
OutboundMessage message = new OutboundMessage.Request(
244+
threadContext,
245+
new TestRequest(randomAlphaOfLength(100)),
246+
transportVersion,
247+
action,
248+
requestId,
249+
true,
250+
compressionScheme
251+
);
252+
253+
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
254+
final BytesReference bytes = message.serialize(os);
255+
int totalHeaderSize = TcpHeader.headerSize(transportVersion);
256+
257+
InboundDecoder decoder = new InboundDecoder(recycler);
258+
final ArrayList<Object> fragments = new ArrayList<>();
259+
final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes);
260+
int bytesConsumed = decoder.decode(releasable1, fragments::add);
261+
assertThat(bytesConsumed, greaterThan(totalHeaderSize));
262+
assertTrue(releasable1.hasReferences());
263+
264+
final Header header = (Header) fragments.get(0);
265+
assertEquals(requestId, header.getRequestId());
266+
assertEquals(transportVersion, header.getVersion());
267+
assertEquals(compressionScheme == Compression.Scheme.DEFLATE, header.isCompressed());
268+
assertTrue(header.isHandshake());
269+
assertTrue(header.isRequest());
270+
assertFalse(header.needsToReadVariableHeader());
271+
assertEquals(headerValue, header.getRequestHeaders().get(headerKey));
272+
fragments.clear();
273+
}
274+
}
275+
226276
public void testClientChannelTypeFailsDecodingRequests() throws Exception {
227277
String action = "test-request";
228278
long requestId = randomNonNegativeLong();
@@ -488,23 +538,16 @@ public void testCheckVersionCompatibility() {
488538
}
489539

490540
public void testCheckHandshakeCompatibility() {
491-
try {
492-
InboundDecoder.checkHandshakeVersionCompatibility(randomFrom(TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS));
493-
} catch (IllegalStateException e) {
494-
throw new AssertionError(e);
495-
}
541+
for (final var allowedHandshakeVersion : TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS) {
542+
InboundDecoder.checkHandshakeVersionCompatibility(allowedHandshakeVersion); // should not throw
496543

497-
var invalid = TransportVersion.fromId(TransportHandshaker.EARLIEST_HANDSHAKE_VERSION.id() - 1);
498-
try {
499-
InboundDecoder.checkHandshakeVersionCompatibility(invalid);
500-
fail();
501-
} catch (IllegalStateException expected) {
544+
var invalid = TransportVersion.fromId(allowedHandshakeVersion.id() + randomFrom(-1, +1));
502545
assertEquals(
503546
"Received message from unsupported version: ["
504547
+ invalid
505548
+ "] allowed versions are: "
506549
+ TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS,
507-
expected.getMessage()
550+
expectThrows(IllegalStateException.class, () -> InboundDecoder.checkHandshakeVersionCompatibility(invalid)).getMessage()
508551
);
509552
}
510553
}

0 commit comments

Comments
 (0)