Skip to content
17 changes: 17 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,23 @@ public boolean isKnown() {
return VersionsHolder.ALL_VERSIONS_MAP.containsKey(id);
}

/**
* Picks the known version to use in place of this one: this instance itself if it is known, otherwise the newest known version that
* is before it.
*
* @return the newest known {@link TransportVersion} which is no newer than this instance. Returns {@link TransportVersions#ZERO} if
* there are no such versions.
*/
public TransportVersion bestKnownVersion() {
if (VersionsHolder.ALL_VERSIONS_MAP.containsKey(id)) {
return this;
}
// This version is not known: scan every known version, keeping the greatest one that is still before this instance.
TransportVersion bestSoFar = TransportVersions.ZERO;
for (final var knownVersion : VersionsHolder.ALL_VERSIONS_MAP.values()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At one point we had the map as a NavigableMap, which would allow quickly finding the closest known version. Is the linear performance here "ok" because handshakes are only done on initial connection?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep we hardly ever open new connections.

if (knownVersion.after(bestSoFar) && knownVersion.before(this)) {
bestSoFar = knownVersion;
}
}
return bestSoFar;
}

/**
 * Parses the textual form of a transport version id and resolves it to a {@link TransportVersion}.
 *
 * @param str the id as text, in the format accepted by {@link Integer#parseInt(String)}
 * @return the result of {@code fromId} for the parsed id
 * @throws NumberFormatException if {@code str} does not contain a parsable integer
 */
public static TransportVersion fromString(String str) {
    final int parsedId = Integer.parseInt(str);
    return TransportVersion.fromId(parsedId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.elasticsearch.Build;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -210,11 +211,15 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
assert ignoreDeserializationErrors : exception;
throw exception;
}
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel);
channel.sendResponse(new HandshakeResponse(this.version, Build.current().version()));
channel.sendResponse(
new HandshakeResponse(
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel),
Build.current().version()
)
);
}

