1818import org .elasticsearch .common .io .stream .BytesStreamOutput ;
1919import org .elasticsearch .common .io .stream .StreamInput ;
2020import org .elasticsearch .common .io .stream .StreamOutput ;
21+ import org .elasticsearch .common .logging .DeprecationCategory ;
22+ import org .elasticsearch .common .logging .DeprecationLogger ;
2123import org .elasticsearch .common .metrics .CounterMetric ;
24+ import org .elasticsearch .common .util .concurrent .ThreadContext ;
2225import org .elasticsearch .core .Strings ;
2326import org .elasticsearch .core .TimeValue ;
2427import org .elasticsearch .logging .LogManager ;
@@ -161,6 +164,7 @@ final class TransportHandshaker {
161164 */
162165
163166 private static final Logger logger = LogManager .getLogger (TransportHandshaker .class );
167+ private static final DeprecationLogger deprecationLogger = DeprecationLogger .getLogger (logger .getName ());
164168
165169 static final TransportVersion V7_HANDSHAKE_VERSION = TransportVersion .fromId (6_08_00_99 );
166170 static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion .fromId (7_17_00_99 );
@@ -172,6 +176,7 @@ final class TransportHandshaker {
172176 );
173177
174178 static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake" ;
179+ static final TransportVersion V8_19_FIRST_VERSION = TransportVersions .INITIAL_ELASTICSEARCH_8_19 ;
175180 private final ConcurrentMap <Long , HandshakeResponseHandler > pendingHandshakes = new ConcurrentHashMap <>();
176181 private final CounterMetric numHandshakes = new CounterMetric ();
177182
@@ -249,7 +254,13 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
249254 }
250255 channel .sendResponse (
251256 new HandshakeResponse (
252- ensureCompatibleVersion (version , handshakeRequest .transportVersion , handshakeRequest .releaseVersion , channel ),
257+ ensureCompatibleVersion (
258+ version ,
259+ handshakeRequest .transportVersion ,
260+ handshakeRequest .releaseVersion ,
261+ channel ,
262+ threadPool .getThreadContext ()
263+ ),
253264 Build .current ().version ()
254265 )
255266 );
@@ -258,10 +269,21 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
258269 private static TransportVersion ensureCompatibleVersion (
259270 TransportVersion localTransportVersion ,
260271 TransportVersion remoteTransportVersion ,
261- String releaseVersion ,
262- Object channel
272+ String remoteReleaseVersion ,
273+ Object channel ,
274+ ThreadContext threadContext
263275 ) {
264276 if (TransportVersion .isCompatible (remoteTransportVersion )) {
277+ // Prevent log message headers from being added to the handshake response.
278+ try (var ignored = threadContext .stashContext ()) {
279+ if (remoteTransportVersion .before (V8_19_FIRST_VERSION )) {
280+ deprecationLogger .warn (
281+ DeprecationCategory .OTHER ,
282+ "handshake_version" ,
283+ getDeprecationMessage (localTransportVersion , remoteTransportVersion , remoteReleaseVersion , channel )
284+ );
285+ }
286+ }
265287 if (remoteTransportVersion .onOrAfter (localTransportVersion )) {
266288 // Remote is semantically newer than us (i.e. has a greater transport protocol version), so we propose using our current
267289 // transport protocol version. If we're initiating the connection then that's the version we'll use; if the other end is
@@ -282,7 +304,7 @@ private static TransportVersion ensureCompatibleVersion(
282304 from a chronologically-older release with a numerically-newer version compared to this node's version [{}/{}]. \
283305 Upgrading to a chronologically-older release may not work reliably and is not recommended. \
284306 Falling back to transport protocol version [{}].""" ,
285- releaseVersion ,
307+ remoteReleaseVersion ,
286308 remoteTransportVersion ,
287309 channel ,
288310 Build .current ().version (),
@@ -298,7 +320,7 @@ private static TransportVersion ensureCompatibleVersion(
298320 """
299321 Rejecting unreadable transport handshake from remote node with version [%s/%s] received on [%s] since this node has \
300322 version [%s/%s] which has an incompatible wire format.""" ,
301- releaseVersion ,
323+ remoteReleaseVersion ,
302324 remoteTransportVersion ,
303325 channel ,
304326 Build .current ().version (),
@@ -309,6 +331,24 @@ private static TransportVersion ensureCompatibleVersion(
309331
310332 }
311333
334+ // Non-private for testing
335+ static String getDeprecationMessage (
336+ TransportVersion localTransportVersion ,
337+ TransportVersion remoteTransportVersion ,
338+ String remoteReleaseVersion ,
339+ Object channel
340+ ) {
341+ return Strings .format (
342+ "Performed a handshake with a remote node with version [%s/%s] received on [%s] which "
343+ + "will be incompatible after this node on version [%s/%s] is upgraded to >= 9.1." ,
344+ remoteReleaseVersion ,
345+ remoteTransportVersion ,
346+ channel ,
347+ Build .current ().version (),
348+ localTransportVersion
349+ );
350+ }
351+
312352 TransportResponseHandler <HandshakeResponse > removeHandlerForHandshake (long requestId ) {
313353 return pendingHandshakes .remove (requestId );
314354 }
@@ -352,7 +392,8 @@ public void handleResponse(HandshakeResponse response) {
352392 version ,
353393 response .getTransportVersion (),
354394 response .getReleaseVersion (),
355- channel
395+ channel ,
396+ threadPool .getThreadContext ()
356397 );
357398 assert TransportVersion .current ().before (version ) // simulating a newer-version transport service for test purposes
358399 || resultVersion .isKnown () : "negotiated unknown version " + resultVersion ;
0 commit comments