Skip to content

Commit b07705c

Browse files
authored
Negotiate disordered transport handshakes (#123766)
* Negotiate disordered transport handshakes Previously we'd reject transport handshakes from numerically-older nodes with a chronologically-newer release date. With this commit we instead negotiate to use the latest known version for future communications. Backport of #123397 to 8.18 * Fix test * Fix for pre-8.9 versions
1 parent 71e769d commit b07705c

File tree

4 files changed

+179
-67
lines changed

4 files changed

+179
-67
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,23 @@ public boolean isKnown() {
107107
return before(TransportVersions.V_8_9_X) || TransportVersions.VERSION_IDS.containsKey(id);
108108
}
109109

110+
/**
111+
* @return the newest known {@link TransportVersion} which is no older than this instance. Returns {@link TransportVersions#ZERO} if
112+
* there are no such versions.
113+
*/
114+
public TransportVersion bestKnownVersion() {
115+
if (isKnown()) {
116+
return this;
117+
}
118+
TransportVersion bestSoFar = TransportVersions.ZERO;
119+
for (final var knownVersion : TransportVersions.VERSION_IDS.values()) {
120+
if (knownVersion.after(bestSoFar) && knownVersion.before(this)) {
121+
bestSoFar = knownVersion;
122+
}
123+
}
124+
return bestSoFar;
125+
}
126+
110127
public static TransportVersion fromString(String str) {
111128
return TransportVersion.fromId(Integer.parseInt(str));
112129
}

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -252,17 +252,21 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
252252
assert ignoreDeserializationErrors : exception;
253253
throw exception;
254254
}
255-
ensureCompatibleVersion(
256-
version,
257-
handshakeRequest.transportVersion,
258-
handshakeRequest.releaseVersion,
259-
channel,
260-
threadPool.getThreadContext()
255+
channel.sendResponse(
256+
new HandshakeResponse(
257+
ensureCompatibleVersion(
258+
version,
259+
handshakeRequest.transportVersion,
260+
handshakeRequest.releaseVersion,
261+
channel,
262+
threadPool.getThreadContext()
263+
),
264+
Build.current().version()
265+
)
261266
);
262-
channel.sendResponse(new HandshakeResponse(this.version, Build.current().version()));
263267
}
264268

