Skip to content

Commit 0ab3585

Browse files
authored
[8.x] Test & document raw transport handshakes (elastic#120785) (elastic#120812)
* Test & document raw transport handshakes (elastic#120785) Updates the `TransportHandshaker` code comments to reflect the new handshake format introduced in elastic#120744, and adds some low-level tests to verify the bytes on the wire are as described in those docs. * Remove UpdateForVx annotations
1 parent f590711 commit 0ab3585

File tree

3 files changed

+284
-3
lines changed

3 files changed

+284
-3
lines changed

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ final class TransportHandshaker {
5252
* rely on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer
5353
* true.
5454
*
55-
* Here are some example messages, broken down to show their structure:
55+
* Here are some example messages, broken down to show their structure. See TransportHandshakerRawMessageTests for supporting tests.
5656
*
5757
* ## v6080099 Request:
5858
*
@@ -86,7 +86,7 @@ final class TransportHandshaker {
8686
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
8787
*
8888
*
89-
* ## v7170099 and v8800000 Requests:
89+
* ## v7170099 Requests:
9090
*
9191
* 45 53 -- 'ES' marker
9292
* 00 00 00 31 -- total message length
@@ -105,7 +105,7 @@ final class TransportHandshaker {
105105
* 04 -- payload length
106106
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
107107
*
108-
* ## v7170099 and v8800000 Responses:
108+
* ## v7170099 Responses:
109109
*
110110
* 45 53 -- 'ES' marker
111111
* 00 00 00 17 -- total message length
@@ -117,6 +117,40 @@ final class TransportHandshaker {
117117
* 00 -- no response headers [1]
118118
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
119119
*
120+
* ## v8800000 Requests:
121+
*
122+
* 45 53 -- 'ES' marker
123+
* 00 00 00 36 -- total message length
124+
* 00 00 00 00 00 00 00 01 -- request ID
125+
* 08 -- status flags (0b1000 == handshake request)
126+
* 00 86 47 00 -- handshake protocol version (0x6d6833 == 7170099)
127+
* 00 00 00 19 -- length of variable portion of header
128+
* 00 -- no request headers [1]
129+
* 00 -- no response headers [1]
130+
* 16 -- action string size
131+
* 69 6e 74 65 72 6e 61 6c }
132+
* 3a 74 63 70 2f 68 61 6e }- ASCII representation of HANDSHAKE_ACTION_NAME
133+
* 64 73 68 61 6b 65 }
134+
* 00 -- no parent task ID [3]
135+
* 0a -- payload length
136+
* e8 8f 9b 04 -- requesting node transport version (vInt: 00000100 10011011 10001111 11101000 == 8833000)
137+
* 05 -- requesting node release version string length
138+
* 39 2e 30 2e 30 -- requesting node release version string "9.0.0"
139+
*
140+
* ## v8800000 Responses:
141+
*
142+
* 45 53 -- 'ES' marker
143+
* 00 00 00 1d -- total message length
144+
* 00 00 00 00 00 00 00 01 -- request ID (copied from request)
145+
* 09 -- status flags (0b1001 == handshake response)
146+
* 00 86 47 00 -- handshake protocol version (0x864700 == 8800000, copied from request)
147+
* 00 00 00 02 -- length of following variable portion of header
148+
* 00 -- no request headers [1]
149+
* 00 -- no response headers [1]
150+
* e8 8f 9b 04 -- responding node transport version (vInt: 00000100 10011011 10001111 11101000 == 8833000)
151+
* 05 -- responding node release version string length
152+
* 39 2e 30 2e 30 -- responding node release version string "9.0.0"
153+
*
120154
* [1] Thread context headers should be empty; see org.elasticsearch.common.util.concurrent.ThreadContext.ThreadContextStruct.writeTo
121155
* for their structure.
122156
* [2] A list of strings, which can safely be ignored

server/src/main/resources/org/elasticsearch/bootstrap/test-framework.policy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ grant codeBase "${codebase.netty-transport}" {
127127

128128
grant {
129129
permission java.net.SocketPermission "127.0.0.1", "accept, connect, resolve";
130+
permission java.net.SocketPermission "[0:0:0:0:0:0:0:1]", "accept, connect, resolve";
130131
permission java.nio.file.LinkPermission "symbolic";
131132
// needed for keystore tests
132133
permission java.lang.RuntimePermission "accessUserInformation";
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.Build;
14+
import org.elasticsearch.TransportVersion;
15+
import org.elasticsearch.action.ActionListener;
16+
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
17+
import org.elasticsearch.common.bytes.BytesArray;
18+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
19+
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
20+
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
21+
import org.elasticsearch.common.transport.TransportAddress;
22+
import org.elasticsearch.test.ESSingleNodeTestCase;
23+
import org.elasticsearch.test.TransportVersionUtils;
24+
25+
import java.net.InetAddress;
26+
import java.net.ServerSocket;
27+
import java.net.Socket;
28+
import java.nio.charset.StandardCharsets;
29+
import java.security.AccessController;
30+
import java.security.PrivilegedExceptionAction;
31+
32+
import static org.hamcrest.Matchers.allOf;
33+
import static org.hamcrest.Matchers.greaterThan;
34+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
35+
import static org.hamcrest.Matchers.lessThan;
36+
37+
public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase {
38+
39+
public void testV7Handshake() throws Exception {
40+
final BytesRef handshakeRequestBytes;
41+
final var requestId = randomNonNegativeLong();
42+
try (var outputStream = new BytesStreamOutput()) {
43+
outputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
44+
outputStream.writeLong(requestId);
45+
outputStream.writeByte(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)));
46+
outputStream.writeInt(TransportHandshaker.V7_HANDSHAKE_VERSION.id());
47+
outputStream.writeByte((byte) 0); // no request headers;
48+
outputStream.writeByte((byte) 0); // no response headers;
49+
outputStream.writeStringArray(new String[] { "x-pack" }); // one feature
50+
outputStream.writeString("internal:tcp/handshake");
51+
outputStream.writeByte((byte) 0); // no parent task ID;
52+
53+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
54+
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
55+
outputStream.writeByte((byte) 4); // payload length
56+
outputStream.writeVInt(requestNodeTransportVersionId);
57+
58+
handshakeRequestBytes = outputStream.bytes().toBytesRef();
59+
}
60+
61+
final BytesRef handshakeResponseBytes;
62+
try (var socket = openTransportConnection()) {
63+
var streamOutput = new OutputStreamStreamOutput(socket.getOutputStream());
64+
streamOutput.write("ES".getBytes(StandardCharsets.US_ASCII));
65+
streamOutput.writeInt(handshakeRequestBytes.length);
66+
streamOutput.writeBytes(handshakeRequestBytes.bytes, handshakeRequestBytes.offset, handshakeRequestBytes.length);
67+
streamOutput.flush();
68+
69+
var streamInput = new InputStreamStreamInput(socket.getInputStream());
70+
assertEquals((byte) 'E', streamInput.readByte());
71+
assertEquals((byte) 'S', streamInput.readByte());
72+
var responseLength = streamInput.readInt();
73+
handshakeResponseBytes = streamInput.readBytesRef(responseLength);
74+
}
75+
76+
try (var inputStream = new BytesArray(handshakeResponseBytes).streamInput()) {
77+
assertEquals(requestId, inputStream.readLong());
78+
assertEquals(TransportStatus.setResponse(TransportStatus.setHandshake((byte) 0)), inputStream.readByte());
79+
assertEquals(TransportHandshaker.V7_HANDSHAKE_VERSION.id(), inputStream.readInt());
80+
assertEquals((byte) 0, inputStream.readByte()); // no request headers
81+
assertEquals((byte) 0, inputStream.readByte()); // no response headers
82+
inputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
83+
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
84+
assertEquals(-1, inputStream.read());
85+
}
86+
}
87+
88+
public void testV8Handshake() throws Exception {
89+
final BytesRef handshakeRequestBytes;
90+
final var requestId = randomNonNegativeLong();
91+
try (var outputStream = new BytesStreamOutput()) {
92+
outputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
93+
outputStream.writeLong(requestId);
94+
outputStream.writeByte(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)));
95+
outputStream.writeInt(TransportHandshaker.V8_HANDSHAKE_VERSION.id());
96+
outputStream.writeInt(0x1a); // length of variable-length header, always 0x1a
97+
outputStream.writeByte((byte) 0); // no request headers;
98+
outputStream.writeByte((byte) 0); // no response headers;
99+
outputStream.writeByte((byte) 0); // no features;
100+
outputStream.writeString("internal:tcp/handshake");
101+
outputStream.writeByte((byte) 0); // no parent task ID;
102+
103+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
104+
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
105+
outputStream.writeByte((byte) 4); // payload length
106+
outputStream.writeVInt(requestNodeTransportVersionId);
107+
108+
handshakeRequestBytes = outputStream.bytes().toBytesRef();
109+
}
110+
111+
final BytesRef handshakeResponseBytes;
112+
try (var socket = openTransportConnection()) {
113+
var streamOutput = new OutputStreamStreamOutput(socket.getOutputStream());
114+
streamOutput.write("ES".getBytes(StandardCharsets.US_ASCII));
115+
streamOutput.writeInt(handshakeRequestBytes.length);
116+
streamOutput.writeBytes(handshakeRequestBytes.bytes, handshakeRequestBytes.offset, handshakeRequestBytes.length);
117+
streamOutput.flush();
118+
119+
var streamInput = new InputStreamStreamInput(socket.getInputStream());
120+
assertEquals((byte) 'E', streamInput.readByte());
121+
assertEquals((byte) 'S', streamInput.readByte());
122+
var responseLength = streamInput.readInt();
123+
handshakeResponseBytes = streamInput.readBytesRef(responseLength);
124+
}
125+
126+
try (var inputStream = new BytesArray(handshakeResponseBytes).streamInput()) {
127+
assertEquals(requestId, inputStream.readLong());
128+
assertEquals(TransportStatus.setResponse(TransportStatus.setHandshake((byte) 0)), inputStream.readByte());
129+
assertEquals(TransportHandshaker.V8_HANDSHAKE_VERSION.id(), inputStream.readInt());
130+
assertEquals(2, inputStream.readInt()); // length of variable-length header, always 0x02
131+
assertEquals((byte) 0, inputStream.readByte()); // no request headers
132+
assertEquals((byte) 0, inputStream.readByte()); // no response headers
133+
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
134+
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
135+
assertEquals(-1, inputStream.read());
136+
}
137+
}
138+
139+
public void testV9Handshake() throws Exception {
140+
final BytesRef handshakeRequestBytes;
141+
final var requestId = randomNonNegativeLong();
142+
try (var outputStream = new BytesStreamOutput()) {
143+
outputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
144+
outputStream.writeLong(requestId);
145+
outputStream.writeByte(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)));
146+
outputStream.writeInt(TransportHandshaker.V9_HANDSHAKE_VERSION.id());
147+
outputStream.writeInt(0x19); // length of variable-length header, always 0x19
148+
outputStream.writeByte((byte) 0); // no request headers;
149+
outputStream.writeByte((byte) 0); // no response headers;
150+
outputStream.writeString("internal:tcp/handshake");
151+
outputStream.writeByte((byte) 0); // no parent task ID;
152+
153+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
154+
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
155+
final var releaseVersionLength = between(0, 127 - 5); // so that its length, and the length of the payload, is a one-byte vInt
156+
final var requestNodeReleaseVersion = randomAlphaOfLength(releaseVersionLength);
157+
outputStream.writeByte((byte) (4 + 1 + releaseVersionLength)); // payload length
158+
outputStream.writeVInt(requestNodeTransportVersionId);
159+
outputStream.writeString(requestNodeReleaseVersion);
160+
161+
handshakeRequestBytes = outputStream.bytes().toBytesRef();
162+
}
163+
164+
final BytesRef handshakeResponseBytes;
165+
try (var socket = openTransportConnection()) {
166+
var streamOutput = new OutputStreamStreamOutput(socket.getOutputStream());
167+
streamOutput.write("ES".getBytes(StandardCharsets.US_ASCII));
168+
streamOutput.writeInt(handshakeRequestBytes.length);
169+
streamOutput.writeBytes(handshakeRequestBytes.bytes, handshakeRequestBytes.offset, handshakeRequestBytes.length);
170+
streamOutput.flush();
171+
172+
var streamInput = new InputStreamStreamInput(socket.getInputStream());
173+
assertEquals((byte) 'E', streamInput.readByte());
174+
assertEquals((byte) 'S', streamInput.readByte());
175+
var responseLength = streamInput.readInt();
176+
handshakeResponseBytes = streamInput.readBytesRef(responseLength);
177+
}
178+
179+
try (var inputStream = new BytesArray(handshakeResponseBytes).streamInput()) {
180+
assertEquals(requestId, inputStream.readLong());
181+
assertEquals(TransportStatus.setResponse(TransportStatus.setHandshake((byte) 0)), inputStream.readByte());
182+
assertEquals(TransportHandshaker.V9_HANDSHAKE_VERSION.id(), inputStream.readInt());
183+
assertEquals(2, inputStream.readInt()); // length of variable-length header, always 0x02
184+
assertEquals((byte) 0, inputStream.readByte()); // no request headers
185+
assertEquals((byte) 0, inputStream.readByte()); // no response headers
186+
inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
187+
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
188+
assertEquals(Build.current().version(), inputStream.readString());
189+
assertEquals(-1, inputStream.read());
190+
}
191+
}
192+
193+
public void testOutboundHandshake() throws Exception {
194+
final BytesRef handshakeRequestBytes;
195+
196+
try (var serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
197+
getInstanceFromNode(TransportService.class).openConnection(
198+
DiscoveryNodeUtils.builder(randomIdentifier())
199+
.address(new TransportAddress(serverSocket.getInetAddress(), serverSocket.getLocalPort()))
200+
.build(),
201+
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null, null, null, null),
202+
ActionListener.noop()
203+
);
204+
205+
try (
206+
var acceptedSocket = serverSocket.accept();
207+
var streamInput = new InputStreamStreamInput(acceptedSocket.getInputStream())
208+
) {
209+
assertEquals((byte) 'E', streamInput.readByte());
210+
assertEquals((byte) 'S', streamInput.readByte());
211+
var responseLength = streamInput.readInt();
212+
handshakeRequestBytes = streamInput.readBytesRef(responseLength);
213+
}
214+
}
215+
216+
final BytesRef payloadBytes;
217+
218+
try (var inputStream = new BytesArray(handshakeRequestBytes).streamInput()) {
219+
assertThat(inputStream.readLong(), greaterThan(0L));
220+
assertEquals(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), inputStream.readByte());
221+
assertEquals(TransportHandshaker.V8_HANDSHAKE_VERSION.id(), inputStream.readInt());
222+
assertEquals(0x1a, inputStream.readInt()); // length of variable-length header, always 0x1a
223+
assertEquals((byte) 0, inputStream.readByte()); // no request headers
224+
assertEquals((byte) 0, inputStream.readByte()); // no response headers
225+
assertEquals((byte) 0, inputStream.readByte()); // no features
226+
assertEquals("internal:tcp/handshake", inputStream.readString());
227+
assertEquals((byte) 0, inputStream.readByte()); // no parent task
228+
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
229+
payloadBytes = inputStream.readBytesRef();
230+
assertEquals(-1, inputStream.read());
231+
}
232+
233+
try (var inputStream = new BytesArray(payloadBytes).streamInput()) {
234+
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
235+
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
236+
assertEquals(-1, inputStream.read());
237+
}
238+
}
239+
240+
private Socket openTransportConnection() throws Exception {
241+
final var transportAddress = randomFrom(getInstanceFromNode(TransportService.class).boundAddress().boundAddresses()).address();
242+
return AccessController.doPrivileged(
243+
(PrivilegedExceptionAction<Socket>) (() -> new Socket(transportAddress.getAddress(), transportAddress.getPort()))
244+
);
245+
}
246+
}

0 commit comments

Comments
 (0)