Skip to content

Commit 1e5145b

Browse files
authored
Negotiate disordered transport handshakes (elastic#123762)
Previously we'd reject transport handshakes from numerically-older nodes with a chronologically-newer release date. With this commit we instead negotiate to use the latest known version for future communications. Backport of elastic#123397 to 9.0
1 parent d55ae71 commit 1e5145b

File tree

4 files changed

+176
-61
lines changed

4 files changed

+176
-61
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,23 @@ public boolean isKnown() {
126126
return VersionsHolder.ALL_VERSIONS_MAP.containsKey(id);
127127
}
128128

129+
/**
130+
* @return the newest known {@link TransportVersion} which is no older than this instance. Returns {@link TransportVersions#ZERO} if
131+
* there are no such versions.
132+
*/
133+
public TransportVersion bestKnownVersion() {
134+
if (VersionsHolder.ALL_VERSIONS_MAP.containsKey(id)) {
135+
return this;
136+
}
137+
TransportVersion bestSoFar = TransportVersions.ZERO;
138+
for (final var knownVersion : VersionsHolder.ALL_VERSIONS_MAP.values()) {
139+
if (knownVersion.after(bestSoFar) && knownVersion.before(this)) {
140+
bestSoFar = knownVersion;
141+
}
142+
}
143+
return bestSoFar;
144+
}
145+
129146
public static TransportVersion fromString(String str) {
130147
return TransportVersion.fromId(Integer.parseInt(str));
131148
}

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.Build;
1313
import org.elasticsearch.TransportVersion;
14+
import org.elasticsearch.TransportVersions;
1415
import org.elasticsearch.action.ActionListener;
1516
import org.elasticsearch.cluster.node.DiscoveryNode;
1617
import org.elasticsearch.common.bytes.BytesReference;
@@ -210,26 +211,50 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
210211
assert ignoreDeserializationErrors : exception;
211212
throw exception;
212213
}
213-
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel);
214-
channel.sendResponse(new HandshakeResponse(this.version, Build.current().version()));
214+
channel.sendResponse(
215+
new HandshakeResponse(
216+
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel),
217+
Build.current().version()
218+
)
219+
);
215220
}
216221

