99
1010package org .elasticsearch .transport ;
1111
12+ import org .elasticsearch .Build ;
1213import org .elasticsearch .TransportVersion ;
1314import org .elasticsearch .TransportVersions ;
1415import org .elasticsearch .action .ActionListener ;
1920import org .elasticsearch .common .io .stream .StreamOutput ;
2021import org .elasticsearch .common .metrics .CounterMetric ;
2122import org .elasticsearch .core .TimeValue ;
23+ import org .elasticsearch .core .UpdateForV9 ;
2224import org .elasticsearch .threadpool .ThreadPool ;
2325
2426import java .io .EOFException ;
2527import java .io .IOException ;
28+ import java .util .Objects ;
2629import java .util .Set ;
2730import java .util .concurrent .ConcurrentHashMap ;
2831import java .util .concurrent .ConcurrentMap ;
@@ -206,7 +209,7 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
206209 assert ignoreDeserializationErrors : exception ;
207210 throw exception ;
208211 }
209- channel .sendResponse (new HandshakeResponse (this .version ));
212+ channel .sendResponse (new HandshakeResponse (this .version , Build . current (). version () ));
210213 }
211214
212215 TransportResponseHandler <HandshakeResponse > removeHandlerForHandshake (long requestId ) {
@@ -245,7 +248,7 @@ public Executor executor() {
245248 @ Override
246249 public void handleResponse (HandshakeResponse response ) {
247250 if (isDone .compareAndSet (false , true )) {
248- TransportVersion responseVersion = response .responseVersion ;
251+ TransportVersion responseVersion = response .transportVersion ;
249252 if (TransportVersion .isCompatible (responseVersion ) == false ) {
250253 listener .onFailure (
251254 new IllegalStateException (
@@ -257,7 +260,7 @@ public void handleResponse(HandshakeResponse response) {
257260 )
258261 );
259262 } else {
260- listener .onResponse (TransportVersion .min (TransportHandshaker .this .version , response .getResponseVersion ()));
263+ listener .onResponse (TransportVersion .min (TransportHandshaker .this .version , response .getTransportVersion ()));
261264 }
262265 }
263266 }
@@ -278,12 +281,23 @@ void handleLocalException(TransportException e) {
278281
279282 static final class HandshakeRequest extends TransportRequest {
280283
281- private final TransportVersion version ;
284+ /**
285+ * The {@link TransportVersion#current()} of the requesting node.
286+ */
287+ final TransportVersion transportVersion ;
282288
283- HandshakeRequest (TransportVersion version ) {
284- this .version = version ;
289+ /**
290+ * The {@link Build#version()} of the requesting node, as a {@link String}, for better reporting of handshake failures due to
291+ * an incompatible version.
292+ */
293+ final String releaseVersion ;
294+
295+ HandshakeRequest (TransportVersion transportVersion , String releaseVersion ) {
296+ this .transportVersion = Objects .requireNonNull (transportVersion );
297+ this .releaseVersion = Objects .requireNonNull (releaseVersion );
285298 }
286299
300+ @ UpdateForV9 (owner = UpdateForV9 .Owner .CORE_INFRA ) // remainingMessage == null is invalid in v9
287301 HandshakeRequest (StreamInput streamInput ) throws IOException {
288302 super (streamInput );
289303 BytesReference remainingMessage ;
@@ -293,53 +307,101 @@ static final class HandshakeRequest extends TransportRequest {
293307 remainingMessage = null ;
294308 }
295309 if (remainingMessage == null ) {
296- version = null ;
310+ transportVersion = null ;
311+ releaseVersion = null ;
297312 } else {
298313 try (StreamInput messageStreamInput = remainingMessage .streamInput ()) {
299- this .version = TransportVersion .readVersion (messageStreamInput );
314+ this .transportVersion = TransportVersion .readVersion (messageStreamInput );
315+ if (streamInput .getTransportVersion ().onOrAfter (V9_HANDSHAKE_VERSION )) {
316+ this .releaseVersion = messageStreamInput .readString ();
317+ } else {
318+ this .releaseVersion = this .transportVersion .toReleaseVersion ();
319+ }
300320 }
301321 }
302322 }
303323
304324 @ Override
305325 public void writeTo (StreamOutput streamOutput ) throws IOException {
306326 super .writeTo (streamOutput );
307- assert version != null ;
308- try (BytesStreamOutput messageStreamOutput = new BytesStreamOutput (4 )) {
309- TransportVersion .writeVersion (version , messageStreamOutput );
327+ assert transportVersion != null ;
328+ try (BytesStreamOutput messageStreamOutput = new BytesStreamOutput (1024 )) {
329+ TransportVersion .writeVersion (transportVersion , messageStreamOutput );
330+ if (streamOutput .getTransportVersion ().onOrAfter (V9_HANDSHAKE_VERSION )) {
331+ messageStreamOutput .writeString (releaseVersion );
332+ } // else we just send the transport version and rely on a best-effort mapping to release versions
310333 BytesReference reference = messageStreamOutput .bytes ();
311334 streamOutput .writeBytesReference (reference );
312335 }
313336 }
314337 }
315338
339+ /**
340+ * A response to a low-level transport handshake, carrying information about the version of the responding node.
341+ */
316342 static final class HandshakeResponse extends TransportResponse {
317343
318- private final TransportVersion responseVersion ;
344+ /**
345+ * The {@link TransportVersion#current()} of the responding node.
346+ */
347+ private final TransportVersion transportVersion ;
319348
320- HandshakeResponse (TransportVersion responseVersion ) {
321- this .responseVersion = responseVersion ;
349+ /**
350+ * The {@link Build#version()} of the responding node, as a {@link String}, for better reporting of handshake failures due to
351+ * an incompatible version.
352+ */
353+ private final String releaseVersion ;
354+
355+ HandshakeResponse (TransportVersion transportVersion , String releaseVersion ) {
356+ this .transportVersion = Objects .requireNonNull (transportVersion );
357+ this .releaseVersion = Objects .requireNonNull (releaseVersion );
322358 }
323359
324- private HandshakeResponse (StreamInput in ) throws IOException {
360+ HandshakeResponse (StreamInput in ) throws IOException {
325361 super (in );
326- responseVersion = TransportVersion .readVersion (in );
362+ transportVersion = TransportVersion .readVersion (in );
363+ if (in .getTransportVersion ().onOrAfter (V9_HANDSHAKE_VERSION )) {
364+ releaseVersion = in .readString ();
365+ } else {
366+ releaseVersion = transportVersion .toReleaseVersion ();
367+ }
327368 }
328369
329370 @ Override
330371 public void writeTo (StreamOutput out ) throws IOException {
331- assert responseVersion != null ;
332- TransportVersion .writeVersion (responseVersion , out );
372+ TransportVersion .writeVersion (transportVersion , out );
373+ if (out .getTransportVersion ().onOrAfter (V9_HANDSHAKE_VERSION )) {
374+ out .writeString (releaseVersion );
375+ } // else we just send the transport version and rely on a best-effort mapping to release versions
376+ }
377+
378+ /**
379+ * @return the {@link TransportVersion#current()} of the responding node.
380+ */
381+ TransportVersion getTransportVersion () {
382+ return transportVersion ;
333383 }
334384
335- TransportVersion getResponseVersion () {
336- return responseVersion ;
385+ /**
386+ * @return the {@link Build#version()} of the responding node, as a {@link String}, for better reporting of handshake failures due
387+ * to an incompatible version.
388+ */
389+ String getReleaseVersion () {
390+ return releaseVersion ;
337391 }
338392 }
339393
340394 @ FunctionalInterface
341395 interface HandshakeRequestSender {
342-
343- void sendRequest (DiscoveryNode node , TcpChannel channel , long requestId , TransportVersion version ) throws IOException ;
396+ /**
397+ * @param node The (expected) remote node, for error reporting and passing to
398+ * {@link TransportMessageListener#onRequestSent}.
399+ * @param channel The TCP channel to use to send the handshake request.
400+ * @param requestId The transport request ID, for matching up the response.
401+ * @param handshakeTransportVersion The {@link TransportVersion} to use for the handshake request, which will be
402+ * {@link TransportHandshaker#V8_HANDSHAKE_VERSION} in production.
403+ */
404+ void sendRequest (DiscoveryNode node , TcpChannel channel , long requestId , TransportVersion handshakeTransportVersion )
405+ throws IOException ;
344406 }
345407}
0 commit comments