Skip to content

Commit 6bdb9fe

Browse files
authored
Negotiate disordered transport handshakes (#123397)
Previously we'd reject transport handshakes from numerically-older nodes with a chronologically-newer release date. With this commit we instead negotiate to use the latest known version for future communications.
1 parent dcb0eb6 commit 6bdb9fe

File tree

4 files changed

+176
-61
lines changed

4 files changed

+176
-61
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,23 @@ public boolean isKnown() {
126126
return VersionsHolder.ALL_VERSIONS_MAP.containsKey(id);
127127
}
128128

129+
/**
130+
* @return the newest known {@link TransportVersion} which is no newer than this instance. Returns {@link TransportVersions#ZERO} if
131+
* there are no such versions.
132+
*/
133+
public TransportVersion bestKnownVersion() {
134+
if (VersionsHolder.ALL_VERSIONS_MAP.containsKey(id)) {
135+
return this;
136+
}
137+
TransportVersion bestSoFar = TransportVersions.ZERO;
138+
for (final var knownVersion : VersionsHolder.ALL_VERSIONS_MAP.values()) {
139+
if (knownVersion.after(bestSoFar) && knownVersion.before(this)) {
140+
bestSoFar = knownVersion;
141+
}
142+
}
143+
return bestSoFar;
144+
}
145+
129146
public static TransportVersion fromString(String str) {
130147
return TransportVersion.fromId(Integer.parseInt(str));
131148
}

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.Build;
1313
import org.elasticsearch.TransportVersion;
14+
import org.elasticsearch.TransportVersions;
1415
import org.elasticsearch.action.ActionListener;
1516
import org.elasticsearch.cluster.node.DiscoveryNode;
1617
import org.elasticsearch.common.bytes.BytesReference;
@@ -210,26 +211,50 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
210211
assert ignoreDeserializationErrors : exception;
211212
throw exception;
212213
}
213-
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel);
214-
channel.sendResponse(new HandshakeResponse(this.version, Build.current().version()));
214+
channel.sendResponse(
215+
new HandshakeResponse(
216+
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel),
217+
Build.current().version()
218+
)
219+
);
215220
}
216221