static void ensureCompatibleVersion(
private static TransportVersion ensureCompatibleVersion(
TransportVersion localTransportVersion,
TransportVersion remoteTransportVersion,
String releaseVersion,
Expand All @@ -224,12 +229,30 @@ static void ensureCompatibleVersion(
if (remoteTransportVersion.onOrAfter(localTransportVersion)) {
// Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
// knows how to do that.
return;
return localTransportVersion;
}
if (remoteTransportVersion.isKnown()) {
// Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol
// version is known to us.
return;
final var bestKnownVersion = remoteTransportVersion.bestKnownVersion();
if (bestKnownVersion.equals(TransportVersions.ZERO) == false) {
if (bestKnownVersion.equals(remoteTransportVersion) == false) {
// Remote is numerically older than us, and we do not know its exact transport protocol version, so we choose a numerically-older version
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: older is ambiguous, this should be numerically-older right? Also, maybe use semantically-older since we generally talk about these as semantic versions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ updated in 184ada9

// which we _do_ know (at least syntactically). However, this means that the remote is running a chronologically-newer
// release than us, even though it's a numerically-older version. We recommend not doing this, it implies an upgrade
// that goes backwards in time and therefore may regress in some way:
logger.warn(
"""
Negotiating transport handshake with remote node with version [{}/{}] received on [{}] which appears to be \
from a chronologically-older release with a numerically-newer version compared to this node's version [{}/{}]. \
Upgrading to a chronologically-older release may not work reliably and is not recommended. \
Falling back to transport protocol version [{}].""",
releaseVersion,
remoteTransportVersion,
channel,
Build.current().version(),
localTransportVersion,
bestKnownVersion
);
} // else remote is older and we _do_ know its version, so we just use that without further fuss.
return bestKnownVersion;
}
}

Expand Down Expand Up @@ -287,8 +310,12 @@ public Executor executor() {
public void handleResponse(HandshakeResponse response) {
if (isDone.compareAndSet(false, true)) {
ActionListener.completeWith(listener, () -> {
ensureCompatibleVersion(version, response.getTransportVersion(), response.getReleaseVersion(), channel);
final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion());
final var resultVersion = ensureCompatibleVersion(
version,
response.getTransportVersion(),
response.getReleaseVersion(),
channel
);
assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes
|| resultVersion.isKnown() : "negotiated unknown version " + resultVersion;
return resultVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase {
public void testV8Handshake() throws Exception {
final BytesRef handshakeRequestBytes;
final var requestId = randomNonNegativeLong();
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
try (var outputStream = new BytesStreamOutput()) {
outputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
outputStream.writeLong(requestId);
Expand All @@ -53,7 +54,6 @@ public void testV8Handshake() throws Exception {
outputStream.writeString("internal:tcp/handshake");
outputStream.writeByte((byte) 0); // no parent task ID;

final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
outputStream.writeByte((byte) 4); // payload length
outputStream.writeVInt(requestNodeTransportVersionId);
Expand Down Expand Up @@ -84,7 +84,7 @@ public void testV8Handshake() throws Exception {
assertEquals((byte) 0, inputStream.readByte()); // no request headers
assertEquals((byte) 0, inputStream.readByte()); // no response headers
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
assertEquals(-1, inputStream.read());
}
}
Expand All @@ -93,6 +93,7 @@ public void testV8Handshake() throws Exception {
public void testV9Handshake() throws Exception {
final BytesRef handshakeRequestBytes;
final var requestId = randomNonNegativeLong();
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
try (var outputStream = new BytesStreamOutput()) {
outputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
outputStream.writeLong(requestId);
Expand All @@ -104,7 +105,6 @@ public void testV9Handshake() throws Exception {
outputStream.writeString("internal:tcp/handshake");
outputStream.writeByte((byte) 0); // no parent task ID;

final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
final var releaseVersionLength = between(0, 127 - 5); // so that its length, and the length of the payload, is a one-byte vInt
final var requestNodeReleaseVersion = randomAlphaOfLength(releaseVersionLength);
Expand Down Expand Up @@ -138,7 +138,7 @@ public void testV9Handshake() throws Exception {
assertEquals((byte) 0, inputStream.readByte()); // no request headers
assertEquals((byte) 0, inputStream.readByte()); // no response headers
inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
assertEquals(Build.current().version(), inputStream.readString());
assertEquals(-1, inputStream.read());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -102,7 +103,7 @@ public void testHandshakeRequestAndResponse() throws IOException {
}

@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
public void testIncompatibleHandshakeRequest() throws IOException {
public void testIncompatibleHandshakeRequest() throws Exception {
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(
getRandomIncompatibleTransportVersion(),
randomIdentifier()
Expand All @@ -112,27 +113,69 @@ public void testIncompatibleHandshakeRequest() throws IOException {
handshakeRequest.writeTo(bytesStreamOutput);
StreamInput input = bytesStreamOutput.bytes().streamInput();
input.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
final TestTransportChannel channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete")));

MockLog.assertThatLogger(
() -> assertThat(
expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input))
.getMessage(),
allOf(
containsString("Rejecting unreadable transport handshake"),
containsString("[" + handshakeRequest.releaseVersion + "/" + handshakeRequest.transportVersion + "]"),
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
containsString("which has an incompatible wire format")
if (handshakeRequest.transportVersion.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE)) {

final PlainActionFuture<TransportResponse> responseFuture = new PlainActionFuture<>();
final TestTransportChannel channel = new TestTransportChannel(responseFuture);

// we fall back to the best known version
MockLog.assertThatLogger(() -> {
try {
handshaker.handleHandshake(channel, randomNonNegativeLong(), input);
} catch (IOException e) {
throw new AssertionError(e);
}
},
TransportHandshaker.class,
new MockLog.SeenEventExpectation(
"warning",
TransportHandshaker.class.getCanonicalName(),
Level.WARN,
Strings.format(
"""
Negotiating transport handshake with remote node with version [%s/%s] received on [*] which appears to be from \
a chronologically-older release with a numerically-newer version compared to this node's version [%s/%s]. \
Upgrading to a chronologically-older release may not work reliably and is not recommended. Falling back to \
transport protocol version [%s].""",
handshakeRequest.releaseVersion,
handshakeRequest.transportVersion,
Build.current().version(),
TransportVersion.current(),
handshakeRequest.transportVersion.bestKnownVersion()
)
)
),
TransportHandshaker.class,
new MockLog.SeenEventExpectation(
"warning",
TransportHandshaker.class.getCanonicalName(),
Level.WARN,
"Rejecting unreadable transport handshake * incompatible wire format."
)
);
);

assertTrue(responseFuture.isDone());
assertEquals(
handshakeRequest.transportVersion.bestKnownVersion(),
asInstanceOf(TransportHandshaker.HandshakeResponse.class, responseFuture.result()).getTransportVersion()
);

} else {
final TestTransportChannel channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete")));

MockLog.assertThatLogger(
() -> assertThat(
expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input))
.getMessage(),
allOf(
containsString("Rejecting unreadable transport handshake"),
containsString("[" + handshakeRequest.releaseVersion + "/" + handshakeRequest.transportVersion + "]"),
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
containsString("which has an incompatible wire format")
)
),
TransportHandshaker.class,
new MockLog.SeenEventExpectation(
"warning",
TransportHandshaker.class.getCanonicalName(),
Level.WARN,
"Rejecting unreadable transport handshake * incompatible wire format."
)
);
}
}

public void testHandshakeResponseFromOlderNode() throws Exception {
Expand All @@ -151,40 +194,66 @@ public void testHandshakeResponseFromOlderNode() throws Exception {
}

@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() {
public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() throws Exception {
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
final long reqId = randomNonNegativeLong();
handshaker.sendHandshake(reqId, node, channel, SAFE_AWAIT_TIMEOUT, versionFuture);
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);

assertFalse(versionFuture.isDone());

final var handshakeResponse = new TransportHandshaker.HandshakeResponse(
getRandomIncompatibleTransportVersion(),
randomIdentifier()
);
final var randomIncompatibleTransportVersion = getRandomIncompatibleTransportVersion();
final var handshakeResponse = new TransportHandshaker.HandshakeResponse(randomIncompatibleTransportVersion, randomIdentifier());

if (randomIncompatibleTransportVersion.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE)) {
// we fall back to the best known version
MockLog.assertThatLogger(
() -> handler.handleResponse(handshakeResponse),
TransportHandshaker.class,
new MockLog.SeenEventExpectation(
"warning",
TransportHandshaker.class.getCanonicalName(),
Level.WARN,
Strings.format(
"""
Negotiating transport handshake with remote node with version [%s/%s] received on [*] which appears to be from \
a chronologically-older release with a numerically-newer version compared to this node's version [%s/%s]. \
Upgrading to a chronologically-older release may not work reliably and is not recommended. Falling back to \
transport protocol version [%s].""",
handshakeResponse.getReleaseVersion(),
handshakeResponse.getTransportVersion(),
Build.current().version(),
TransportVersion.current(),
randomIncompatibleTransportVersion.bestKnownVersion()
)
)
);

MockLog.assertThatLogger(
() -> handler.handleResponse(handshakeResponse),
TransportHandshaker.class,
new MockLog.SeenEventExpectation(
"warning",
TransportHandshaker.class.getCanonicalName(),
Level.WARN,
"Rejecting unreadable transport handshake * incompatible wire format."
)
);
assertTrue(versionFuture.isDone());
assertEquals(randomIncompatibleTransportVersion.bestKnownVersion(), versionFuture.result());
} else {
MockLog.assertThatLogger(
() -> handler.handleResponse(handshakeResponse),
TransportHandshaker.class,
new MockLog.SeenEventExpectation(
"warning",
TransportHandshaker.class.getCanonicalName(),
Level.WARN,
"Rejecting unreadable transport handshake * incompatible wire format."
)
);

assertTrue(versionFuture.isDone());
assertThat(
expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(),
allOf(
containsString("Rejecting unreadable transport handshake"),
containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"),
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
containsString("which has an incompatible wire format")
)
);
assertTrue(versionFuture.isDone());
assertThat(
expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(),
allOf(
containsString("Rejecting unreadable transport handshake"),
containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"),
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
containsString("which has an incompatible wire format")
)
);
}
}

private static TransportVersion getRandomIncompatibleTransportVersion() {
Expand Down