diff --git a/server/src/main/java/org/elasticsearch/TransportVersion.java b/server/src/main/java/org/elasticsearch/TransportVersion.java index e4f20b64a7a3d..7437985e49636 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersion.java +++ b/server/src/main/java/org/elasticsearch/TransportVersion.java @@ -107,6 +107,23 @@ public boolean isKnown() { return before(TransportVersions.V_8_9_X) || TransportVersions.VERSION_IDS.containsKey(id); } + /** + * @return the newest known {@link TransportVersion} which is no older than this instance. Returns {@link TransportVersions#ZERO} if + * there are no such versions. + */ + public TransportVersion bestKnownVersion() { + if (isKnown()) { + return this; + } + TransportVersion bestSoFar = TransportVersions.ZERO; + for (final var knownVersion : TransportVersions.VERSION_IDS.values()) { + if (knownVersion.after(bestSoFar) && knownVersion.before(this)) { + bestSoFar = knownVersion; + } + } + return bestSoFar; + } + public static TransportVersion fromString(String str) { return TransportVersion.fromId(Integer.parseInt(str)); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index daf5a18db3dcd..ed04c6a0dfd85 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -11,6 +11,7 @@ import org.elasticsearch.Build; import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; @@ -246,11 +247,15 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea assert ignoreDeserializationErrors : exception; throw exception; } - ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel); - channel.sendResponse(new HandshakeResponse(this.version, Build.current().version())); + channel.sendResponse( + new HandshakeResponse( + ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel), + Build.current().version() + ) + ); } - static void ensureCompatibleVersion( + private static TransportVersion ensureCompatibleVersion( TransportVersion localTransportVersion, TransportVersion remoteTransportVersion, String releaseVersion, @@ -258,14 +263,34 @@ static void ensureCompatibleVersion( ) { if (TransportVersion.isCompatible(remoteTransportVersion)) { if (remoteTransportVersion.onOrAfter(localTransportVersion)) { - // Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it - // knows how to do that. - return; + // Remote is semantically newer than us (i.e. has a greater transport protocol version), so we propose using our current + // transport protocol version. If we're initiating the connection then that's the version we'll use; if the other end is + // initiating the connection then it's up to the other end to decide whether to use this version (if it knows it) or + // an earlier one. + return localTransportVersion; } - if (remoteTransportVersion.isKnown()) { - // Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol - // version is known to us. - return; + final var bestKnownVersion = remoteTransportVersion.bestKnownVersion(); + if (bestKnownVersion.equals(TransportVersions.ZERO) == false) { + if (bestKnownVersion.equals(remoteTransportVersion) == false) { + // Remote is semantically older than us (i.e. has a lower transport protocol version), but we do not know its exact + // transport protocol version so it must be chronologically newer. We recommend not doing this, it implies an upgrade + // that goes backwards in time and therefore may regress in some way, so we emit a warning. But we carry on with the + // best known version anyway since both ends will know it. + logger.warn( + """ + Negotiating transport handshake with remote node with version [{}/{}] received on [{}] which appears to be \ + from a chronologically-older release with a numerically-newer version compared to this node's version [{}/{}]. \ + Upgrading to a chronologically-older release may not work reliably and is not recommended. \ + Falling back to transport protocol version [{}].""", + releaseVersion, + remoteTransportVersion, + channel, + Build.current().version(), + localTransportVersion, + bestKnownVersion + ); + } // else remote is semantically older and we _do_ know its version, so we just use that without further fuss. + return bestKnownVersion; } } @@ -323,8 +348,12 @@ public Executor executor() { public void handleResponse(HandshakeResponse response) { if (isDone.compareAndSet(false, true)) { ActionListener.completeWith(listener, () -> { - ensureCompatibleVersion(version, response.getTransportVersion(), response.getReleaseVersion(), channel); - final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion()); + final var resultVersion = ensureCompatibleVersion( + version, + response.getTransportVersion(), + response.getReleaseVersion(), + channel + ); assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes || resultVersion.isKnown() : "negotiated unknown version " + resultVersion; return resultVersion; diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java index b4c791c7a04b3..6662aadd51fbb 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java @@ -39,6 +39,7 @@ public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase { public void testV7Handshake() throws Exception { final BytesRef handshakeRequestBytes; final var requestId = randomNonNegativeLong(); + final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); try (var outputStream = new BytesStreamOutput()) { outputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION); outputStream.writeLong(requestId); @@ -50,7 +51,6 @@ public void testV7Handshake() throws Exception { outputStream.writeString("internal:tcp/handshake"); outputStream.writeByte((byte) 0); // no parent task ID; - final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt outputStream.writeByte((byte) 4); // payload length outputStream.writeVInt(requestNodeTransportVersionId); @@ -80,7 +80,7 @@ public void testV7Handshake() throws Exception { assertEquals((byte) 0, inputStream.readByte()); // no request headers assertEquals((byte) 0, inputStream.readByte()); // no response headers inputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION); - assertEquals(TransportVersion.current().id(), inputStream.readVInt()); + assertEquals(requestNodeTransportVersionId, inputStream.readVInt()); assertEquals(-1, inputStream.read()); } } @@ -88,6 +88,7 @@ public void testV7Handshake() throws Exception { public void testV8Handshake() throws Exception { final BytesRef handshakeRequestBytes; final var requestId = randomNonNegativeLong(); + final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); try (var outputStream = new BytesStreamOutput()) { outputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION); outputStream.writeLong(requestId); @@ -100,7 +101,6 @@ public void testV8Handshake() throws Exception { outputStream.writeString("internal:tcp/handshake"); outputStream.writeByte((byte) 0); // no parent task ID; - final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt outputStream.writeByte((byte) 4); // payload length outputStream.writeVInt(requestNodeTransportVersionId); @@ -131,7 +131,7 @@ public void testV8Handshake() throws Exception { assertEquals((byte) 0, inputStream.readByte()); // no request headers assertEquals((byte) 0, inputStream.readByte()); // no response headers inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION); - assertEquals(TransportVersion.current().id(), inputStream.readVInt()); + assertEquals(requestNodeTransportVersionId, inputStream.readVInt()); assertEquals(-1, inputStream.read()); } } @@ -139,6 +139,7 @@ public void testV8Handshake() throws Exception { public void testV9Handshake() throws Exception { final BytesRef handshakeRequestBytes; final var requestId = randomNonNegativeLong(); + final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); try (var outputStream = new BytesStreamOutput()) { outputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION); outputStream.writeLong(requestId); @@ -150,7 +151,6 @@ public void testV9Handshake() throws Exception { outputStream.writeString("internal:tcp/handshake"); outputStream.writeByte((byte) 0); // no parent task ID; - final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt final var releaseVersionLength = between(0, 127 - 5); // so that its length, and the length of the payload, is a one-byte vInt final var requestNodeReleaseVersion = randomAlphaOfLength(releaseVersionLength); @@ -184,7 +184,7 @@ public void testV9Handshake() throws Exception { assertEquals((byte) 0, inputStream.readByte()); // no request headers assertEquals((byte) 0, inputStream.readByte()); // no response headers inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION); - assertEquals(TransportVersion.current().id(), inputStream.readVInt()); + assertEquals(requestNodeTransportVersionId, inputStream.readVInt()); assertEquals(Build.current().version(), inputStream.readString()); assertEquals(-1, inputStream.read()); } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java index 97536839e5e20..e56f0a772fb51 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; @@ -99,7 +100,7 @@ public void testHandshakeRequestAndResponse() throws IOException { } @TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN") - public void testIncompatibleHandshakeRequest() throws IOException { + public void testIncompatibleHandshakeRequest() throws Exception { TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest( getRandomIncompatibleTransportVersion(), randomIdentifier() @@ -109,29 +110,71 @@ public void testIncompatibleHandshakeRequest() throws IOException { handshakeRequest.writeTo(bytesStreamOutput); StreamInput input = bytesStreamOutput.bytes().streamInput(); input.setTransportVersion(HANDSHAKE_REQUEST_VERSION); - final TestTransportChannel channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete"))); - MockLog.assertThatLogger( - () -> assertThat( - expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input)) - .getMessage(), - allOf( - containsString("Rejecting unreadable transport handshake"), - containsString( - "[" + handshakeRequest.transportVersion.toReleaseVersion() + "/" + handshakeRequest.transportVersion + "]" - ), - containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"), - containsString("which has an incompatible wire format") + if (handshakeRequest.transportVersion.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE)) { + + final PlainActionFuture responseFuture = new PlainActionFuture<>(); + final TestTransportChannel channel = new TestTransportChannel(responseFuture); + + // we fall back to the best known version + MockLog.assertThatLogger(() -> { + try { + handshaker.handleHandshake(channel, randomNonNegativeLong(), input); + } catch (IOException e) { + throw new AssertionError(e); + } + }, + TransportHandshaker.class, + new MockLog.SeenEventExpectation( + "warning", + TransportHandshaker.class.getCanonicalName(), + Level.WARN, + Strings.format( + """ + Negotiating transport handshake with remote node with version [%s/%s] received on [*] which appears to be from \ + a chronologically-older release with a numerically-newer version compared to this node's version [%s/%s]. \ + Upgrading to a chronologically-older release may not work reliably and is not recommended. Falling back to \ + transport protocol version [%s].""", + handshakeRequest.transportVersion.toReleaseVersion(), + handshakeRequest.transportVersion, + Build.current().version(), + TransportVersion.current(), + handshakeRequest.transportVersion.bestKnownVersion() + ) ) - ), - TransportHandshaker.class, - new MockLog.SeenEventExpectation( - "warning", - TransportHandshaker.class.getCanonicalName(), - Level.WARN, - "Rejecting unreadable transport handshake * incompatible wire format." - ) - ); + ); + + assertTrue(responseFuture.isDone()); + assertEquals( + handshakeRequest.transportVersion.bestKnownVersion(), + asInstanceOf(TransportHandshaker.HandshakeResponse.class, responseFuture.result()).getTransportVersion() + ); + + } else { + final TestTransportChannel channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete"))); + + MockLog.assertThatLogger( + () -> assertThat( + expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input)) + .getMessage(), + allOf( + containsString("Rejecting unreadable transport handshake"), + containsString( + "[" + handshakeRequest.transportVersion.toReleaseVersion() + "/" + handshakeRequest.transportVersion + "]" + ), + containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"), + containsString("which has an incompatible wire format") + ) + ), + TransportHandshaker.class, + new MockLog.SeenEventExpectation( + "warning", + TransportHandshaker.class.getCanonicalName(), + Level.WARN, + "Rejecting unreadable transport handshake * incompatible wire format." + ) + ); + } } public void testHandshakeResponseFromOlderNode() throws Exception { @@ -150,7 +193,7 @@ public void testHandshakeResponseFromOlderNode() throws Exception { } @TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN") - public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() { + public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() throws Exception { final PlainActionFuture versionFuture = new PlainActionFuture<>(); final long reqId = randomNonNegativeLong(); handshaker.sendHandshake(reqId, node, channel, SAFE_AWAIT_TIMEOUT, versionFuture); @@ -158,32 +201,58 @@ public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() { assertFalse(versionFuture.isDone()); - final var handshakeResponse = new TransportHandshaker.HandshakeResponse( - getRandomIncompatibleTransportVersion(), - randomIdentifier() - ); + final var randomIncompatibleTransportVersion = getRandomIncompatibleTransportVersion(); + final var handshakeResponse = new TransportHandshaker.HandshakeResponse(randomIncompatibleTransportVersion, randomIdentifier()); + + if (randomIncompatibleTransportVersion.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE)) { + // we fall back to the best known version + MockLog.assertThatLogger( + () -> handler.handleResponse(handshakeResponse), + TransportHandshaker.class, + new MockLog.SeenEventExpectation( + "warning", + TransportHandshaker.class.getCanonicalName(), + Level.WARN, + Strings.format( + """ + Negotiating transport handshake with remote node with version [%s/%s] received on [*] which appears to be from \ + a chronologically-older release with a numerically-newer version compared to this node's version [%s/%s]. \ + Upgrading to a chronologically-older release may not work reliably and is not recommended. Falling back to \ + transport protocol version [%s].""", + handshakeResponse.getReleaseVersion(), + handshakeResponse.getTransportVersion(), + Build.current().version(), + TransportVersion.current(), + randomIncompatibleTransportVersion.bestKnownVersion() + ) + ) + ); - MockLog.assertThatLogger( - () -> handler.handleResponse(handshakeResponse), - TransportHandshaker.class, - new MockLog.SeenEventExpectation( - "warning", - TransportHandshaker.class.getCanonicalName(), - Level.WARN, - "Rejecting unreadable transport handshake * incompatible wire format." - ) - ); + assertTrue(versionFuture.isDone()); + assertEquals(randomIncompatibleTransportVersion.bestKnownVersion(), versionFuture.result()); + } else { + MockLog.assertThatLogger( + () -> handler.handleResponse(handshakeResponse), + TransportHandshaker.class, + new MockLog.SeenEventExpectation( + "warning", + TransportHandshaker.class.getCanonicalName(), + Level.WARN, + "Rejecting unreadable transport handshake * incompatible wire format." + ) + ); - assertTrue(versionFuture.isDone()); - assertThat( - expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(), - allOf( - containsString("Rejecting unreadable transport handshake"), - containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"), - containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"), - containsString("which has an incompatible wire format") - ) - ); + assertTrue(versionFuture.isDone()); + assertThat( + expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(), + allOf( + containsString("Rejecting unreadable transport handshake"), + containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"), + containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"), + containsString("which has an incompatible wire format") + ) + ); + } } private static TransportVersion getRandomIncompatibleTransportVersion() {