265-
static void ensureCompatibleVersion(
269+
private static TransportVersion ensureCompatibleVersion(
266270
TransportVersion localTransportVersion,
267271
TransportVersion remoteTransportVersion,
268272
String remoteReleaseVersion,
@@ -281,14 +285,34 @@ static void ensureCompatibleVersion(
281285
}
282286
}
283287
if (remoteTransportVersion.onOrAfter(localTransportVersion)) {
284-
// Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
285-
// knows how to do that.
286-
return;
288+
// Remote is semantically newer than us (i.e. has a greater transport protocol version), so we propose using our current
289+
// transport protocol version. If we're initiating the connection then that's the version we'll use; if the other end is
290+
// initiating the connection then it's up to the other end to decide whether to use this version (if it knows it) or
291+
// an earlier one.
292+
return localTransportVersion;
287293
}
288-
if (remoteTransportVersion.isKnown()) {
289-
// Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol
290-
// version is known to us.
291-
return;
294+
final var bestKnownVersion = remoteTransportVersion.bestKnownVersion();
295+
if (bestKnownVersion.equals(TransportVersions.ZERO) == false) {
296+
if (bestKnownVersion.equals(remoteTransportVersion) == false) {
297+
// Remote is semantically older than us (i.e. has a lower transport protocol version), but we do not know its exact
298+
// transport protocol version so it must be chronologically newer. We recommend not doing this, it implies an upgrade
299+
// that goes backwards in time and therefore may regress in some way, so we emit a warning. But we carry on with the
300+
// best known version anyway since both ends will know it.
301+
logger.warn(
302+
"""
303+
Negotiating transport handshake with remote node with version [{}/{}] received on [{}] which appears to be \
304+
from a chronologically-older release with a numerically-newer version compared to this node's version [{}/{}]. \
305+
Upgrading to a chronologically-older release may not work reliably and is not recommended. \
306+
Falling back to transport protocol version [{}].""",
307+
remoteReleaseVersion,
308+
remoteTransportVersion,
309+
channel,
310+
Build.current().version(),
311+
localTransportVersion,
312+
bestKnownVersion
313+
);
314+
} // else remote is semantically older and we _do_ know its version, so we just use that without further fuss.
315+
return bestKnownVersion;
292316
}
293317
}
294318

@@ -364,14 +388,13 @@ public Executor executor() {
364388
public void handleResponse(HandshakeResponse response) {
365389
if (isDone.compareAndSet(false, true)) {
366390
ActionListener.completeWith(listener, () -> {
367-
ensureCompatibleVersion(
391+
final var resultVersion = ensureCompatibleVersion(
368392
version,
369393
response.getTransportVersion(),
370394
response.getReleaseVersion(),
371395
channel,
372396
threadPool.getThreadContext()
373397
);
374-
final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion());
375398
assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes
376399
|| resultVersion.isKnown() : "negotiated unknown version " + resultVersion;
377400
return resultVersion;

server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase {
3939
public void testV7Handshake() throws Exception {
4040
final BytesRef handshakeRequestBytes;
4141
final var requestId = randomNonNegativeLong();
42+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
4243
try (var outputStream = new BytesStreamOutput()) {
4344
outputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
4445
outputStream.writeLong(requestId);
@@ -50,7 +51,6 @@ public void testV7Handshake() throws Exception {
5051
outputStream.writeString("internal:tcp/handshake");
5152
outputStream.writeByte((byte) 0); // no parent task ID;
5253

53-
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
5454
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
5555
outputStream.writeByte((byte) 4); // payload length
5656
outputStream.writeVInt(requestNodeTransportVersionId);
@@ -80,14 +80,15 @@ public void testV7Handshake() throws Exception {
8080
assertEquals((byte) 0, inputStream.readByte()); // no request headers
8181
assertEquals((byte) 0, inputStream.readByte()); // no response headers
8282
inputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
83-
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
83+
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
8484
assertEquals(-1, inputStream.read());
8585
}
8686
}
8787

8888
public void testV8Handshake() throws Exception {
8989
final BytesRef handshakeRequestBytes;
9090
final var requestId = randomNonNegativeLong();
91+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
9192
try (var outputStream = new BytesStreamOutput()) {
9293
outputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
9394
outputStream.writeLong(requestId);
@@ -100,7 +101,6 @@ public void testV8Handshake() throws Exception {
100101
outputStream.writeString("internal:tcp/handshake");
101102
outputStream.writeByte((byte) 0); // no parent task ID;
102103

103-
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
104104
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
105105
outputStream.writeByte((byte) 4); // payload length
106106
outputStream.writeVInt(requestNodeTransportVersionId);
@@ -131,14 +131,15 @@ public void testV8Handshake() throws Exception {
131131
assertEquals((byte) 0, inputStream.readByte()); // no request headers
132132
assertEquals((byte) 0, inputStream.readByte()); // no response headers
133133
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
134-
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
134+
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
135135
assertEquals(-1, inputStream.read());
136136
}
137137
}
138138

139139
public void testV9Handshake() throws Exception {
140140
final BytesRef handshakeRequestBytes;
141141
final var requestId = randomNonNegativeLong();
142+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
142143
try (var outputStream = new BytesStreamOutput()) {
143144
outputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
144145
outputStream.writeLong(requestId);
@@ -150,7 +151,6 @@ public void testV9Handshake() throws Exception {
150151
outputStream.writeString("internal:tcp/handshake");
151152
outputStream.writeByte((byte) 0); // no parent task ID;
152153

153-
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
154154
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
155155
final var releaseVersionLength = between(0, 127 - 5); // so that its length, and the length of the payload, is a one-byte vInt
156156
final var requestNodeReleaseVersion = randomAlphaOfLength(releaseVersionLength);
@@ -184,7 +184,7 @@ public void testV9Handshake() throws Exception {
184184
assertEquals((byte) 0, inputStream.readByte()); // no request headers
185185
assertEquals((byte) 0, inputStream.readByte()); // no response headers
186186
inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
187-
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
187+
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
188188
assertEquals(Build.current().version(), inputStream.readString());
189189
assertEquals(-1, inputStream.read());
190190
}

0 commit comments

Comments
 (0)