Skip to content

Commit 60f78e4

Browse files
authored
Test & document raw transport handshakes (#120785)
Updates the `TransportHandshaker` code comments to reflect the new handshake format introduced in #120744, and adds some low-level tests to verify the bytes on the wire are as described in those docs.
1 parent ca83ae5 commit 60f78e4

File tree

3 files changed

+289
-3
lines changed

3 files changed

+289
-3
lines changed

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ final class TransportHandshaker {
5353
* rely on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer
5454
* true.
5555
*
56-
* Here are some example messages, broken down to show their structure:
56+
* Here are some example messages, broken down to show their structure. See TransportHandshakerRawMessageTests for supporting tests.
5757
*
5858
* ## v6080099 Request:
5959
*
@@ -87,7 +87,7 @@ final class TransportHandshaker {
8787
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
8888
*
8989
*
90-
* ## v7170099 and v8800000 Requests:
90+
* ## v7170099 Requests:
9191
*
9292
* 45 53 -- 'ES' marker
9393
* 00 00 00 31 -- total message length
@@ -106,7 +106,7 @@ final class TransportHandshaker {
106106
* 04 -- payload length
107107
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
108108
*
109-
* ## v7170099 and v8800000 Responses:
109+
* ## v7170099 Responses:
110110
*
111111
* 45 53 -- 'ES' marker
112112
* 00 00 00 17 -- total message length
@@ -118,6 +118,40 @@ final class TransportHandshaker {
118118
* 00 -- no response headers [1]
119119
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
120120
*
121+
* ## v8800000 Requests:
122+
*
123+
* 45 53 -- 'ES' marker
124+
* 00 00 00 36 -- total message length
125+
* 00 00 00 00 00 00 00 01 -- request ID
126+
* 08 -- status flags (0b1000 == handshake request)
127+
* 00 86 47 00 -- handshake protocol version (0x6d6833 == 7170099)
128+
* 00 00 00 19 -- length of variable portion of header
129+
* 00 -- no request headers [1]
130+
* 00 -- no response headers [1]
131+
* 16 -- action string size
132+
* 69 6e 74 65 72 6e 61 6c }
133+
* 3a 74 63 70 2f 68 61 6e }- ASCII representation of HANDSHAKE_ACTION_NAME
134+
* 64 73 68 61 6b 65 }
135+
* 00 -- no parent task ID [3]
136+
* 0a -- payload length
137+
* e8 8f 9b 04 -- requesting node transport version (vInt: 00000100 10011011 10001111 11101000 == 8833000)
138+
* 05 -- requesting node release version string length
139+
* 39 2e 30 2e 30 -- requesting node release version string "9.0.0"
140+
*
141+
* ## v8800000 Responses:
142+
*
143+
* 45 53 -- 'ES' marker
144+
* 00 00 00 1d -- total message length
145+
* 00 00 00 00 00 00 00 01 -- request ID (copied from request)
146+
* 09 -- status flags (0b1001 == handshake response)
147+
* 00 86 47 00 -- handshake protocol version (0x864700 == 8800000, copied from request)
148+
* 00 00 00 02 -- length of following variable portion of header
149+
* 00 -- no request headers [1]
150+
* 00 -- no response headers [1]
151+
* e8 8f 9b 04 -- responding node transport version (vInt: 00000100 10011011 10001111 11101000 == 8833000)
152+
* 05 -- responding node release version string length
153+
* 39 2e 30 2e 30 -- responding node release version string "9.0.0"
154+
*
121155
* [1] Thread context headers should be empty; see org.elasticsearch.common.util.concurrent.ThreadContext.ThreadContextStruct.writeTo
122156
* for their structure.
123157
* [2] A list of strings, which can safely be ignored

server/src/main/resources/org/elasticsearch/bootstrap/test-framework.policy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ grant codeBase "${codebase.netty-transport}" {
127127

128128
grant {
129129
permission java.net.SocketPermission "127.0.0.1", "accept, connect, resolve";
130+
permission java.net.SocketPermission "[0:0:0:0:0:0:0:1]", "accept, connect, resolve";
130131
permission java.nio.file.LinkPermission "symbolic";
131132
// needed for keystore tests
132133
permission java.lang.RuntimePermission "accessUserInformation";
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.Build;
14+
import org.elasticsearch.TransportVersion;
15+
import org.elasticsearch.action.ActionListener;
16+
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
17+
import org.elasticsearch.common.bytes.BytesArray;
18+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
19+
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
20+
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
21+
import org.elasticsearch.common.transport.TransportAddress;
22+
import org.elasticsearch.core.UpdateForV10;
23+
import org.elasticsearch.core.UpdateForV9;
24+
import org.elasticsearch.test.ESSingleNodeTestCase;
25+
import org.elasticsearch.test.TransportVersionUtils;
26+
27+
import java.net.InetAddress;
28+
import java.net.ServerSocket;
29+
import java.net.Socket;
30+
import java.nio.charset.StandardCharsets;
31+
import java.security.AccessController;
32+
import java.security.PrivilegedExceptionAction;
33+
34+
import static org.hamcrest.Matchers.allOf;
35+
import static org.hamcrest.Matchers.greaterThan;
36+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
37+
import static org.hamcrest.Matchers.lessThan;
38+
39+
public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase {
40+
41+
@UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) // remove support for v7 handshakes in v9
42+
public void testV7Handshake() throws Exception {
43+
final BytesRef handshakeRequestBytes;
44+
final var requestId = randomNonNegativeLong();
45+
try (var outputStream = new BytesStreamOutput()) {
46+
outputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
47+
outputStream.writeLong(requestId);
48+
outputStream.writeByte(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)));
49+
outputStream.writeInt(TransportHandshaker.V7_HANDSHAKE_VERSION.id());
50+
outputStream.writeByte((byte) 0); // no request headers;
51+
outputStream.writeByte((byte) 0); // no response headers;
52+
outputStream.writeStringArray(new String[] { "x-pack" }); // one feature
53+
outputStream.writeString("internal:tcp/handshake");
54+
outputStream.writeByte((byte) 0); // no parent task ID;
55+
56+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
57+
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
58+
outputStream.writeByte((byte) 4); // payload length
59+
outputStream.writeVInt(requestNodeTransportVersionId);
60+
61+
handshakeRequestBytes = outputStream.bytes().toBytesRef();
62+
}
63+
64+
final BytesRef handshakeResponseBytes;
65+
try (var socket = openTransportConnection()) {
66+
var streamOutput = new OutputStreamStreamOutput(socket.getOutputStream());
67+
streamOutput.write("ES".getBytes(StandardCharsets.US_ASCII));
68+
streamOutput.writeInt(handshakeRequestBytes.length);
69+
streamOutput.writeBytes(handshakeRequestBytes.bytes, handshakeRequestBytes.offset, handshakeRequestBytes.length);
70+
streamOutput.flush();
71+
72+
var streamInput = new InputStreamStreamInput(socket.getInputStream());
73+
assertEquals((byte) 'E', streamInput.readByte());
74+
assertEquals((byte) 'S', streamInput.readByte());
75+
var responseLength = streamInput.readInt();
76+
handshakeResponseBytes = streamInput.readBytesRef(responseLength);
77+
}
78+
79+
try (var inputStream = new BytesArray(handshakeResponseBytes).streamInput()) {
80+
assertEquals(requestId, inputStream.readLong());
81+
assertEquals(TransportStatus.setResponse(TransportStatus.setHandshake((byte) 0)), inputStream.readByte());
82+
assertEquals(TransportHandshaker.V7_HANDSHAKE_VERSION.id(), inputStream.readInt());
83+
assertEquals((byte) 0, inputStream.readByte()); // no request headers
84+
assertEquals((byte) 0, inputStream.readByte()); // no response headers
85+
inputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
86+
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
87+
assertEquals(-1, inputStream.read());
88+
}
89+
}
90+
91+
@UpdateForV10(owner = UpdateForV10.Owner.CORE_INFRA) // remove support for v8 handshakes in v10
92+
public void testV8Handshake() throws Exception {
93+
final BytesRef handshakeRequestBytes;
94+
final var requestId = randomNonNegativeLong();
95+
try (var outputStream = new BytesStreamOutput()) {
96+
outputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
97+
outputStream.writeLong(requestId);
98+
outputStream.writeByte(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)));
99+
outputStream.writeInt(TransportHandshaker.V8_HANDSHAKE_VERSION.id());
100+
outputStream.writeInt(0x1a); // length of variable-length header, always 0x1a
101+
outputStream.writeByte((byte) 0); // no request headers;
102+
outputStream.writeByte((byte) 0); // no response headers;
103+
outputStream.writeByte((byte) 0); // no features;
104+
outputStream.writeString("internal:tcp/handshake");
105+
outputStream.writeByte((byte) 0); // no parent task ID;
106+
107+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
108+
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
109+
outputStream.writeByte((byte) 4); // payload length
110+
outputStream.writeVInt(requestNodeTransportVersionId);
111+
112+
handshakeRequestBytes = outputStream.bytes().toBytesRef();
113+
}
114+
115+
final BytesRef handshakeResponseBytes;
116+
try (var socket = openTransportConnection()) {
117+
var streamOutput = new OutputStreamStreamOutput(socket.getOutputStream());
118+
streamOutput.write("ES".getBytes(StandardCharsets.US_ASCII));
119+
streamOutput.writeInt(handshakeRequestBytes.length);
120+
streamOutput.writeBytes(handshakeRequestBytes.bytes, handshakeRequestBytes.offset, handshakeRequestBytes.length);
121+
streamOutput.flush();
122+
123+
var streamInput = new InputStreamStreamInput(socket.getInputStream());
124+
assertEquals((byte) 'E', streamInput.readByte());
125+
assertEquals((byte) 'S', streamInput.readByte());
126+
var responseLength = streamInput.readInt();
127+
handshakeResponseBytes = streamInput.readBytesRef(responseLength);
128+
}
129+
130+
try (var inputStream = new BytesArray(handshakeResponseBytes).streamInput()) {
131+
assertEquals(requestId, inputStream.readLong());
132+
assertEquals(TransportStatus.setResponse(TransportStatus.setHandshake((byte) 0)), inputStream.readByte());
133+
assertEquals(TransportHandshaker.V8_HANDSHAKE_VERSION.id(), inputStream.readInt());
134+
assertEquals(2, inputStream.readInt()); // length of variable-length header, always 0x02
135+
assertEquals((byte) 0, inputStream.readByte()); // no request headers
136+
assertEquals((byte) 0, inputStream.readByte()); // no response headers
137+
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
138+
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
139+
assertEquals(-1, inputStream.read());
140+
}
141+
}
142+
143+
@UpdateForV10(owner = UpdateForV10.Owner.CORE_INFRA) // remove support for v9 handshakes in v11
144+
public void testV9Handshake() throws Exception {
145+
final BytesRef handshakeRequestBytes;
146+
final var requestId = randomNonNegativeLong();
147+
try (var outputStream = new BytesStreamOutput()) {
148+
outputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
149+
outputStream.writeLong(requestId);
150+
outputStream.writeByte(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)));
151+
outputStream.writeInt(TransportHandshaker.V9_HANDSHAKE_VERSION.id());
152+
outputStream.writeInt(0x19); // length of variable-length header, always 0x19
153+
outputStream.writeByte((byte) 0); // no request headers;
154+
outputStream.writeByte((byte) 0); // no response headers;
155+
outputStream.writeString("internal:tcp/handshake");
156+
outputStream.writeByte((byte) 0); // no parent task ID;
157+
158+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
159+
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
160+
final var releaseVersionLength = between(0, 127 - 5); // so that its length, and the length of the payload, is a one-byte vInt
161+
final var requestNodeReleaseVersion = randomAlphaOfLength(releaseVersionLength);
162+
outputStream.writeByte((byte) (4 + 1 + releaseVersionLength)); // payload length
163+
outputStream.writeVInt(requestNodeTransportVersionId);
164+
outputStream.writeString(requestNodeReleaseVersion);
165+
166+
handshakeRequestBytes = outputStream.bytes().toBytesRef();
167+
}
168+
169+
final BytesRef handshakeResponseBytes;
170+
try (var socket = openTransportConnection()) {
171+
var streamOutput = new OutputStreamStreamOutput(socket.getOutputStream());
172+
streamOutput.write("ES".getBytes(StandardCharsets.US_ASCII));
173+
streamOutput.writeInt(handshakeRequestBytes.length);
174+
streamOutput.writeBytes(handshakeRequestBytes.bytes, handshakeRequestBytes.offset, handshakeRequestBytes.length);
175+
streamOutput.flush();
176+
177+
var streamInput = new InputStreamStreamInput(socket.getInputStream());
178+
assertEquals((byte) 'E', streamInput.readByte());
179+
assertEquals((byte) 'S', streamInput.readByte());
180+
var responseLength = streamInput.readInt();
181+
handshakeResponseBytes = streamInput.readBytesRef(responseLength);
182+
}
183+
184+
try (var inputStream = new BytesArray(handshakeResponseBytes).streamInput()) {
185+
assertEquals(requestId, inputStream.readLong());
186+
assertEquals(TransportStatus.setResponse(TransportStatus.setHandshake((byte) 0)), inputStream.readByte());
187+
assertEquals(TransportHandshaker.V9_HANDSHAKE_VERSION.id(), inputStream.readInt());
188+
assertEquals(2, inputStream.readInt()); // length of variable-length header, always 0x02
189+
assertEquals((byte) 0, inputStream.readByte()); // no request headers
190+
assertEquals((byte) 0, inputStream.readByte()); // no response headers
191+
inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
192+
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
193+
assertEquals(Build.current().version(), inputStream.readString());
194+
assertEquals(-1, inputStream.read());
195+
}
196+
}
197+
198+
public void testOutboundHandshake() throws Exception {
199+
final BytesRef handshakeRequestBytes;
200+
201+
try (var serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
202+
getInstanceFromNode(TransportService.class).openConnection(
203+
DiscoveryNodeUtils.builder(randomIdentifier())
204+
.address(new TransportAddress(serverSocket.getInetAddress(), serverSocket.getLocalPort()))
205+
.build(),
206+
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null, null, null, null),
207+
ActionListener.noop()
208+
);
209+
210+
try (
211+
var acceptedSocket = serverSocket.accept();
212+
var streamInput = new InputStreamStreamInput(acceptedSocket.getInputStream())
213+
) {
214+
assertEquals((byte) 'E', streamInput.readByte());
215+
assertEquals((byte) 'S', streamInput.readByte());
216+
var responseLength = streamInput.readInt();
217+
handshakeRequestBytes = streamInput.readBytesRef(responseLength);
218+
}
219+
}
220+
221+
final BytesRef payloadBytes;
222+
223+
try (var inputStream = new BytesArray(handshakeRequestBytes).streamInput()) {
224+
assertThat(inputStream.readLong(), greaterThan(0L));
225+
assertEquals(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), inputStream.readByte());
226+
assertEquals(TransportHandshaker.V8_HANDSHAKE_VERSION.id(), inputStream.readInt());
227+
assertEquals(0x1a, inputStream.readInt()); // length of variable-length header, always 0x1a
228+
assertEquals((byte) 0, inputStream.readByte()); // no request headers
229+
assertEquals((byte) 0, inputStream.readByte()); // no response headers
230+
assertEquals((byte) 0, inputStream.readByte()); // no features
231+
assertEquals("internal:tcp/handshake", inputStream.readString());
232+
assertEquals((byte) 0, inputStream.readByte()); // no parent task
233+
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
234+
payloadBytes = inputStream.readBytesRef();
235+
assertEquals(-1, inputStream.read());
236+
}
237+
238+
try (var inputStream = new BytesArray(payloadBytes).streamInput()) {
239+
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
240+
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
241+
assertEquals(-1, inputStream.read());
242+
}
243+
}
244+
245+
private Socket openTransportConnection() throws Exception {
246+
final var transportAddress = randomFrom(getInstanceFromNode(TransportService.class).boundAddress().boundAddresses()).address();
247+
return AccessController.doPrivileged(
248+
(PrivilegedExceptionAction<Socket>) (() -> new Socket(transportAddress.getAddress(), transportAddress.getPort()))
249+
);
250+
}
251+
}

0 commit comments

Comments
 (0)