Skip to content

Commit 3e71bb5

Browse files
authored
[8.x] Add version string to v9 transport handshake (#120744) (#120779)
* Add version string to v9 transport handshake (#120744) * Remove UpdateForV9 annotations
1 parent dec96ba commit 3e71bb5

File tree

3 files changed

+210
-28
lines changed

3 files changed

+210
-28
lines changed

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
1414
import org.apache.lucene.util.BytesRef;
15+
import org.elasticsearch.Build;
1516
import org.elasticsearch.ElasticsearchException;
1617
import org.elasticsearch.TransportVersion;
1718
import org.elasticsearch.action.ActionListener;
@@ -175,7 +176,7 @@ public TcpTransport(
175176
channel,
176177
requestId,
177178
TransportHandshaker.HANDSHAKE_ACTION_NAME,
178-
new TransportHandshaker.HandshakeRequest(version),
179+
new TransportHandshaker.HandshakeRequest(version, Build.current().version()),
179180
TransportRequestOptions.EMPTY,
180181
v,
181182
null,

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 82 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.transport;
1111

12+
import org.elasticsearch.Build;
1213
import org.elasticsearch.TransportVersion;
1314
import org.elasticsearch.TransportVersions;
1415
import org.elasticsearch.action.ActionListener;
@@ -23,6 +24,7 @@
2324

2425
import java.io.EOFException;
2526
import java.io.IOException;
27+
import java.util.Objects;
2628
import java.util.Set;
2729
import java.util.concurrent.ConcurrentHashMap;
2830
import java.util.concurrent.ConcurrentMap;
@@ -206,7 +208,7 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
206208
assert ignoreDeserializationErrors : exception;
207209
throw exception;
208210
}
209-
channel.sendResponse(new HandshakeResponse(this.version));
211+
channel.sendResponse(new HandshakeResponse(this.version, Build.current().version()));
210212
}
211213

212214
TransportResponseHandler<HandshakeResponse> removeHandlerForHandshake(long requestId) {
@@ -245,7 +247,7 @@ public Executor executor() {
245247
@Override
246248
public void handleResponse(HandshakeResponse response) {
247249
if (isDone.compareAndSet(false, true)) {
248-
TransportVersion responseVersion = response.responseVersion;
250+
TransportVersion responseVersion = response.transportVersion;
249251
if (TransportVersion.isCompatible(responseVersion) == false) {
250252
listener.onFailure(
251253
new IllegalStateException(
@@ -257,7 +259,7 @@ public void handleResponse(HandshakeResponse response) {
257259
)
258260
);
259261
} else {
260-
listener.onResponse(TransportVersion.min(TransportHandshaker.this.version, response.getResponseVersion()));
262+
listener.onResponse(TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion()));
261263
}
262264
}
263265
}
@@ -278,10 +280,20 @@ void handleLocalException(TransportException e) {
278280

279281
static final class HandshakeRequest extends TransportRequest {
280282

281-
private final TransportVersion version;
283+
/**
284+
* The {@link TransportVersion#current()} of the requesting node.
285+
*/
286+
final TransportVersion transportVersion;
282287

283-
HandshakeRequest(TransportVersion version) {
284-
this.version = version;
288+
/**
289+
* The {@link Build#version()} of the requesting node, as a {@link String}, for better reporting of handshake failures due to
290+
* an incompatible version.
291+
*/
292+
final String releaseVersion;
293+
294+
HandshakeRequest(TransportVersion transportVersion, String releaseVersion) {
295+
this.transportVersion = Objects.requireNonNull(transportVersion);
296+
this.releaseVersion = Objects.requireNonNull(releaseVersion);
285297
}
286298

287299
HandshakeRequest(StreamInput streamInput) throws IOException {
@@ -293,53 +305,101 @@ static final class HandshakeRequest extends TransportRequest {
293305
remainingMessage = null;
294306
}
295307
if (remainingMessage == null) {
296-
version = null;
308+
transportVersion = null;
309+
releaseVersion = null;
297310
} else {
298311
try (StreamInput messageStreamInput = remainingMessage.streamInput()) {
299-
this.version = TransportVersion.readVersion(messageStreamInput);
312+
this.transportVersion = TransportVersion.readVersion(messageStreamInput);
313+
if (streamInput.getTransportVersion().onOrAfter(V9_HANDSHAKE_VERSION)) {
314+
this.releaseVersion = messageStreamInput.readString();
315+
} else {
316+
this.releaseVersion = this.transportVersion.toReleaseVersion();
317+
}
300318
}
301319
}
302320
}
303321

304322
@Override
305323
public void writeTo(StreamOutput streamOutput) throws IOException {
306324
super.writeTo(streamOutput);
307-
assert version != null;
308-
try (BytesStreamOutput messageStreamOutput = new BytesStreamOutput(4)) {
309-
TransportVersion.writeVersion(version, messageStreamOutput);
325+
assert transportVersion != null;
326+
try (BytesStreamOutput messageStreamOutput = new BytesStreamOutput(1024)) {
327+
TransportVersion.writeVersion(transportVersion, messageStreamOutput);
328+
if (streamOutput.getTransportVersion().onOrAfter(V9_HANDSHAKE_VERSION)) {
329+
messageStreamOutput.writeString(releaseVersion);
330+
} // else we just send the transport version and rely on a best-effort mapping to release versions
310331
BytesReference reference = messageStreamOutput.bytes();
311332
streamOutput.writeBytesReference(reference);
312333
}
313334
}
314335
}
315336

337+
/**
338+
* A response to a low-level transport handshake, carrying information about the version of the responding node.
339+
*/
316340
static final class HandshakeResponse extends TransportResponse {
317341

318-
private final TransportVersion responseVersion;
342+
/**
343+
* The {@link TransportVersion#current()} of the responding node.
344+
*/
345+
private final TransportVersion transportVersion;
319346

320-
HandshakeResponse(TransportVersion responseVersion) {
321-
this.responseVersion = responseVersion;
347+
/**
348+
* The {@link Build#version()} of the responding node, as a {@link String}, for better reporting of handshake failures due to
349+
* an incompatible version.
350+
*/
351+
private final String releaseVersion;
352+
353+
HandshakeResponse(TransportVersion transportVersion, String releaseVersion) {
354+
this.transportVersion = Objects.requireNonNull(transportVersion);
355+
this.releaseVersion = Objects.requireNonNull(releaseVersion);
322356
}
323357

324-
private HandshakeResponse(StreamInput in) throws IOException {
358+
HandshakeResponse(StreamInput in) throws IOException {
325359
super(in);
326-
responseVersion = TransportVersion.readVersion(in);
360+
transportVersion = TransportVersion.readVersion(in);
361+
if (in.getTransportVersion().onOrAfter(V9_HANDSHAKE_VERSION)) {
362+
releaseVersion = in.readString();
363+
} else {
364+
releaseVersion = transportVersion.toReleaseVersion();
365+
}
327366
}
328367

329368
@Override
330369
public void writeTo(StreamOutput out) throws IOException {
331-
assert responseVersion != null;
332-
TransportVersion.writeVersion(responseVersion, out);
370+
TransportVersion.writeVersion(transportVersion, out);
371+
if (out.getTransportVersion().onOrAfter(V9_HANDSHAKE_VERSION)) {
372+
out.writeString(releaseVersion);
373+
} // else we just send the transport version and rely on a best-effort mapping to release versions
374+
}
375+
376+
/**
377+
* @return the {@link TransportVersion#current()} of the responding node.
378+
*/
379+
TransportVersion getTransportVersion() {
380+
return transportVersion;
333381
}
334382

335-
TransportVersion getResponseVersion() {
336-
return responseVersion;
383+
/**
384+
* @return the {@link Build#version()} of the responding node, as a {@link String}, for better reporting of handshake failures due
385+
* to an incompatible version.
386+
*/
387+
String getReleaseVersion() {
388+
return releaseVersion;
337389
}
338390
}
339391

340392
@FunctionalInterface
341393
interface HandshakeRequestSender {
342-
343-
void sendRequest(DiscoveryNode node, TcpChannel channel, long requestId, TransportVersion version) throws IOException;
394+
/**
395+
* @param node The (expected) remote node, for error reporting and passing to
396+
* {@link TransportMessageListener#onRequestSent}.
397+
* @param channel The TCP channel to use to send the handshake request.
398+
* @param requestId The transport request ID, for matching up the response.
399+
* @param handshakeTransportVersion The {@link TransportVersion} to use for the handshake request, which will be
400+
* {@link TransportHandshaker#V8_HANDSHAKE_VERSION} in production.
401+
*/
402+
void sendRequest(DiscoveryNode node, TcpChannel channel, long requestId, TransportVersion handshakeTransportVersion)
403+
throws IOException;
344404
}
345405
}

server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java

Lines changed: 126 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,15 @@ public void testHandshakeRequestAndResponse() throws IOException {
7171

7272
assertFalse(versionFuture.isDone());
7373

74-
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(TransportVersion.current());
74+
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(
75+
TransportVersion.current(),
76+
randomIdentifier()
77+
);
7578
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
79+
bytesStreamOutput.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
7680
handshakeRequest.writeTo(bytesStreamOutput);
7781
StreamInput input = bytesStreamOutput.bytes().streamInput();
82+
input.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
7883
final PlainActionFuture<TransportResponse> responseFuture = new PlainActionFuture<>();
7984
final TestTransportChannel channel = new TestTransportChannel(responseFuture);
8085
handshaker.handleHandshake(channel, reqId, input);
@@ -95,7 +100,7 @@ public void testHandshakeResponseFromOlderNode() throws Exception {
95100
assertFalse(versionFuture.isDone());
96101

97102
final var remoteVersion = TransportVersionUtils.randomCompatibleVersion(random());
98-
handler.handleResponse(new TransportHandshaker.HandshakeResponse(remoteVersion));
103+
handler.handleResponse(new TransportHandshaker.HandshakeResponse(remoteVersion, randomIdentifier()));
99104

100105
assertTrue(versionFuture.isDone());
101106
assertEquals(remoteVersion, versionFuture.result());
@@ -110,7 +115,10 @@ public void testHandshakeResponseFromNewerNode() throws Exception {
110115
assertFalse(versionFuture.isDone());
111116

112117
handler.handleResponse(
113-
new TransportHandshaker.HandshakeResponse(TransportVersion.fromId(TransportVersion.current().id() + between(0, 10)))
118+
new TransportHandshaker.HandshakeResponse(
119+
TransportVersion.fromId(TransportVersion.current().id() + between(0, 10)),
120+
randomIdentifier()
121+
)
114122
);
115123

116124
assertTrue(versionFuture.isDone());
@@ -123,8 +131,12 @@ public void testHandshakeRequestFutureVersionsCompatibility() throws IOException
123131

124132
verify(requestSender).sendRequest(node, channel, reqId, HANDSHAKE_REQUEST_VERSION);
125133

126-
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(TransportVersion.current());
134+
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(
135+
TransportVersion.current(),
136+
randomIdentifier()
137+
);
127138
BytesStreamOutput currentHandshakeBytes = new BytesStreamOutput();
139+
currentHandshakeBytes.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
128140
handshakeRequest.writeTo(currentHandshakeBytes);
129141

130142
BytesStreamOutput lengthCheckingHandshake = new BytesStreamOutput();
@@ -149,7 +161,116 @@ public void testHandshakeRequestFutureVersionsCompatibility() throws IOException
149161

150162
TransportHandshaker.HandshakeResponse response = (TransportHandshaker.HandshakeResponse) responseFuture.actionGet();
151163

152-
assertEquals(TransportVersion.current(), response.getResponseVersion());
164+
assertEquals(TransportVersion.current(), response.getTransportVersion());
165+
}
166+
167+
public void testReadV7HandshakeRequest() throws IOException {
168+
final var transportVersion = TransportVersionUtils.randomCompatibleVersion(random());
169+
170+
final var requestPayloadStreamOutput = new BytesStreamOutput();
171+
requestPayloadStreamOutput.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
172+
requestPayloadStreamOutput.writeVInt(transportVersion.id());
173+
174+
final var requestBytesStreamOutput = new BytesStreamOutput();
175+
requestBytesStreamOutput.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
176+
TaskId.EMPTY_TASK_ID.writeTo(requestBytesStreamOutput);
177+
requestBytesStreamOutput.writeBytesReference(requestPayloadStreamOutput.bytes());
178+
179+
final var requestBytesStream = requestBytesStreamOutput.bytes().streamInput();
180+
requestBytesStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
181+
final var handshakeRequest = new TransportHandshaker.HandshakeRequest(requestBytesStream);
182+
183+
assertEquals(transportVersion, handshakeRequest.transportVersion);
184+
assertEquals(transportVersion.toReleaseVersion(), handshakeRequest.releaseVersion);
185+
}
186+
187+
public void testReadV7HandshakeResponse() throws IOException {
188+
final var transportVersion = TransportVersionUtils.randomCompatibleVersion(random());
189+
190+
final var responseBytesStreamOutput = new BytesStreamOutput();
191+
responseBytesStreamOutput.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
192+
responseBytesStreamOutput.writeVInt(transportVersion.id());
193+
194+
final var responseBytesStream = responseBytesStreamOutput.bytes().streamInput();
195+
responseBytesStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
196+
final var handshakeResponse = new TransportHandshaker.HandshakeResponse(responseBytesStream);
197+
198+
assertEquals(transportVersion, handshakeResponse.getTransportVersion());
199+
assertEquals(transportVersion.toReleaseVersion(), handshakeResponse.getReleaseVersion());
200+
}
201+
202+
public void testReadV8HandshakeRequest() throws IOException {
203+
final var transportVersion = TransportVersionUtils.randomCompatibleVersion(random());
204+
205+
final var requestPayloadStreamOutput = new BytesStreamOutput();
206+
requestPayloadStreamOutput.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
207+
requestPayloadStreamOutput.writeVInt(transportVersion.id());
208+
209+
final var requestBytesStreamOutput = new BytesStreamOutput();
210+
requestBytesStreamOutput.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
211+
TaskId.EMPTY_TASK_ID.writeTo(requestBytesStreamOutput);
212+
requestBytesStreamOutput.writeBytesReference(requestPayloadStreamOutput.bytes());
213+
214+
final var requestBytesStream = requestBytesStreamOutput.bytes().streamInput();
215+
requestBytesStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
216+
final var handshakeRequest = new TransportHandshaker.HandshakeRequest(requestBytesStream);
217+
218+
assertEquals(transportVersion, handshakeRequest.transportVersion);
219+
assertEquals(transportVersion.toReleaseVersion(), handshakeRequest.releaseVersion);
220+
}
221+
222+
public void testReadV8HandshakeResponse() throws IOException {
223+
final var transportVersion = TransportVersionUtils.randomCompatibleVersion(random());
224+
225+
final var responseBytesStreamOutput = new BytesStreamOutput();
226+
responseBytesStreamOutput.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
227+
responseBytesStreamOutput.writeVInt(transportVersion.id());
228+
229+
final var responseBytesStream = responseBytesStreamOutput.bytes().streamInput();
230+
responseBytesStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
231+
final var handshakeResponse = new TransportHandshaker.HandshakeResponse(responseBytesStream);
232+
233+
assertEquals(transportVersion, handshakeResponse.getTransportVersion());
234+
assertEquals(transportVersion.toReleaseVersion(), handshakeResponse.getReleaseVersion());
235+
}
236+
237+
public void testReadV9HandshakeRequest() throws IOException {
238+
final var transportVersion = TransportVersionUtils.randomCompatibleVersion(random());
239+
final var releaseVersion = randomIdentifier();
240+
241+
final var requestPayloadStreamOutput = new BytesStreamOutput();
242+
requestPayloadStreamOutput.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
243+
requestPayloadStreamOutput.writeVInt(transportVersion.id());
244+
requestPayloadStreamOutput.writeString(releaseVersion);
245+
246+
final var requestBytesStreamOutput = new BytesStreamOutput();
247+
requestBytesStreamOutput.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
248+
TaskId.EMPTY_TASK_ID.writeTo(requestBytesStreamOutput);
249+
requestBytesStreamOutput.writeBytesReference(requestPayloadStreamOutput.bytes());
250+
251+
final var requestBytesStream = requestBytesStreamOutput.bytes().streamInput();
252+
requestBytesStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
253+
final var handshakeRequest = new TransportHandshaker.HandshakeRequest(requestBytesStream);
254+
255+
assertEquals(transportVersion, handshakeRequest.transportVersion);
256+
assertEquals(releaseVersion, handshakeRequest.releaseVersion);
257+
}
258+
259+
public void testReadV9HandshakeResponse() throws IOException {
260+
final var transportVersion = TransportVersionUtils.randomCompatibleVersion(random());
261+
final var releaseVersion = randomIdentifier();
262+
263+
final var responseBytesStreamOutput = new BytesStreamOutput();
264+
responseBytesStreamOutput.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
265+
responseBytesStreamOutput.writeVInt(transportVersion.id());
266+
responseBytesStreamOutput.writeString(releaseVersion);
267+
268+
final var responseBytesStream = responseBytesStreamOutput.bytes().streamInput();
269+
responseBytesStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
270+
final var handshakeResponse = new TransportHandshaker.HandshakeResponse(responseBytesStream);
271+
272+
assertEquals(transportVersion, handshakeResponse.getTransportVersion());
273+
assertEquals(releaseVersion, handshakeResponse.getReleaseVersion());
153274
}
154275

155276
public void testHandshakeError() throws IOException {

0 commit comments

Comments
 (0)