1111
1212import org .elasticsearch .Build ;
1313import org .elasticsearch .TransportVersion ;
14- import org .elasticsearch .TransportVersions ;
1514import org .elasticsearch .action .ActionListener ;
1615import org .elasticsearch .cluster .node .DiscoveryNode ;
1716import org .elasticsearch .common .bytes .BytesReference ;
1817import org .elasticsearch .common .io .stream .BytesStreamOutput ;
1918import org .elasticsearch .common .io .stream .StreamInput ;
2019import org .elasticsearch .common .io .stream .StreamOutput ;
2120import org .elasticsearch .common .metrics .CounterMetric ;
21+ import org .elasticsearch .core .Strings ;
2222import org .elasticsearch .core .TimeValue ;
2323import org .elasticsearch .core .UpdateForV9 ;
24+ import org .elasticsearch .logging .LogManager ;
25+ import org .elasticsearch .logging .Logger ;
2426import org .elasticsearch .threadpool .ThreadPool ;
2527
2628import java .io .EOFException ;
@@ -126,6 +128,8 @@ final class TransportHandshaker {
126128 * [3] Parent task ID should be empty; see org.elasticsearch.tasks.TaskId.writeTo for its structure.
127129 */
128130
131+ private static final Logger logger = LogManager .getLogger (TransportHandshaker .class );
132+
129133 static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion .fromId (7_17_00_99 );
130134 static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion .fromId (8_800_00_0 );
131135 static final Set <TransportVersion > ALLOWED_HANDSHAKE_VERSIONS = Set .of (V8_HANDSHAKE_VERSION , V9_HANDSHAKE_VERSION );
@@ -159,7 +163,7 @@ void sendHandshake(
159163 ActionListener <TransportVersion > listener
160164 ) {
161165 numHandshakes .inc ();
162- final HandshakeResponseHandler handler = new HandshakeResponseHandler (requestId , listener );
166+ final HandshakeResponseHandler handler = new HandshakeResponseHandler (requestId , channel , listener );
163167 pendingHandshakes .put (requestId , handler );
164168 channel .addCloseListener (
165169 ActionListener .running (() -> handler .handleLocalException (new TransportException ("handshake failed because connection reset" )))
@@ -185,9 +189,9 @@ void sendHandshake(
185189 }
186190
187191 void handleHandshake (TransportChannel channel , long requestId , StreamInput stream ) throws IOException {
192+ final HandshakeRequest handshakeRequest ;
188193 try {
189- // Must read the handshake request to exhaust the stream
190- new HandshakeRequest (stream );
194+ handshakeRequest = new HandshakeRequest (stream );
191195 } catch (Exception e ) {
192196 assert ignoreDeserializationErrors : e ;
193197 throw e ;
@@ -206,9 +210,44 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
206210 assert ignoreDeserializationErrors : exception ;
207211 throw exception ;
208212 }
213+ ensureCompatibleVersion (version , handshakeRequest .transportVersion , handshakeRequest .releaseVersion , channel );
209214 channel .sendResponse (new HandshakeResponse (this .version , Build .current ().version ()));
210215 }
211216
217+ static void ensureCompatibleVersion (
218+ TransportVersion localTransportVersion ,
219+ TransportVersion remoteTransportVersion ,
220+ String releaseVersion ,
221+ Object channel
222+ ) {
223+ if (TransportVersion .isCompatible (remoteTransportVersion )) {
224+ if (remoteTransportVersion .onOrAfter (localTransportVersion )) {
225+ // Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
226+ // knows how to do that.
227+ return ;
228+ }
229+ if (TransportVersion .isKnownVersionId (remoteTransportVersion .id ())) {
230+ // Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol
231+ // version is known to us.
232+ return ;
233+ }
234+ }
235+
236+ final var message = Strings .format (
237+ """
238+ Rejecting unreadable transport handshake from remote node with version [%s/%s] received on [%s] since this node has \
239+ version [%s/%s] which has an incompatible wire format.""" ,
240+ releaseVersion ,
241+ remoteTransportVersion ,
242+ channel ,
243+ Build .current ().version (),
244+ localTransportVersion
245+ );
246+ logger .warn (message );
247+ throw new IllegalStateException (message );
248+
249+ }
250+
212251 TransportResponseHandler <HandshakeResponse > removeHandlerForHandshake (long requestId ) {
213252 return pendingHandshakes .remove (requestId );
214253 }
@@ -224,11 +263,13 @@ long getNumHandshakes() {
224263 private class HandshakeResponseHandler implements TransportResponseHandler <HandshakeResponse > {
225264
226265 private final long requestId ;
266+ private final TcpChannel channel ;
227267 private final ActionListener <TransportVersion > listener ;
228268 private final AtomicBoolean isDone = new AtomicBoolean (false );
229269
230- private HandshakeResponseHandler (long requestId , ActionListener <TransportVersion > listener ) {
270+ private HandshakeResponseHandler (long requestId , TcpChannel channel , ActionListener <TransportVersion > listener ) {
231271 this .requestId = requestId ;
272+ this .channel = channel ;
232273 this .listener = listener ;
233274 }
234275
@@ -245,20 +286,13 @@ public Executor executor() {
245286 @ Override
246287 public void handleResponse (HandshakeResponse response ) {
247288 if (isDone .compareAndSet (false , true )) {
248- TransportVersion responseVersion = response .transportVersion ;
249- if (TransportVersion .isCompatible (responseVersion ) == false ) {
250- listener .onFailure (
251- new IllegalStateException (
252- "Received message from unsupported version: ["
253- + responseVersion
254- + "] minimal compatible version is: ["
255- + TransportVersions .MINIMUM_COMPATIBLE
256- + "]"
257- )
258- );
259- } else {
260- listener .onResponse (TransportVersion .min (TransportHandshaker .this .version , response .getTransportVersion ()));
261- }
289+ ActionListener .completeWith (listener , () -> {
290+ ensureCompatibleVersion (version , response .getTransportVersion (), response .getReleaseVersion (), channel );
291+ final var resultVersion = TransportVersion .min (TransportHandshaker .this .version , response .getTransportVersion ());
292+ assert TransportVersion .current ().before (version ) // simulating a newer-version transport service for test purposes
293+ || TransportVersion .isKnownVersionId (resultVersion .id ()) : "negotiated unknown version " + resultVersion ;
294+ return resultVersion ;
295+ });
262296 }
263297 }
264298
0 commit comments