1919import org .elasticsearch .common .io .stream .BytesStreamOutput ;
2020import org .elasticsearch .common .io .stream .StreamInput ;
2121import org .elasticsearch .core .FixForMultiProject ;
22+ import org .elasticsearch .core .Strings ;
2223import org .elasticsearch .core .TimeValue ;
2324import org .elasticsearch .core .UpdateForV10 ;
2425import org .elasticsearch .tasks .TaskId ;
@@ -102,7 +103,7 @@ public void testHandshakeRequestAndResponse() throws IOException {
102103 }
103104
104105 @ TestLogging (reason = "testing WARN logging" , value = "org.elasticsearch.transport.TransportHandshaker:WARN" )
105- public void testIncompatibleHandshakeRequest () throws IOException {
106+ public void testIncompatibleHandshakeRequest () throws Exception {
106107 TransportHandshaker .HandshakeRequest handshakeRequest = new TransportHandshaker .HandshakeRequest (
107108 getRandomIncompatibleTransportVersion (),
108109 randomIdentifier ()
@@ -112,27 +113,69 @@ public void testIncompatibleHandshakeRequest() throws IOException {
112113 handshakeRequest .writeTo (bytesStreamOutput );
113114 StreamInput input = bytesStreamOutput .bytes ().streamInput ();
114115 input .setTransportVersion (HANDSHAKE_REQUEST_VERSION );
115- final TestTransportChannel channel = new TestTransportChannel (ActionListener .running (() -> fail ("should not complete" )));
116116
117- MockLog .assertThatLogger (
118- () -> assertThat (
119- expectThrows (IllegalStateException .class , () -> handshaker .handleHandshake (channel , randomNonNegativeLong (), input ))
120- .getMessage (),
121- allOf (
122- containsString ("Rejecting unreadable transport handshake" ),
123- containsString ("[" + handshakeRequest .releaseVersion + "/" + handshakeRequest .transportVersion + "]" ),
124- containsString ("[" + Build .current ().version () + "/" + TransportVersion .current () + "]" ),
125- containsString ("which has an incompatible wire format" )
117+ if (handshakeRequest .transportVersion .onOrAfter (TransportVersions .MINIMUM_COMPATIBLE )) {
118+
119+ final PlainActionFuture <TransportResponse > responseFuture = new PlainActionFuture <>();
120+ final TestTransportChannel channel = new TestTransportChannel (responseFuture );
121+
122+ // we fall back to the best known version
123+ MockLog .assertThatLogger (() -> {
124+ try {
125+ handshaker .handleHandshake (channel , randomNonNegativeLong (), input );
126+ } catch (IOException e ) {
127+ throw new AssertionError (e );
128+ }
129+ },
130+ TransportHandshaker .class ,
131+ new MockLog .SeenEventExpectation (
132+ "warning" ,
133+ TransportHandshaker .class .getCanonicalName (),
134+ Level .WARN ,
135+ Strings .format (
136+ """
137+ Negotiating transport handshake with remote node with version [%s/%s] received on [*] which appears to be from \
138+ a chronologically-older release with a numerically-newer version compared to this node's version [%s/%s]. \
139+ Upgrading to a chronologically-older release may not work reliably and is not recommended. Falling back to \
140+ transport protocol version [%s].""" ,
141+ handshakeRequest .releaseVersion ,
142+ handshakeRequest .transportVersion ,
143+ Build .current ().version (),
144+ TransportVersion .current (),
145+ handshakeRequest .transportVersion .bestKnownVersion ()
146+ )
126147 )
127- ),
128- TransportHandshaker .class ,
129- new MockLog .SeenEventExpectation (
130- "warning" ,
131- TransportHandshaker .class .getCanonicalName (),
132- Level .WARN ,
133- "Rejecting unreadable transport handshake * incompatible wire format."
134- )
135- );
148+ );
149+
150+ assertTrue (responseFuture .isDone ());
151+ assertEquals (
152+ handshakeRequest .transportVersion .bestKnownVersion (),
153+ asInstanceOf (TransportHandshaker .HandshakeResponse .class , responseFuture .result ()).getTransportVersion ()
154+ );
155+
156+ } else {
157+ final TestTransportChannel channel = new TestTransportChannel (ActionListener .running (() -> fail ("should not complete" )));
158+
159+ MockLog .assertThatLogger (
160+ () -> assertThat (
161+ expectThrows (IllegalStateException .class , () -> handshaker .handleHandshake (channel , randomNonNegativeLong (), input ))
162+ .getMessage (),
163+ allOf (
164+ containsString ("Rejecting unreadable transport handshake" ),
165+ containsString ("[" + handshakeRequest .releaseVersion + "/" + handshakeRequest .transportVersion + "]" ),
166+ containsString ("[" + Build .current ().version () + "/" + TransportVersion .current () + "]" ),
167+ containsString ("which has an incompatible wire format" )
168+ )
169+ ),
170+ TransportHandshaker .class ,
171+ new MockLog .SeenEventExpectation (
172+ "warning" ,
173+ TransportHandshaker .class .getCanonicalName (),
174+ Level .WARN ,
175+ "Rejecting unreadable transport handshake * incompatible wire format."
176+ )
177+ );
178+ }
136179 }
137180
138181 public void testHandshakeResponseFromOlderNode () throws Exception {
@@ -151,40 +194,66 @@ public void testHandshakeResponseFromOlderNode() throws Exception {
151194 }
152195
153196 @ TestLogging (reason = "testing WARN logging" , value = "org.elasticsearch.transport.TransportHandshaker:WARN" )
154- public void testHandshakeResponseFromOlderNodeWithPatchedProtocol () {
197+ public void testHandshakeResponseFromOlderNodeWithPatchedProtocol () throws Exception {
155198 final PlainActionFuture <TransportVersion > versionFuture = new PlainActionFuture <>();
156199 final long reqId = randomNonNegativeLong ();
157200 handshaker .sendHandshake (reqId , node , channel , SAFE_AWAIT_TIMEOUT , versionFuture );
158201 TransportResponseHandler <TransportHandshaker .HandshakeResponse > handler = handshaker .removeHandlerForHandshake (reqId );
159202
160203 assertFalse (versionFuture .isDone ());
161204
162- final var handshakeResponse = new TransportHandshaker .HandshakeResponse (
163- getRandomIncompatibleTransportVersion (),
164- randomIdentifier ()
165- );
205+ final var randomIncompatibleTransportVersion = getRandomIncompatibleTransportVersion ();
206+ final var handshakeResponse = new TransportHandshaker .HandshakeResponse (randomIncompatibleTransportVersion , randomIdentifier ());
207+
208+ if (randomIncompatibleTransportVersion .onOrAfter (TransportVersions .MINIMUM_COMPATIBLE )) {
209+ // we fall back to the best known version
210+ MockLog .assertThatLogger (
211+ () -> handler .handleResponse (handshakeResponse ),
212+ TransportHandshaker .class ,
213+ new MockLog .SeenEventExpectation (
214+ "warning" ,
215+ TransportHandshaker .class .getCanonicalName (),
216+ Level .WARN ,
217+ Strings .format (
218+ """
219+ Negotiating transport handshake with remote node with version [%s/%s] received on [*] which appears to be from \
220+ a chronologically-older release with a numerically-newer version compared to this node's version [%s/%s]. \
221+ Upgrading to a chronologically-older release may not work reliably and is not recommended. Falling back to \
222+ transport protocol version [%s].""" ,
223+ handshakeResponse .getReleaseVersion (),
224+ handshakeResponse .getTransportVersion (),
225+ Build .current ().version (),
226+ TransportVersion .current (),
227+ randomIncompatibleTransportVersion .bestKnownVersion ()
228+ )
229+ )
230+ );
166231
167- MockLog .assertThatLogger (
168- () -> handler .handleResponse (handshakeResponse ),
169- TransportHandshaker .class ,
170- new MockLog .SeenEventExpectation (
171- "warning" ,
172- TransportHandshaker .class .getCanonicalName (),
173- Level .WARN ,
174- "Rejecting unreadable transport handshake * incompatible wire format."
175- )
176- );
232+ assertTrue (versionFuture .isDone ());
233+ assertEquals (randomIncompatibleTransportVersion .bestKnownVersion (), versionFuture .result ());
234+ } else {
235+ MockLog .assertThatLogger (
236+ () -> handler .handleResponse (handshakeResponse ),
237+ TransportHandshaker .class ,
238+ new MockLog .SeenEventExpectation (
239+ "warning" ,
240+ TransportHandshaker .class .getCanonicalName (),
241+ Level .WARN ,
242+ "Rejecting unreadable transport handshake * incompatible wire format."
243+ )
244+ );
177245
178- assertTrue (versionFuture .isDone ());
179- assertThat (
180- expectThrows (ExecutionException .class , IllegalStateException .class , versionFuture ::result ).getMessage (),
181- allOf (
182- containsString ("Rejecting unreadable transport handshake" ),
183- containsString ("[" + handshakeResponse .getReleaseVersion () + "/" + handshakeResponse .getTransportVersion () + "]" ),
184- containsString ("[" + Build .current ().version () + "/" + TransportVersion .current () + "]" ),
185- containsString ("which has an incompatible wire format" )
186- )
187- );
246+ assertTrue (versionFuture .isDone ());
247+ assertThat (
248+ expectThrows (ExecutionException .class , IllegalStateException .class , versionFuture ::result ).getMessage (),
249+ allOf (
250+ containsString ("Rejecting unreadable transport handshake" ),
251+ containsString ("[" + handshakeResponse .getReleaseVersion () + "/" + handshakeResponse .getTransportVersion () + "]" ),
252+ containsString ("[" + Build .current ().version () + "/" + TransportVersion .current () + "]" ),
253+ containsString ("which has an incompatible wire format" )
254+ )
255+ );
256+ }
188257 }
189258
190259 private static TransportVersion getRandomIncompatibleTransportVersion () {
0 commit comments