Skip to content

Commit 6bac0e4

Browse files
authored
Negotiate disordered transport handshakes (#123764)
* Negotiate disordered transport handshakes. Previously we'd reject transport handshakes from numerically-older nodes with a chronologically-newer release date. With this commit we instead negotiate to use the latest known version for future communications. Backport of #123397 to 8.x.
* Negotiate disordered transport handshakes. Previously we'd reject transport handshakes from numerically-older nodes with a chronologically-newer release date. With this commit we instead negotiate to use the latest known version for future communications. Backport of #123397 to 8.x.
* Fix test.
* Fix for pre-8.9 versions.
1 parent 73259a2 commit 6bac0e4

File tree

4 files changed

+180
-65
lines changed

4 files changed

+180
-65
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,23 @@ public boolean isKnown() {
107107
return before(TransportVersions.V_8_9_X) || TransportVersions.VERSION_IDS.containsKey(id);
108108
}
109109

110+
/**
111+
* @return the newest known {@link TransportVersion} which is no newer than this instance. Returns {@link TransportVersions#ZERO} if
112+
* there are no such versions.
113+
*/
114+
public TransportVersion bestKnownVersion() {
115+
if (isKnown()) {
116+
return this;
117+
}
118+
TransportVersion bestSoFar = TransportVersions.ZERO;
119+
for (final var knownVersion : TransportVersions.VERSION_IDS.values()) {
120+
if (knownVersion.after(bestSoFar) && knownVersion.before(this)) {
121+
bestSoFar = knownVersion;
122+
}
123+
}
124+
return bestSoFar;
125+
}
126+
110127
public static TransportVersion fromString(String str) {
111128
return TransportVersion.fromId(Integer.parseInt(str));
112129
}

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.Build;
1313
import org.elasticsearch.TransportVersion;
14+
import org.elasticsearch.TransportVersions;
1415
import org.elasticsearch.action.ActionListener;
1516
import org.elasticsearch.cluster.node.DiscoveryNode;
1617
import org.elasticsearch.common.bytes.BytesReference;
@@ -246,26 +247,50 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
246247
assert ignoreDeserializationErrors : exception;
247248
throw exception;
248249
}
249-
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel);
250-
channel.sendResponse(new HandshakeResponse(this.version, Build.current().version()));
250+
channel.sendResponse(
251+
new HandshakeResponse(
252+
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel),
253+
Build.current().version()
254+
)
255+
);
251256
}
252257

