From de17455bbf69d7750720a56eb54a4b51da7efe4b Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 28 Feb 2025 21:38:30 +0000 Subject: [PATCH 1/3] Negotiate disordered transport handshakes Previously we'd reject transport handshakes from numerically-older nodes with a chronologically-newer release date. With this commit we instead negotiate to use the latest known version for future communications. Backport of #123397 to 8.18 --- .../org/elasticsearch/TransportVersion.java | 17 ++ .../transport/TransportHandshaker.java | 57 +++++-- .../TransportHandshakerRawMessageTests.java | 8 +- .../transport/TransportHandshakerTests.java | 160 +++++++++++++----- 4 files changed, 177 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersion.java b/server/src/main/java/org/elasticsearch/TransportVersion.java index e4f20b64a7a3d..c63b4e59392de 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersion.java +++ b/server/src/main/java/org/elasticsearch/TransportVersion.java @@ -107,6 +107,23 @@ public boolean isKnown() { return before(TransportVersions.V_8_9_X) || TransportVersions.VERSION_IDS.containsKey(id); } + /** + * @return the newest known {@link TransportVersion} which is no newer than this instance. Returns {@link TransportVersions#ZERO} if + * there are no such versions. 
+ */ + public TransportVersion bestKnownVersion() { + if (TransportVersions.VERSION_IDS.containsKey(id)) { + return this; + } + TransportVersion bestSoFar = TransportVersions.ZERO; + for (final var knownVersion : TransportVersions.VERSION_IDS.values()) { + if (knownVersion.after(bestSoFar) && knownVersion.before(this)) { + bestSoFar = knownVersion; + } + } + return bestSoFar; + } + public static TransportVersion fromString(String str) { return TransportVersion.fromId(Integer.parseInt(str)); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index ae4ff96a8cc46..baf40c1ba03fb 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -252,17 +252,21 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea assert ignoreDeserializationErrors : exception; throw exception; } - ensureCompatibleVersion( - version, - handshakeRequest.transportVersion, - handshakeRequest.releaseVersion, - channel, - threadPool.getThreadContext() + channel.sendResponse( + new HandshakeResponse( + ensureCompatibleVersion( + version, + handshakeRequest.transportVersion, + handshakeRequest.releaseVersion, + channel, + threadPool.getThreadContext() + ), + Build.current().version() + ) ); - channel.sendResponse(new HandshakeResponse(this.version, Build.current().version())); } - static void ensureCompatibleVersion( + private static TransportVersion ensureCompatibleVersion( TransportVersion localTransportVersion, TransportVersion remoteTransportVersion, String remoteReleaseVersion, @@ -281,14 +285,34 @@ static void ensureCompatibleVersion( } } if (remoteTransportVersion.onOrAfter(localTransportVersion)) { - // Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it - // knows how to do 
that. - return; + // Remote is semantically newer than us (i.e. has a greater transport protocol version), so we propose using our current + // transport protocol version. If we're initiating the connection then that's the version we'll use; if the other end is + // initiating the connection then it's up to the other end to decide whether to use this version (if it knows it) or + // an earlier one. + return localTransportVersion; } - if (remoteTransportVersion.isKnown()) { - // Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol - // version is known to us. - return; + final var bestKnownVersion = remoteTransportVersion.bestKnownVersion(); + if (bestKnownVersion.equals(TransportVersions.ZERO) == false) { + if (bestKnownVersion.equals(remoteTransportVersion) == false) { + // Remote is semantically older than us (i.e. has a lower transport protocol version), but we do not know its exact + // transport protocol version so it must be chronologically newer. We recommend not doing this, it implies an upgrade + // that goes backwards in time and therefore may regress in some way, so we emit a warning. But we carry on with the + // best known version anyway since both ends will know it. + logger.warn( + """ + Negotiating transport handshake with remote node with version [{}/{}] received on [{}] which appears to be \ + from a chronologically-older release with a numerically-newer version compared to this node's version [{}/{}]. \ + Upgrading to a chronologically-older release may not work reliably and is not recommended. \ + Falling back to transport protocol version [{}].""", + remoteReleaseVersion, + remoteTransportVersion, + channel, + Build.current().version(), + localTransportVersion, + bestKnownVersion + ); + } // else remote is semantically older and we _do_ know its version, so we just use that without further fuss. 
+ return bestKnownVersion; } } @@ -364,14 +388,13 @@ public Executor executor() { public void handleResponse(HandshakeResponse response) { if (isDone.compareAndSet(false, true)) { ActionListener.completeWith(listener, () -> { - ensureCompatibleVersion( + final var resultVersion = ensureCompatibleVersion( version, response.getTransportVersion(), response.getReleaseVersion(), channel, threadPool.getThreadContext() ); - final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion()); assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes || resultVersion.isKnown() : "negotiated unknown version " + resultVersion; return resultVersion; diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java index b4c791c7a04b3..7138a834fe959 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java @@ -88,6 +88,7 @@ public void testV7Handshake() throws Exception { public void testV8Handshake() throws Exception { final BytesRef handshakeRequestBytes; final var requestId = randomNonNegativeLong(); + final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); try (var outputStream = new BytesStreamOutput()) { outputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION); outputStream.writeLong(requestId); @@ -100,7 +101,6 @@ public void testV8Handshake() throws Exception { outputStream.writeString("internal:tcp/handshake"); outputStream.writeByte((byte) 0); // no parent task ID; - final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 
22), lessThan(1 << 28))); // 4-byte vInt outputStream.writeByte((byte) 4); // payload length outputStream.writeVInt(requestNodeTransportVersionId); @@ -131,7 +131,7 @@ public void testV8Handshake() throws Exception { assertEquals((byte) 0, inputStream.readByte()); // no request headers assertEquals((byte) 0, inputStream.readByte()); // no response headers inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION); - assertEquals(TransportVersion.current().id(), inputStream.readVInt()); + assertEquals(requestNodeTransportVersionId, inputStream.readVInt()); assertEquals(-1, inputStream.read()); } } @@ -139,6 +139,7 @@ public void testV8Handshake() throws Exception { public void testV9Handshake() throws Exception { final BytesRef handshakeRequestBytes; final var requestId = randomNonNegativeLong(); + final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); try (var outputStream = new BytesStreamOutput()) { outputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION); outputStream.writeLong(requestId); @@ -150,7 +151,6 @@ public void testV9Handshake() throws Exception { outputStream.writeString("internal:tcp/handshake"); outputStream.writeByte((byte) 0); // no parent task ID; - final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt final var releaseVersionLength = between(0, 127 - 5); // so that its length, and the length of the payload, is a one-byte vInt final var requestNodeReleaseVersion = randomAlphaOfLength(releaseVersionLength); @@ -184,7 +184,7 @@ public void testV9Handshake() throws Exception { assertEquals((byte) 0, inputStream.readByte()); // no request headers assertEquals((byte) 0, inputStream.readByte()); // no response headers inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION); - 
assertEquals(TransportVersion.current().id(), inputStream.readVInt()); + assertEquals(requestNodeTransportVersionId, inputStream.readVInt()); assertEquals(Build.current().version(), inputStream.readString()); assertEquals(-1, inputStream.read()); } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java index d01d264247e94..2516d4ecfd5e5 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; @@ -101,7 +102,7 @@ public void testHandshakeRequestAndResponse() throws IOException { } @TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN") - public void testIncompatibleHandshakeRequest() throws IOException { + public void testIncompatibleHandshakeRequest() throws Exception { var remoteVersion = getRandomIncompatibleTransportVersion(); TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(remoteVersion, randomIdentifier()); BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); @@ -109,29 +110,72 @@ public void testIncompatibleHandshakeRequest() throws IOException { handshakeRequest.writeTo(bytesStreamOutput); StreamInput input = bytesStreamOutput.bytes().streamInput(); input.setTransportVersion(HANDSHAKE_REQUEST_VERSION); - final TestTransportChannel channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete"))); - MockLog.assertThatLogger( - () -> 
assertThat( - expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input)) - .getMessage(), - allOf( - containsString("Rejecting unreadable transport handshake"), - containsString( - "[" + handshakeRequest.transportVersion.toReleaseVersion() + "/" + handshakeRequest.transportVersion + "]" - ), - containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"), - containsString("which has an incompatible wire format") + final TestTransportChannel channel; + if (handshakeRequest.transportVersion.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE)) { + + final PlainActionFuture responseFuture = new PlainActionFuture<>(); + channel = new TestTransportChannel(responseFuture); + + // we fall back to the best known version + MockLog.assertThatLogger(() -> { + try { + handshaker.handleHandshake(channel, randomNonNegativeLong(), input); + } catch (IOException e) { + throw new AssertionError(e); + } + }, + TransportHandshaker.class, + new MockLog.SeenEventExpectation( + "warning", + TransportHandshaker.class.getCanonicalName(), + Level.WARN, + Strings.format( + """ + Negotiating transport handshake with remote node with version [%s/%s] received on [*] which appears to be from \ + a chronologically-older release with a numerically-newer version compared to this node's version [%s/%s]. \ + Upgrading to a chronologically-older release may not work reliably and is not recommended. Falling back to \ + transport protocol version [%s].""", + handshakeRequest.transportVersion.toReleaseVersion(), + handshakeRequest.transportVersion, + Build.current().version(), + TransportVersion.current(), + handshakeRequest.transportVersion.bestKnownVersion() + ) ) - ), - TransportHandshaker.class, - new MockLog.SeenEventExpectation( - "warning", - TransportHandshaker.class.getCanonicalName(), - Level.WARN, - "Rejecting unreadable transport handshake * incompatible wire format." 
- ) - ); + ); + + assertTrue(responseFuture.isDone()); + assertEquals( + handshakeRequest.transportVersion.bestKnownVersion(), + asInstanceOf(TransportHandshaker.HandshakeResponse.class, responseFuture.result()).getTransportVersion() + ); + + } else { + channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete"))); + + MockLog.assertThatLogger( + () -> assertThat( + expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input)) + .getMessage(), + allOf( + containsString("Rejecting unreadable transport handshake"), + containsString( + "[" + handshakeRequest.transportVersion.toReleaseVersion() + "/" + handshakeRequest.transportVersion + "]" + ), + containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"), + containsString("which has an incompatible wire format") + ) + ), + TransportHandshaker.class, + new MockLog.SeenEventExpectation( + "warning", + TransportHandshaker.class.getCanonicalName(), + Level.WARN, + "Rejecting unreadable transport handshake * incompatible wire format." 
+ ) + ); + } assertDeprecationMessageIsLogged(remoteVersion, remoteVersion.toReleaseVersion(), channel); } @@ -154,7 +198,7 @@ public void testHandshakeResponseFromOlderNode() throws Exception { } @TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN") - public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() { + public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() throws Exception { final PlainActionFuture versionFuture = new PlainActionFuture<>(); final long reqId = randomNonNegativeLong(); handshaker.sendHandshake(reqId, node, channel, SAFE_AWAIT_TIMEOUT, versionFuture); @@ -166,33 +210,61 @@ public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() { var releaseVersion = randomIdentifier(); final var handshakeResponse = new TransportHandshaker.HandshakeResponse(remoteVersion, releaseVersion); - MockLog.assertThatLogger( - () -> handler.handleResponse(handshakeResponse), - TransportHandshaker.class, - new MockLog.SeenEventExpectation( - "warning", - TransportHandshaker.class.getCanonicalName(), - Level.WARN, - "Rejecting unreadable transport handshake * incompatible wire format." - ) - ); + if (remoteVersion.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE)) { + // we fall back to the best known version + MockLog.assertThatLogger( + () -> handler.handleResponse(handshakeResponse), + TransportHandshaker.class, + new MockLog.SeenEventExpectation( + "warning", + TransportHandshaker.class.getCanonicalName(), + Level.WARN, + Strings.format( + """ + Negotiating transport handshake with remote node with version [%s/%s] received on [*] which appears to be from \ + a chronologically-older release with a numerically-newer version compared to this node's version [%s/%s]. \ + Upgrading to a chronologically-older release may not work reliably and is not recommended. 
Falling back to \ + transport protocol version [%s].""", + handshakeResponse.getReleaseVersion(), + handshakeResponse.getTransportVersion(), + Build.current().version(), + TransportVersion.current(), + remoteVersion.bestKnownVersion() + ) + ) + ); - assertTrue(versionFuture.isDone()); - assertThat( - expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(), - allOf( - containsString("Rejecting unreadable transport handshake"), - containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"), - containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"), - containsString("which has an incompatible wire format") - ) - ); + assertTrue(versionFuture.isDone()); + assertEquals(remoteVersion.bestKnownVersion(), versionFuture.result()); + } else { + MockLog.assertThatLogger( + () -> handler.handleResponse(handshakeResponse), + TransportHandshaker.class, + new MockLog.SeenEventExpectation( + "warning", + TransportHandshaker.class.getCanonicalName(), + Level.WARN, + "Rejecting unreadable transport handshake * incompatible wire format." 
+ ) + ); + + assertTrue(versionFuture.isDone()); + assertThat( + expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(), + allOf( + containsString("Rejecting unreadable transport handshake"), + containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"), + containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"), + containsString("which has an incompatible wire format") + ) + ); + } assertDeprecationMessageIsLogged(remoteVersion, releaseVersion, channel); } private void assertDeprecationMessageIsLogged(TransportVersion remoteVersion, String remoteReleaseVersion, Object channel) { if (remoteVersion.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE) && remoteVersion.before(V8_18_FIRST_VERSION)) { - assertWarnings(getDeprecationMessage(TransportVersion.current(), remoteVersion, remoteReleaseVersion, channel)); + assertCriticalWarnings(getDeprecationMessage(TransportVersion.current(), remoteVersion, remoteReleaseVersion, channel)); } } From 747a9bba5b23e4af8fba785cb713c6d5a8536568 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 1 Mar 2025 08:41:32 +0000 Subject: [PATCH 2/3] Fix test --- .../transport/TransportHandshakerRawMessageTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java index 7138a834fe959..6662aadd51fbb 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java @@ -39,6 +39,7 @@ public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase { public void testV7Handshake() throws Exception { final BytesRef handshakeRequestBytes; final var requestId = 
randomNonNegativeLong(); + final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); try (var outputStream = new BytesStreamOutput()) { outputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION); outputStream.writeLong(requestId); @@ -50,7 +51,6 @@ public void testV7Handshake() throws Exception { outputStream.writeString("internal:tcp/handshake"); outputStream.writeByte((byte) 0); // no parent task ID; - final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt outputStream.writeByte((byte) 4); // payload length outputStream.writeVInt(requestNodeTransportVersionId); @@ -80,7 +80,7 @@ public void testV7Handshake() throws Exception { assertEquals((byte) 0, inputStream.readByte()); // no request headers assertEquals((byte) 0, inputStream.readByte()); // no response headers inputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION); - assertEquals(TransportVersion.current().id(), inputStream.readVInt()); + assertEquals(requestNodeTransportVersionId, inputStream.readVInt()); assertEquals(-1, inputStream.read()); } } From 347f508d227a59f4b7ccf132a4aac52b2d5beb8b Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 1 Mar 2025 13:24:43 +0000 Subject: [PATCH 3/3] Fix for pre-8.9 versions --- server/src/main/java/org/elasticsearch/TransportVersion.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersion.java b/server/src/main/java/org/elasticsearch/TransportVersion.java index c63b4e59392de..7437985e49636 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersion.java +++ b/server/src/main/java/org/elasticsearch/TransportVersion.java @@ -112,7 +112,7 @@ public boolean isKnown() { * there are no such versions. 
*/ public TransportVersion bestKnownVersion() { - if (TransportVersions.VERSION_IDS.containsKey(id)) { + if (isKnown()) { return this; } TransportVersion bestSoFar = TransportVersions.ZERO;