217-
static void ensureCompatibleVersion(
222+
private static TransportVersion ensureCompatibleVersion(
218223
TransportVersion localTransportVersion,
219224
TransportVersion remoteTransportVersion,
220225
String releaseVersion,
221226
Object channel
222227
) {
223228
if (TransportVersion.isCompatible(remoteTransportVersion)) {
224229
if (remoteTransportVersion.onOrAfter(localTransportVersion)) {
225-
// Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
226-
// knows how to do that.
227-
return;
230+
// Remote is semantically newer than us (i.e. has a greater transport protocol version), so we propose using our current
231+
// transport protocol version. If we're initiating the connection then that's the version we'll use; if the other end is
232+
// initiating the connection then it's up to the other end to decide whether to use this version (if it knows it) or
233+
// an earlier one.
234+
return localTransportVersion;
228235
}
229-
if (remoteTransportVersion.isKnown()) {
230-
// Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol
231-
// version is known to us.
232-
return;
236+
final var bestKnownVersion = remoteTransportVersion.bestKnownVersion();
237+
if (bestKnownVersion.equals(TransportVersions.ZERO) == false) {
238+
if (bestKnownVersion.equals(remoteTransportVersion) == false) {
239+
// Remote is semantically older than us (i.e. has a lower transport protocol version), but we do not know its exact
240+
// transport protocol version so it must be chronologically newer. We recommend not doing this, it implies an upgrade
241+
// that goes backwards in time and therefore may regress in some way, so we emit a warning. But we carry on with the
242+
// best known version anyway since both ends will know it.
243+
logger.warn(
244+
"""
245+
Negotiating transport handshake with remote node with version [{}/{}] received on [{}] which appears to be \
246+
from a chronologically-older release with a numerically-newer version compared to this node's version [{}/{}]. \
247+
Upgrading to a chronologically-older release may not work reliably and is not recommended. \
248+
Falling back to transport protocol version [{}].""",
249+
releaseVersion,
250+
remoteTransportVersion,
251+
channel,
252+
Build.current().version(),
253+
localTransportVersion,
254+
bestKnownVersion
255+
);
256+
} // else remote is semantically older and we _do_ know its version, so we just use that without further fuss.
257+
return bestKnownVersion;
233258
}
234259
}
235260

@@ -287,8 +312,12 @@ public Executor executor() {
287312
public void handleResponse(HandshakeResponse response) {
288313
if (isDone.compareAndSet(false, true)) {
289314
ActionListener.completeWith(listener, () -> {
290-
ensureCompatibleVersion(version, response.getTransportVersion(), response.getReleaseVersion(), channel);
291-
final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion());
315+
final var resultVersion = ensureCompatibleVersion(
316+
version,
317+
response.getTransportVersion(),
318+
response.getReleaseVersion(),
319+
channel
320+
);
292321
assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes
293322
|| resultVersion.isKnown() : "negotiated unknown version " + resultVersion;
294323
return resultVersion;

server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase {
4141
public void testV8Handshake() throws Exception {
4242
final BytesRef handshakeRequestBytes;
4343
final var requestId = randomNonNegativeLong();
44+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
4445
try (var outputStream = new BytesStreamOutput()) {
4546
outputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
4647
outputStream.writeLong(requestId);
@@ -53,7 +54,6 @@ public void testV8Handshake() throws Exception {
5354
outputStream.writeString("internal:tcp/handshake");
5455
outputStream.writeByte((byte) 0); // no parent task ID;
5556

56-
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
5757
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
5858
outputStream.writeByte((byte) 4); // payload length
5959
outputStream.writeVInt(requestNodeTransportVersionId);
@@ -84,7 +84,7 @@ public void testV8Handshake() throws Exception {
8484
assertEquals((byte) 0, inputStream.readByte()); // no request headers
8585
assertEquals((byte) 0, inputStream.readByte()); // no response headers
8686
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
87-
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
87+
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
8888
assertEquals(-1, inputStream.read());
8989
}
9090
}
@@ -93,6 +93,7 @@ public void testV8Handshake() throws Exception {
9393
public void testV9Handshake() throws Exception {
9494
final BytesRef handshakeRequestBytes;
9595
final var requestId = randomNonNegativeLong();
96+
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
9697
try (var outputStream = new BytesStreamOutput()) {
9798
outputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
9899
outputStream.writeLong(requestId);
@@ -104,7 +105,6 @@ public void testV9Handshake() throws Exception {
104105
outputStream.writeString("internal:tcp/handshake");
105106
outputStream.writeByte((byte) 0); // no parent task ID;
106107

107-
final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
108108
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
109109
final var releaseVersionLength = between(0, 127 - 5); // so that its length, and the length of the payload, is a one-byte vInt
110110
final var requestNodeReleaseVersion = randomAlphaOfLength(releaseVersionLength);
@@ -138,7 +138,7 @@ public void testV9Handshake() throws Exception {
138138
assertEquals((byte) 0, inputStream.readByte()); // no request headers
139139
assertEquals((byte) 0, inputStream.readByte()); // no response headers
140140
inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
141-
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
141+
assertEquals(requestNodeTransportVersionId, inputStream.readVInt());
142142
assertEquals(Build.current().version(), inputStream.readString());
143143
assertEquals(-1, inputStream.read());
144144
}

server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java

Lines changed: 114 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
1919
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2020
import org.elasticsearch.common.io.stream.StreamInput;
21+
import org.elasticsearch.core.Strings;
2122
import org.elasticsearch.core.TimeValue;
2223
import org.elasticsearch.core.UpdateForV10;
2324
import org.elasticsearch.tasks.TaskId;
@@ -101,7 +102,7 @@ public void testHandshakeRequestAndResponse() throws IOException {
101102
}
102103

103104
@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
104-
public void testIncompatibleHandshakeRequest() throws IOException {
105+
public void testIncompatibleHandshakeRequest() throws Exception {
105106
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(
106107
getRandomIncompatibleTransportVersion(),
107108
randomIdentifier()
@@ -111,27 +112,69 @@ public void testIncompatibleHandshakeRequest() throws IOException {
111112
handshakeRequest.writeTo(bytesStreamOutput);
112113
StreamInput input = bytesStreamOutput.bytes().streamInput();
113114
input.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
114-
final TestTransportChannel channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete")));
115115

116-
MockLog.assertThatLogger(
117-
() -> assertThat(
118-
expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input))
119-
.getMessage(),
120-
allOf(
121-
containsString("Rejecting unreadable transport handshake"),
122-
containsString("[" + handshakeRequest.releaseVersion + "/" + handshakeRequest.transportVersion + "]"),
123-
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
124-
containsString("which has an incompatible wire format")
116+
if (handshakeRequest.transportVersion.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE)) {
117+
118+
final PlainActionFuture<TransportResponse> responseFuture = new PlainActionFuture<>();
119+
final TestTransportChannel channel = new TestTransportChannel(responseFuture);
120+
121+
// we fall back to the best known version
122+
MockLog.assertThatLogger(() -> {
123+
try {
124+
handshaker.handleHandshake(channel, randomNonNegativeLong(), input);
125+
} catch (IOException e) {
126+
throw new AssertionError(e);
127+
}
128+
},
129+
TransportHandshaker.class,
130+
new MockLog.SeenEventExpectation(
131+
"warning",
132+
TransportHandshaker.class.getCanonicalName(),
133+
Level.WARN,
134+
Strings.format(
135+
"""
136+
Negotiating transport handshake with remote node with version [%s/%s] received on [*] which appears to be from \
137+
a chronologically-older release with a numerically-newer version compared to this node's version [%s/%s]. \
138+
Upgrading to a chronologically-older release may not work reliably and is not recommended. Falling back to \
139+
transport protocol version [%s].""",
140+
handshakeRequest.releaseVersion,
141+
handshakeRequest.transportVersion,
142+
Build.current().version(),
143+
TransportVersion.current(),
144+
handshakeRequest.transportVersion.bestKnownVersion()
145+
)
125146
)
126-
),
127-
TransportHandshaker.class,
128-
new MockLog.SeenEventExpectation(
129-
"warning",
130-
TransportHandshaker.class.getCanonicalName(),
131-
Level.WARN,
132-
"Rejecting unreadable transport handshake * incompatible wire format."
133-
)
134-
);
147+
);
148+
149+
assertTrue(responseFuture.isDone());
150+
assertEquals(
151+
handshakeRequest.transportVersion.bestKnownVersion(),
152+
asInstanceOf(TransportHandshaker.HandshakeResponse.class, responseFuture.result()).getTransportVersion()
153+
);
154+
155+
} else {
156+
final TestTransportChannel channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete")));
157+
158+
MockLog.assertThatLogger(
159+
() -> assertThat(
160+
expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input))
161+
.getMessage(),
162+
allOf(
163+
containsString("Rejecting unreadable transport handshake"),
164+
containsString("[" + handshakeRequest.releaseVersion + "/" + handshakeRequest.transportVersion + "]"),
165+
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
166+
containsString("which has an incompatible wire format")
167+
)
168+
),
169+
TransportHandshaker.class,
170+
new MockLog.SeenEventExpectation(
171+
"warning",
172+
TransportHandshaker.class.getCanonicalName(),
173+
Level.WARN,
174+
"Rejecting unreadable transport handshake * incompatible wire format."
175+
)
176+
);
177+
}
135178
}
136179

137180
public void testHandshakeResponseFromOlderNode() throws Exception {
@@ -150,40 +193,66 @@ public void testHandshakeResponseFromOlderNode() throws Exception {
150193
}
151194

152195
@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
153-
public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() {
196+
public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() throws Exception {
154197
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
155198
final long reqId = randomNonNegativeLong();
156199
handshaker.sendHandshake(reqId, node, channel, SAFE_AWAIT_TIMEOUT, versionFuture);
157200
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
158201

159202
assertFalse(versionFuture.isDone());
160203

161-
final var handshakeResponse = new TransportHandshaker.HandshakeResponse(
162-
getRandomIncompatibleTransportVersion(),
163-
randomIdentifier()
164-
);
204+
final var randomIncompatibleTransportVersion = getRandomIncompatibleTransportVersion();
205+
final var handshakeResponse = new TransportHandshaker.HandshakeResponse(randomIncompatibleTransportVersion, randomIdentifier());
206+
207+
if (randomIncompatibleTransportVersion.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE)) {
208+
// we fall back to the best known version
209+
MockLog.assertThatLogger(
210+
() -> handler.handleResponse(handshakeResponse),
211+
TransportHandshaker.class,
212+
new MockLog.SeenEventExpectation(
213+
"warning",
214+
TransportHandshaker.class.getCanonicalName(),
215+
Level.WARN,
216+
Strings.format(
217+
"""
218+
Negotiating transport handshake with remote node with version [%s/%s] received on [*] which appears to be from \
219+
a chronologically-older release with a numerically-newer version compared to this node's version [%s/%s]. \
220+
Upgrading to a chronologically-older release may not work reliably and is not recommended. Falling back to \
221+
transport protocol version [%s].""",
222+
handshakeResponse.getReleaseVersion(),
223+
handshakeResponse.getTransportVersion(),
224+
Build.current().version(),
225+
TransportVersion.current(),
226+
randomIncompatibleTransportVersion.bestKnownVersion()
227+
)
228+
)
229+
);
165230

166-
MockLog.assertThatLogger(
167-
() -> handler.handleResponse(handshakeResponse),
168-
TransportHandshaker.class,
169-
new MockLog.SeenEventExpectation(
170-
"warning",
171-
TransportHandshaker.class.getCanonicalName(),
172-
Level.WARN,
173-
"Rejecting unreadable transport handshake * incompatible wire format."
174-
)
175-
);
231+
assertTrue(versionFuture.isDone());
232+
assertEquals(randomIncompatibleTransportVersion.bestKnownVersion(), versionFuture.result());
233+
} else {
234+
MockLog.assertThatLogger(
235+
() -> handler.handleResponse(handshakeResponse),
236+
TransportHandshaker.class,
237+
new MockLog.SeenEventExpectation(
238+
"warning",
239+
TransportHandshaker.class.getCanonicalName(),
240+
Level.WARN,
241+
"Rejecting unreadable transport handshake * incompatible wire format."
242+
)
243+
);
176244

177-
assertTrue(versionFuture.isDone());
178-
assertThat(
179-
expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(),
180-
allOf(
181-
containsString("Rejecting unreadable transport handshake"),
182-
containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"),
183-
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
184-
containsString("which has an incompatible wire format")
185-
)
186-
);
245+
assertTrue(versionFuture.isDone());
246+
assertThat(
247+
expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(),
248+
allOf(
249+
containsString("Rejecting unreadable transport handshake"),
250+
containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"),
251+
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
252+
containsString("which has an incompatible wire format")
253+
)
254+
);
255+
}
187256
}
188257

189258
private static TransportVersion getRandomIncompatibleTransportVersion() {

0 commit comments

Comments
 (0)