Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,23 @@ public boolean isKnown() {
return before(TransportVersions.V_8_9_X) || TransportVersions.VERSION_IDS.containsKey(id);
}

/**
* @return the newest known {@link TransportVersion} which is no older than this instance. Returns {@link TransportVersions#ZERO} if
* there are no such versions.
*/
public TransportVersion bestKnownVersion() {
if (isKnown()) {
return this;
}
TransportVersion bestSoFar = TransportVersions.ZERO;
for (final var knownVersion : TransportVersions.VERSION_IDS.values()) {
if (knownVersion.after(bestSoFar) && knownVersion.before(this)) {
bestSoFar = knownVersion;
}
}
return bestSoFar;
}

public static TransportVersion fromString(String str) {
return TransportVersion.fromId(Integer.parseInt(str));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.elasticsearch.Build;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -246,26 +247,50 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
assert ignoreDeserializationErrors : exception;
throw exception;
}
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel);
channel.sendResponse(new HandshakeResponse(this.version, Build.current().version()));
channel.sendResponse(
new HandshakeResponse(
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel),
Build.current().version()
)
);
}

static void ensureCompatibleVersion(
private static TransportVersion ensureCompatibleVersion(
TransportVersion localTransportVersion,
TransportVersion remoteTransportVersion,
String releaseVersion,
Object channel
) {
if (TransportVersion.isCompatible(remoteTransportVersion)) {
if (remoteTransportVersion.onOrAfter(localTransportVersion)) {
// Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
// knows how to do that.
return;
// Remote is semantically newer than us (i.e. has a greater transport protocol version), so we propose using our current
// transport protocol version. If we're initiating the connection then that's the version we'll use; if the other end is
// initiating the connection then it's up to the other end to decide whether to use this version (if it knows it) or
// an earlier one.
return localTransportVersion;
}
if (remoteTransportVersion.isKnown()) {
// Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol
// version is known to us.
return;
final var bestKnownVersion = remoteTransportVersion.bestKnownVersion();
if (bestKnownVersion.equals(TransportVersions.ZERO) == false) {
if (bestKnownVersion.equals(remoteTransportVersion) == false) {
// Remote is semantically older than us (i.e. has a lower transport protocol version), but we do not know its exact
// transport protocol version so it must be chronologically newer. We recommend not doing this, it implies an upgrade
// that goes backwards in time and therefore may regress in some way, so we emit a warning. But we carry on with the
// best known version anyway since both ends will know it.
logger.warn(
"""
Negotiating transport handshake with remote node with version [{}/{}] received on [{}] which appears to be \
from a chronologically-older release with a numerically-newer version compared to this node's version [{}/{}]. \
Upgrading to a chronologically-older release may not work reliably and is not recommended. \
Falling back to transport protocol version [{}].""",
releaseVersion,
remoteTransportVersion,
channel,
Build.current().version(),
localTransportVersion,
bestKnownVersion
);
} // else remote is semantically older and we _do_ know its version, so we just use that without further fuss.
return bestKnownVersion;
}
}

Expand Down Expand Up @@ -323,8 +348,12 @@ public Executor executor() {
public void handleResponse(HandshakeResponse response) {
if (isDone.compareAndSet(false, true)) {
ActionListener.completeWith(listener, () -> {
ensureCompatibleVersion(version, response.getTransportVersion(), response.getReleaseVersion(), channel);
final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion());
final var resultVersion = ensureCompatibleVersion(
version,
response.getTransportVersion(),
response.getReleaseVersion(),
channel
);
assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes
|| resultVersion.isKnown() : "negotiated unknown version " + resultVersion;
return resultVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase {
public void testV7Handshake() throws Exception {
final BytesRef handshakeRequestBytes;
final var requestId = randomNonNegativeLong();
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
try (var outputStream = new BytesStreamOutput()) {
outputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
outputStream.writeLong(requestId);
Expand All @@ -50,7 +51,6 @@ public void testV7Handshake() throws Exception {
outputStream.writeString("internal:tcp/handshake");
outputStream.writeByte((byte) 0); // no parent task ID;

final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
outputStream.writeByte((byte) 4); // payload length
outputStream.writeVInt(requestNodeTransportVersionId);
Expand Down Expand Up @@ -80,14 +80,15 @@ public void testV7Handshake() throws Exception {
assertEquals((byte) 0, inputStream.readByte()); // no request headers
assertEquals((byte) 0, inputStream.readByte()); // no response headers
inputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
assertEquals(-1, inputStream.read());
}
}

public void testV8Handshake() throws Exception {
final BytesRef handshakeRequestBytes;
final var requestId = randomNonNegativeLong();
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
try (var outputStream = new BytesStreamOutput()) {
outputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
outputStream.writeLong(requestId);
Expand All @@ -100,7 +101,6 @@ public void testV8Handshake() throws Exception {
outputStream.writeString("internal:tcp/handshake");
outputStream.writeByte((byte) 0); // no parent task ID;

final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
outputStream.writeByte((byte) 4); // payload length
outputStream.writeVInt(requestNodeTransportVersionId);
Expand Down Expand Up @@ -131,14 +131,15 @@ public void testV8Handshake() throws Exception {
assertEquals((byte) 0, inputStream.readByte()); // no request headers
assertEquals((byte) 0, inputStream.readByte()); // no response headers
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
assertEquals(-1, inputStream.read());
}
}

public void testV9Handshake() throws Exception {
final BytesRef handshakeRequestBytes;
final var requestId = randomNonNegativeLong();
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
try (var outputStream = new BytesStreamOutput()) {
outputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
outputStream.writeLong(requestId);
Expand All @@ -150,7 +151,6 @@ public void testV9Handshake() throws Exception {
outputStream.writeString("internal:tcp/handshake");
outputStream.writeByte((byte) 0); // no parent task ID;

final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
final var releaseVersionLength = between(0, 127 - 5); // so that its length, and the length of the payload, is a one-byte vInt
final var requestNodeReleaseVersion = randomAlphaOfLength(releaseVersionLength);
Expand Down Expand Up @@ -184,7 +184,7 @@ public void testV9Handshake() throws Exception {
assertEquals((byte) 0, inputStream.readByte()); // no request headers
assertEquals((byte) 0, inputStream.readByte()); // no response headers
inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
assertEquals(Build.current().version(), inputStream.readString());
assertEquals(-1, inputStream.read());
}
Expand Down
Loading