253-
static void ensureCompatibleVersion(
258+
private static TransportVersion ensureCompatibleVersion(
254259
TransportVersion localTransportVersion,
255260
TransportVersion remoteTransportVersion,
256261
String releaseVersion,
257262
Object channel
258263
) {
259264
if (TransportVersion.isCompatible(remoteTransportVersion)) {
260265
if (remoteTransportVersion.onOrAfter(localTransportVersion)) {
261-
// Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
262-
// knows how to do that.
263-
return;
266+
// Remote is semantically newer than us (i.e. has a greater transport protocol version), so we propose using our current
267+
// transport protocol version. If we're initiating the connection then that's the version we'll use; if the other end is
268+
// initiating the connection then it's up to the other end to decide whether to use this version (if it knows it) or
269+
// an earlier one.
270+
return localTransportVersion;
264271
}
265-
if (remoteTransportVersion.isKnown()) {
266-
// Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol
267-
// version is known to us.
268-
return;
272+
final var bestKnownVersion = remoteTransportVersion.bestKnownVersion();
273+
if (bestKnownVersion.equals(TransportVersions.ZERO) == false) {
274+
if (bestKnownVersion.equals(remoteTransportVersion) == false) {
275+
// Remote is semantically older than us (i.e. has a lower transport protocol version), but we do not know its exact
276+
// transport protocol version so it must be chronologically newer. We recommend not doing this because it implies an upgrade
277+
// that goes backwards in time and therefore may regress in some way, so we emit a warning. But we carry on with the
278+
// best known version anyway since both ends will know it.
279+
logger.warn(
280+
"""
281+
Negotiating transport handshake with remote node with version [{}/{}] received on [{}] which appears to be \
282+
from a chronologically-older release with a numerically-newer version compared to this node's version [{}/{}]. \
283+
Upgrading to a chronologically-older release may not work reliably and is not recommended. \
284+
Falling back to transport protocol version [{}].""",
285+
releaseVersion,
286+
remoteTransportVersion,
287+
channel,
288+
Build.current().version(),
289+
localTransportVersion,
290+
bestKnownVersion
291+
);
292+
} // else remote is semantically older and we _do_ know its version, so we just use that without further fuss.
293+
return bestKnownVersion;
269294
}
270295
}
271296

@@ -323,8 +348,12 @@ public Executor executor() {
323348
public void handleResponse(HandshakeResponse response) {
324349
if (isDone.compareAndSet(false, true)) {
325350
ActionListener.completeWith(listener, () -> {
326-
ensureCompatibleVersion(version, response.getTransportVersion(), response.getReleaseVersion(), channel);
327-
final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion());
351+
final var resultVersion = ensureCompatibleVersion(
352+
version,
353+
response.getTransportVersion(),
354+
response.getReleaseVersion(),
355+
channel
356+
);
328357
assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes
329358
|| resultVersion.isKnown() : "negotiated unknown version " + resultVersion;
330359
return resultVersion;

server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase {
3939
public void testV7Handshake() throws Exception {
4040
final BytesRef handshakeRequestBytes;
4141
final var requestId = randomNonNegativeLong();
42+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
4243
try (var outputStream = new BytesStreamOutput()) {
4344
outputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
4445
outputStream.writeLong(requestId);
@@ -50,7 +51,6 @@ public void testV7Handshake() throws Exception {
5051
outputStream.writeString("internal:tcp/handshake");
5152
outputStream.writeByte((byte) 0); // no parent task ID;
5253

53-
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
5454
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
5555
outputStream.writeByte((byte) 4); // payload length
5656
outputStream.writeVInt(requestNodeTransportVersionId);
@@ -80,14 +80,15 @@ public void testV7Handshake() throws Exception {
8080
assertEquals((byte) 0, inputStream.readByte()); // no request headers
8181
assertEquals((byte) 0, inputStream.readByte()); // no response headers
8282
inputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
83-
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
83+
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
8484
assertEquals(-1, inputStream.read());
8585
}
8686
}
8787

8888
public void testV8Handshake() throws Exception {
8989
final BytesRef handshakeRequestBytes;
9090
final var requestId = randomNonNegativeLong();
91+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
9192
try (var outputStream = new BytesStreamOutput()) {
9293
outputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
9394
outputStream.writeLong(requestId);
@@ -100,7 +101,6 @@ public void testV8Handshake() throws Exception {
100101
outputStream.writeString("internal:tcp/handshake");
101102
outputStream.writeByte((byte) 0); // no parent task ID;
102103

103-
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
104104
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
105105
outputStream.writeByte((byte) 4); // payload length
106106
outputStream.writeVInt(requestNodeTransportVersionId);
@@ -131,14 +131,15 @@ public void testV8Handshake() throws Exception {
131131
assertEquals((byte) 0, inputStream.readByte()); // no request headers
132132
assertEquals((byte) 0, inputStream.readByte()); // no response headers
133133
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
134-
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
134+
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
135135
assertEquals(-1, inputStream.read());
136136
}
137137
}
138138

139139
public void testV9Handshake() throws Exception {
140140
final BytesRef handshakeRequestBytes;
141141
final var requestId = randomNonNegativeLong();
142+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
142143
try (var outputStream = new BytesStreamOutput()) {
143144
outputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
144145
outputStream.writeLong(requestId);
@@ -150,7 +151,6 @@ public void testV9Handshake() throws Exception {
150151
outputStream.writeString("internal:tcp/handshake");
151152
outputStream.writeByte((byte) 0); // no parent task ID;
152153

153-
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
154154
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
155155
final var releaseVersionLength = between(0, 127 - 5); // so that its length, and the length of the payload, is a one-byte vInt
156156
final var requestNodeReleaseVersion = randomAlphaOfLength(releaseVersionLength);
@@ -184,7 +184,7 @@ public void testV9Handshake() throws Exception {
184184
assertEquals((byte) 0, inputStream.readByte()); // no request headers
185185
assertEquals((byte) 0, inputStream.readByte()); // no response headers
186186
inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
187-
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
187+
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
188188
assertEquals(Build.current().version(), inputStream.readString());
189189
assertEquals(-1, inputStream.read());
190190
}

0 commit comments

Comments
 (0)