217-
static void ensureCompatibleVersion(
222+
private static TransportVersion ensureCompatibleVersion(
218223
TransportVersion localTransportVersion,
219224
TransportVersion remoteTransportVersion,
220225
String releaseVersion,
221226
Object channel
222227
) {
223228
if (TransportVersion.isCompatible(remoteTransportVersion)) {
224229
if (remoteTransportVersion.onOrAfter(localTransportVersion)) {
225-
// Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
226-
// knows how to do that.
227-
return;
230+
// Remote is semantically newer than us (i.e. has a greater transport protocol version), so we propose using our current
231+
// transport protocol version. If we're initiating the connection then that's the version we'll use; if the other end is
232+
// initiating the connection then it's up to the other end to decide whether to use this version (if it knows it) or
233+
// an earlier one.
234+
return localTransportVersion;
228235
}
229-
if (remoteTransportVersion.isKnown()) {
230-
// Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol
231-
// version is known to us.
232-
return;
236+
final var bestKnownVersion = remoteTransportVersion.bestKnownVersion();
237+
if (bestKnownVersion.equals(TransportVersions.ZERO) == false) {
238+
if (bestKnownVersion.equals(remoteTransportVersion) == false) {
239+
// Remote is semantically older than us (i.e. has a lower transport protocol version), but we do not know its exact
240+
// transport protocol version so it must be chronologically newer. We recommend not doing this: it implies an upgrade
241+
// that goes backwards in time and therefore may regress in some way, so we emit a warning. But we carry on with the
242+
// best known version anyway since both ends will know it.
243+
logger.warn(
244+
"""
245+
Negotiating transport handshake with remote node with version [{}/{}] received on [{}] which appears to be \
246+
from a chronologically-older release with a numerically-newer version compared to this node's version [{}/{}]. \
247+
Upgrading to a chronologically-older release may not work reliably and is not recommended. \
248+
Falling back to transport protocol version [{}].""",
249+
releaseVersion,
250+
remoteTransportVersion,
251+
channel,
252+
Build.current().version(),
253+
localTransportVersion,
254+
bestKnownVersion
255+
);
256+
} // else remote is semantically older and we _do_ know its version, so we just use that without further fuss.
257+
return bestKnownVersion;
233258
}
234259
}
235260

@@ -287,8 +312,12 @@ public Executor executor() {
287312
public void handleResponse(HandshakeResponse response) {
288313
if (isDone.compareAndSet(false, true)) {
289314
ActionListener.completeWith(listener, () -> {
290-
ensureCompatibleVersion(version, response.getTransportVersion(), response.getReleaseVersion(), channel);
291-
final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion());
315+
final var resultVersion = ensureCompatibleVersion(
316+
version,
317+
response.getTransportVersion(),
318+
response.getReleaseVersion(),
319+
channel
320+
);
292321
assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes
293322
|| resultVersion.isKnown() : "negotiated unknown version " + resultVersion;
294323
return resultVersion;

server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase {
4141
public void testV8Handshake() throws Exception {
4242
final BytesRef handshakeRequestBytes;
4343
final var requestId = randomNonNegativeLong();
44+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
4445
try (var outputStream = new BytesStreamOutput()) {
4546
outputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
4647
outputStream.writeLong(requestId);
@@ -53,7 +54,6 @@ public void testV8Handshake() throws Exception {
5354
outputStream.writeString("internal:tcp/handshake");
5455
outputStream.writeByte((byte) 0); // no parent task ID;
5556

56-
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
5757
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
5858
outputStream.writeByte((byte) 4); // payload length
5959
outputStream.writeVInt(requestNodeTransportVersionId);
@@ -84,7 +84,7 @@ public void testV8Handshake() throws Exception {
8484
assertEquals((byte) 0, inputStream.readByte()); // no request headers
8585
assertEquals((byte) 0, inputStream.readByte()); // no response headers
8686
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
87-
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
87+
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
8888
assertEquals(-1, inputStream.read());
8989
}
9090
}
@@ -93,6 +93,7 @@ public void testV8Handshake() throws Exception {
9393
public void testV9Handshake() throws Exception {
9494
final BytesRef handshakeRequestBytes;
9595
final var requestId = randomNonNegativeLong();
96+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
9697
try (var outputStream = new BytesStreamOutput()) {
9798
outputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
9899
outputStream.writeLong(requestId);
@@ -104,7 +105,6 @@ public void testV9Handshake() throws Exception {
104105
outputStream.writeString("internal:tcp/handshake");
105106
outputStream.writeByte((byte) 0); // no parent task ID;
106107

107-
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
108108
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
109109
final var releaseVersionLength = between(0, 127 - 5); // so that its length, and the length of the payload, is a one-byte vInt
110110
final var requestNodeReleaseVersion = randomAlphaOfLength(releaseVersionLength);
@@ -138,7 +138,7 @@ public void testV9Handshake() throws Exception {
138138
assertEquals((byte) 0, inputStream.readByte()); // no request headers
139139
assertEquals((byte) 0, inputStream.readByte()); // no response headers
140140
inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
141-
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
141+
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
142142
assertEquals(Build.current().version(), inputStream.readString());
143143
assertEquals(-1, inputStream.read());
144144
}

server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java

Lines changed: 114 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2020
import org.elasticsearch.common.io.stream.StreamInput;
2121
import org.elasticsearch.core.FixForMultiProject;
22+
import org.elasticsearch.core.Strings;
2223
import org.elasticsearch.core.TimeValue;
2324
import org.elasticsearch.core.UpdateForV10;
2425
import org.elasticsearch.tasks.TaskId;
@@ -102,7 +103,7 @@ public void testHandshakeRequestAndResponse() throws IOException {
102103
}
103104

104105
@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
105-
public void testIncompatibleHandshakeRequest() throws IOException {
106+
public void testIncompatibleHandshakeRequest() throws Exception {
106107
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(
107108
getRandomIncompatibleTransportVersion(),
108109
randomIdentifier()
@@ -112,27 +113,69 @@ public void testIncompatibleHandshakeRequest() throws IOException {
112113
handshakeRequest.writeTo(bytesStreamOutput);
113114
StreamInput input = bytesStreamOutput.bytes().streamInput();
114115
input.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
115-
final TestTransportChannel channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete")));
116116

117-
MockLog.assertThatLogger(
118-
() -> assertThat(
119-
expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input))
120-
.getMessage(),
121-
allOf(
122-
containsString("Rejecting unreadable transport handshake"),
123-
containsString("[" + handshakeRequest.releaseVersion + "/" + handshakeRequest.transportVersion + "]"),
124-
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
125-
containsString("which has an incompatible wire format")
117+
if (handshakeRequest.transportVersion.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE)) {
118+
119+
final PlainActionFuture<TransportResponse> responseFuture = new PlainActionFuture<>();
120+
final TestTransportChannel channel = new TestTransportChannel(responseFuture);
121+
122+
// we fall back to the best known version
123+
MockLog.assertThatLogger(() -> {
124+
try {
125+
handshaker.handleHandshake(channel, randomNonNegativeLong(), input);
126+
} catch (IOException e) {
127+
throw new AssertionError(e);
128+
}
129+
},
130+
TransportHandshaker.class,
131+
new MockLog.SeenEventExpectation(
132+
"warning",
133+
TransportHandshaker.class.getCanonicalName(),
134+
Level.WARN,
135+
Strings.format(
136+
"""
137+
Negotiating transport handshake with remote node with version [%s/%s] received on [*] which appears to be from \
138+
a chronologically-older release with a numerically-newer version compared to this node's version [%s/%s]. \
139+
Upgrading to a chronologically-older release may not work reliably and is not recommended. Falling back to \
140+
transport protocol version [%s].""",
141+
handshakeRequest.releaseVersion,
142+
handshakeRequest.transportVersion,
143+
Build.current().version(),
144+
TransportVersion.current(),
145+
handshakeRequest.transportVersion.bestKnownVersion()
146+
)
126147
)
127-
),
128-
TransportHandshaker.class,
129-
new MockLog.SeenEventExpectation(
130-
"warning",
131-
TransportHandshaker.class.getCanonicalName(),
132-
Level.WARN,
133-
"Rejecting unreadable transport handshake * incompatible wire format."
134-
)
135-
);
148+
);
149+
150+
assertTrue(responseFuture.isDone());
151+
assertEquals(
152+
handshakeRequest.transportVersion.bestKnownVersion(),
153+
asInstanceOf(TransportHandshaker.HandshakeResponse.class, responseFuture.result()).getTransportVersion()
154+
);
155+
156+
} else {
157+
final TestTransportChannel channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete")));
158+
159+
MockLog.assertThatLogger(
160+
() -> assertThat(
161+
expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input))
162+
.getMessage(),
163+
allOf(
164+
containsString("Rejecting unreadable transport handshake"),
165+
containsString("[" + handshakeRequest.releaseVersion + "/" + handshakeRequest.transportVersion + "]"),
166+
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
167+
containsString("which has an incompatible wire format")
168+
)
169+
),
170+
TransportHandshaker.class,
171+
new MockLog.SeenEventExpectation(
172+
"warning",
173+
TransportHandshaker.class.getCanonicalName(),
174+
Level.WARN,
175+
"Rejecting unreadable transport handshake * incompatible wire format."
176+
)
177+
);
178+
}
136179
}
137180

138181
public void testHandshakeResponseFromOlderNode() throws Exception {
@@ -151,40 +194,66 @@ public void testHandshakeResponseFromOlderNode() throws Exception {
151194
}
152195

153196
@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
154-
public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() {
197+
public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() throws Exception {
155198
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
156199
final long reqId = randomNonNegativeLong();
157200
handshaker.sendHandshake(reqId, node, channel, SAFE_AWAIT_TIMEOUT, versionFuture);
158201
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
159202

160203
assertFalse(versionFuture.isDone());
161204

162-
final var handshakeResponse = new TransportHandshaker.HandshakeResponse(
163-
getRandomIncompatibleTransportVersion(),
164-
randomIdentifier()
165-
);
205+
final var randomIncompatibleTransportVersion = getRandomIncompatibleTransportVersion();
206+
final var handshakeResponse = new TransportHandshaker.HandshakeResponse(randomIncompatibleTransportVersion, randomIdentifier());
207+
208+
if (randomIncompatibleTransportVersion.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE)) {
209+
// we fall back to the best known version
210+
MockLog.assertThatLogger(
211+
() -> handler.handleResponse(handshakeResponse),
212+
TransportHandshaker.class,
213+
new MockLog.SeenEventExpectation(
214+
"warning",
215+
TransportHandshaker.class.getCanonicalName(),
216+
Level.WARN,
217+
Strings.format(
218+
"""
219+
Negotiating transport handshake with remote node with version [%s/%s] received on [*] which appears to be from \
220+
a chronologically-older release with a numerically-newer version compared to this node's version [%s/%s]. \
221+
Upgrading to a chronologically-older release may not work reliably and is not recommended. Falling back to \
222+
transport protocol version [%s].""",
223+
handshakeResponse.getReleaseVersion(),
224+
handshakeResponse.getTransportVersion(),
225+
Build.current().version(),
226+
TransportVersion.current(),
227+
randomIncompatibleTransportVersion.bestKnownVersion()
228+
)
229+
)
230+
);
166231

167-
MockLog.assertThatLogger(
168-
() -> handler.handleResponse(handshakeResponse),
169-
TransportHandshaker.class,
170-
new MockLog.SeenEventExpectation(
171-
"warning",
172-
TransportHandshaker.class.getCanonicalName(),
173-
Level.WARN,
174-
"Rejecting unreadable transport handshake * incompatible wire format."
175-
)
176-
);
232+
assertTrue(versionFuture.isDone());
233+
assertEquals(randomIncompatibleTransportVersion.bestKnownVersion(), versionFuture.result());
234+
} else {
235+
MockLog.assertThatLogger(
236+
() -> handler.handleResponse(handshakeResponse),
237+
TransportHandshaker.class,
238+
new MockLog.SeenEventExpectation(
239+
"warning",
240+
TransportHandshaker.class.getCanonicalName(),
241+
Level.WARN,
242+
"Rejecting unreadable transport handshake * incompatible wire format."
243+
)
244+
);
177245

178-
assertTrue(versionFuture.isDone());
179-
assertThat(
180-
expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(),
181-
allOf(
182-
containsString("Rejecting unreadable transport handshake"),
183-
containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"),
184-
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
185-
containsString("which has an incompatible wire format")
186-
)
187-
);
246+
assertTrue(versionFuture.isDone());
247+
assertThat(
248+
expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(),
249+
allOf(
250+
containsString("Rejecting unreadable transport handshake"),
251+
containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"),
252+
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
253+
containsString("which has an incompatible wire format")
254+
)
255+
);
256+
}
188257
}
189258

190259
private static TransportVersion getRandomIncompatibleTransportVersion() {

0 commit comments

Comments
 (0)