1111
1212import org .elasticsearch .Build ;
1313import org .elasticsearch .TransportVersion ;
14+ import org .elasticsearch .TransportVersions ;
1415import org .elasticsearch .action .ActionListener ;
1516import org .elasticsearch .cluster .node .DiscoveryNode ;
1617import org .elasticsearch .common .bytes .BytesReference ;
1718import org .elasticsearch .common .io .stream .BytesStreamOutput ;
1819import org .elasticsearch .common .io .stream .StreamInput ;
1920import org .elasticsearch .common .io .stream .StreamOutput ;
21+ import org .elasticsearch .common .logging .DeprecationCategory ;
22+ import org .elasticsearch .common .logging .DeprecationLogger ;
2023import org .elasticsearch .common .metrics .CounterMetric ;
24+ import org .elasticsearch .common .util .concurrent .ThreadContext ;
2125import org .elasticsearch .core .Strings ;
2226import org .elasticsearch .core .TimeValue ;
2327import org .elasticsearch .logging .LogManager ;
@@ -160,6 +164,7 @@ final class TransportHandshaker {
160164 */
161165
162166 private static final Logger logger = LogManager .getLogger (TransportHandshaker .class );
167+ private static final DeprecationLogger deprecationLogger = DeprecationLogger .getLogger (logger .getName ());
163168
164169 static final TransportVersion V7_HANDSHAKE_VERSION = TransportVersion .fromId (6_08_00_99 );
165170 static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion .fromId (7_17_00_99 );
@@ -171,6 +176,7 @@ final class TransportHandshaker {
171176 );
172177
173178 static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake" ;
179+ static final TransportVersion V8_18_FIRST_VERSION = TransportVersions .INDEXING_PRESSURE_THROTTLING_STATS ;
174180 private final ConcurrentMap <Long , HandshakeResponseHandler > pendingHandshakes = new ConcurrentHashMap <>();
175181 private final CounterMetric numHandshakes = new CounterMetric ();
176182
@@ -246,17 +252,34 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
246252 assert ignoreDeserializationErrors : exception ;
247253 throw exception ;
248254 }
249- ensureCompatibleVersion (version , handshakeRequest .transportVersion , handshakeRequest .releaseVersion , channel );
255+ ensureCompatibleVersion (
256+ version ,
257+ handshakeRequest .transportVersion ,
258+ handshakeRequest .releaseVersion ,
259+ channel ,
260+ threadPool .getThreadContext ()
261+ );
250262 channel .sendResponse (new HandshakeResponse (this .version , Build .current ().version ()));
251263 }
252264
253265 static void ensureCompatibleVersion (
254266 TransportVersion localTransportVersion ,
255267 TransportVersion remoteTransportVersion ,
256- String releaseVersion ,
257- Object channel
268+ String remoteReleaseVersion ,
269+ Object channel ,
270+ ThreadContext threadContext
258271 ) {
259272 if (TransportVersion .isCompatible (remoteTransportVersion )) {
273+ // Prevent log message headers from being added to the handshake response.
274+ try (var ignored = threadContext .stashContext ()) {
275+ if (remoteTransportVersion .before (V8_18_FIRST_VERSION )) {
276+ deprecationLogger .warn (
277+ DeprecationCategory .OTHER ,
278+ "handshake_version" ,
279+ getDeprecationMessage (localTransportVersion , remoteTransportVersion , remoteReleaseVersion , channel )
280+ );
281+ }
282+ }
260283 if (remoteTransportVersion .onOrAfter (localTransportVersion )) {
261284 // Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
262285 // knows how to do that.
@@ -273,7 +296,7 @@ static void ensureCompatibleVersion(
273296 """
274297 Rejecting unreadable transport handshake from remote node with version [%s/%s] received on [%s] since this node has \
275298 version [%s/%s] which has an incompatible wire format.""" ,
276- releaseVersion ,
299+ remoteReleaseVersion ,
277300 remoteTransportVersion ,
278301 channel ,
279302 Build .current ().version (),
@@ -284,6 +307,24 @@ static void ensureCompatibleVersion(
284307
285308 }
286309
310+ // Non-private for testing
311+ static String getDeprecationMessage (
312+ TransportVersion localTransportVersion ,
313+ TransportVersion remoteTransportVersion ,
314+ String remoteReleaseVersion ,
315+ Object channel
316+ ) {
317+ return Strings .format (
318+ "Performed a handshake with a remote node with version [%s/%s] received on [%s] which "
319+ + "will be incompatible after this node on version [%s/%s] is upgraded to 9.x." ,
320+ remoteReleaseVersion ,
321+ remoteTransportVersion ,
322+ channel ,
323+ Build .current ().version (),
324+ localTransportVersion
325+ );
326+ }
327+
287328 TransportResponseHandler <HandshakeResponse > removeHandlerForHandshake (long requestId ) {
288329 return pendingHandshakes .remove (requestId );
289330 }
@@ -323,7 +364,13 @@ public Executor executor() {
323364 public void handleResponse (HandshakeResponse response ) {
324365 if (isDone .compareAndSet (false , true )) {
325366 ActionListener .completeWith (listener , () -> {
326- ensureCompatibleVersion (version , response .getTransportVersion (), response .getReleaseVersion (), channel );
367+ ensureCompatibleVersion (
368+ version ,
369+ response .getTransportVersion (),
370+ response .getReleaseVersion (),
371+ channel ,
372+ threadPool .getThreadContext ()
373+ );
327374 final var resultVersion = TransportVersion .min (TransportHandshaker .this .version , response .getTransportVersion ());
328375 assert TransportVersion .current ().before (version ) // simulating a newer-version transport service for test purposes
329376 || resultVersion .isKnown () : "negotiated unknown version " + resultVersion ;
0 commit comments