1111
1212import org .elasticsearch .Build ;
1313import org .elasticsearch .TransportVersion ;
14- import org .elasticsearch .TransportVersions ;
1514import org .elasticsearch .action .ActionListener ;
1615import org .elasticsearch .cluster .node .DiscoveryNode ;
1716import org .elasticsearch .common .bytes .BytesReference ;
1817import org .elasticsearch .common .io .stream .BytesStreamOutput ;
1918import org .elasticsearch .common .io .stream .StreamInput ;
2019import org .elasticsearch .common .io .stream .StreamOutput ;
2120import org .elasticsearch .common .metrics .CounterMetric ;
21+ import org .elasticsearch .core .Strings ;
2222import org .elasticsearch .core .TimeValue ;
23+ import org .elasticsearch .logging .LogManager ;
24+ import org .elasticsearch .logging .Logger ;
2325import org .elasticsearch .threadpool .ThreadPool ;
2426
2527import java .io .EOFException ;
@@ -157,6 +159,8 @@ final class TransportHandshaker {
157159 * [3] Parent task ID should be empty; see org.elasticsearch.tasks.TaskId.writeTo for its structure.
158160 */
159161
162+ private static final Logger logger = LogManager .getLogger (TransportHandshaker .class );
163+
160164 static final TransportVersion V7_HANDSHAKE_VERSION = TransportVersion .fromId (6_08_00_99 );
161165 static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion .fromId (7_17_00_99 );
162166 static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion .fromId (8_800_00_0 );
@@ -195,7 +199,7 @@ void sendHandshake(
195199 ActionListener <TransportVersion > listener
196200 ) {
197201 numHandshakes .inc ();
198- final HandshakeResponseHandler handler = new HandshakeResponseHandler (requestId , listener );
202+ final HandshakeResponseHandler handler = new HandshakeResponseHandler (requestId , channel , listener );
199203 pendingHandshakes .put (requestId , handler );
200204 channel .addCloseListener (
201205 ActionListener .running (() -> handler .handleLocalException (new TransportException ("handshake failed because connection reset" )))
@@ -221,9 +225,9 @@ void sendHandshake(
221225 }
222226
223227 void handleHandshake (TransportChannel channel , long requestId , StreamInput stream ) throws IOException {
228+ final HandshakeRequest handshakeRequest ;
224229 try {
225- // Must read the handshake request to exhaust the stream
226- new HandshakeRequest (stream );
230+ handshakeRequest = new HandshakeRequest (stream );
227231 } catch (Exception e ) {
228232 assert ignoreDeserializationErrors : e ;
229233 throw e ;
@@ -242,9 +246,44 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
242246 assert ignoreDeserializationErrors : exception ;
243247 throw exception ;
244248 }
249+ ensureCompatibleVersion (version , handshakeRequest .transportVersion , handshakeRequest .releaseVersion , channel );
245250 channel .sendResponse (new HandshakeResponse (this .version , Build .current ().version ()));
246251 }
247252
253+ static void ensureCompatibleVersion (
254+ TransportVersion localTransportVersion ,
255+ TransportVersion remoteTransportVersion ,
256+ String releaseVersion ,
257+ Object channel
258+ ) {
259+ if (TransportVersion .isCompatible (remoteTransportVersion )) {
260+ if (remoteTransportVersion .onOrAfter (localTransportVersion )) {
261+ // Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
262+ // knows how to do that.
263+ return ;
264+ }
265+ if (remoteTransportVersion .isKnown ()) {
266+ // Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol
267+ // version is known to us.
268+ return ;
269+ }
270+ }
271+
272+ final var message = Strings .format (
273+ """
274+ Rejecting unreadable transport handshake from remote node with version [%s/%s] received on [%s] since this node has \
275+ version [%s/%s] which has an incompatible wire format.""" ,
276+ releaseVersion ,
277+ remoteTransportVersion ,
278+ channel ,
279+ Build .current ().version (),
280+ localTransportVersion
281+ );
282+ logger .warn (message );
283+ throw new IllegalStateException (message );
284+
285+ }
286+
248287 TransportResponseHandler <HandshakeResponse > removeHandlerForHandshake (long requestId ) {
249288 return pendingHandshakes .remove (requestId );
250289 }
@@ -260,11 +299,13 @@ long getNumHandshakes() {
260299 private class HandshakeResponseHandler implements TransportResponseHandler <HandshakeResponse > {
261300
262301 private final long requestId ;
302+ private final TcpChannel channel ;
263303 private final ActionListener <TransportVersion > listener ;
264304 private final AtomicBoolean isDone = new AtomicBoolean (false );
265305
266- private HandshakeResponseHandler (long requestId , ActionListener <TransportVersion > listener ) {
306+ private HandshakeResponseHandler (long requestId , TcpChannel channel , ActionListener <TransportVersion > listener ) {
267307 this .requestId = requestId ;
308+ this .channel = channel ;
268309 this .listener = listener ;
269310 }
270311
@@ -281,20 +322,13 @@ public Executor executor() {
281322 @ Override
282323 public void handleResponse (HandshakeResponse response ) {
283324 if (isDone .compareAndSet (false , true )) {
284- TransportVersion responseVersion = response .transportVersion ;
285- if (TransportVersion .isCompatible (responseVersion ) == false ) {
286- listener .onFailure (
287- new IllegalStateException (
288- "Received message from unsupported version: ["
289- + responseVersion
290- + "] minimal compatible version is: ["
291- + TransportVersions .MINIMUM_COMPATIBLE
292- + "]"
293- )
294- );
295- } else {
296- listener .onResponse (TransportVersion .min (TransportHandshaker .this .version , response .getTransportVersion ()));
297- }
325+ ActionListener .completeWith (listener , () -> {
326+ ensureCompatibleVersion (version , response .getTransportVersion (), response .getReleaseVersion (), channel );
327+ final var resultVersion = TransportVersion .min (TransportHandshaker .this .version , response .getTransportVersion ());
328+ assert TransportVersion .current ().before (version ) // simulating a newer-version transport service for test purposes
329+ || resultVersion .isKnown () : "negotiated unknown version " + resultVersion ;
330+ return resultVersion ;
331+ });
298332 }
299333 }
300334
0 commit comments