Skip to content

Commit 9c203fe

Browse files
committed
Validate transport handshake from known version
With parallel releases on multiple branches it's possible that an older branch sees a transport version update that is not known to a numerically newer but chronologically older version. In that case the two nodes cannot intercommunicate, so with this commit we reject such connection attempts at the version negotiation stage. Backport of elastic#121747/elastic#121802 to 8.18
1 parent 9521dc7 commit 9c203fe

File tree

3 files changed

+154
-19
lines changed

3 files changed

+154
-19
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,15 @@ public static TransportVersion current() {
9898
return CurrentHolder.CURRENT;
9999
}
100100

101+
/**
102+
* @return whether this is a known {@link TransportVersion}, i.e. one declared in {@link TransportVersions} or which dates back to
103+
* before 8.9.0 when they matched the release versions exactly and there was no branching or patching. Other versions may exist
104+
* in the wild (they're sent over the wire by numeric ID) but we don't know how to communicate using such versions.
105+
*/
106+
public boolean isKnown() {
107+
return before(TransportVersions.V_8_9_X) || TransportVersions.VERSION_IDS.containsKey(id);
108+
}
109+
101110
public static TransportVersion fromString(String str) {
102111
return TransportVersion.fromId(Integer.parseInt(str));
103112
}

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,17 @@
1111

1212
import org.elasticsearch.Build;
1313
import org.elasticsearch.TransportVersion;
14-
import org.elasticsearch.TransportVersions;
1514
import org.elasticsearch.action.ActionListener;
1615
import org.elasticsearch.cluster.node.DiscoveryNode;
1716
import org.elasticsearch.common.bytes.BytesReference;
1817
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1918
import org.elasticsearch.common.io.stream.StreamInput;
2019
import org.elasticsearch.common.io.stream.StreamOutput;
2120
import org.elasticsearch.common.metrics.CounterMetric;
21+
import org.elasticsearch.core.Strings;
2222
import org.elasticsearch.core.TimeValue;
23+
import org.elasticsearch.logging.LogManager;
24+
import org.elasticsearch.logging.Logger;
2325
import org.elasticsearch.threadpool.ThreadPool;
2426

2527
import java.io.EOFException;
@@ -157,6 +159,8 @@ final class TransportHandshaker {
157159
* [3] Parent task ID should be empty; see org.elasticsearch.tasks.TaskId.writeTo for its structure.
158160
*/
159161

162+
private static final Logger logger = LogManager.getLogger(TransportHandshaker.class);
163+
160164
static final TransportVersion V7_HANDSHAKE_VERSION = TransportVersion.fromId(6_08_00_99);
161165
static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion.fromId(7_17_00_99);
162166
static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion.fromId(8_800_00_0);
@@ -195,7 +199,7 @@ void sendHandshake(
195199
ActionListener<TransportVersion> listener
196200
) {
197201
numHandshakes.inc();
198-
final HandshakeResponseHandler handler = new HandshakeResponseHandler(requestId, listener);
202+
final HandshakeResponseHandler handler = new HandshakeResponseHandler(requestId, channel, listener);
199203
pendingHandshakes.put(requestId, handler);
200204
channel.addCloseListener(
201205
ActionListener.running(() -> handler.handleLocalException(new TransportException("handshake failed because connection reset")))
@@ -221,9 +225,9 @@ void sendHandshake(
221225
}
222226

223227
void handleHandshake(TransportChannel channel, long requestId, StreamInput stream) throws IOException {
228+
final HandshakeRequest handshakeRequest;
224229
try {
225-
// Must read the handshake request to exhaust the stream
226-
new HandshakeRequest(stream);
230+
handshakeRequest = new HandshakeRequest(stream);
227231
} catch (Exception e) {
228232
assert ignoreDeserializationErrors : e;
229233
throw e;
@@ -242,9 +246,44 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
242246
assert ignoreDeserializationErrors : exception;
243247
throw exception;
244248
}
249+
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel);
245250
channel.sendResponse(new HandshakeResponse(this.version, Build.current().version()));
246251
}
247252

253+
static void ensureCompatibleVersion(
254+
TransportVersion localTransportVersion,
255+
TransportVersion remoteTransportVersion,
256+
String releaseVersion,
257+
Object channel
258+
) {
259+
if (TransportVersion.isCompatible(remoteTransportVersion)) {
260+
if (remoteTransportVersion.onOrAfter(localTransportVersion)) {
261+
// Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
262+
// knows how to do that.
263+
return;
264+
}
265+
if (remoteTransportVersion.isKnown()) {
266+
// Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol
267+
// version is known to us.
268+
return;
269+
}
270+
}
271+
272+
final var message = Strings.format(
273+
"""
274+
Rejecting unreadable transport handshake from remote node with version [%s/%s] received on [%s] since this node has \
275+
version [%s/%s] which has an incompatible wire format.""",
276+
releaseVersion,
277+
remoteTransportVersion,
278+
channel,
279+
Build.current().version(),
280+
localTransportVersion
281+
);
282+
logger.warn(message);
283+
throw new IllegalStateException(message);
284+
285+
}
286+
248287
TransportResponseHandler<HandshakeResponse> removeHandlerForHandshake(long requestId) {
249288
return pendingHandshakes.remove(requestId);
250289
}
@@ -260,11 +299,13 @@ long getNumHandshakes() {
260299
private class HandshakeResponseHandler implements TransportResponseHandler<HandshakeResponse> {
261300

262301
private final long requestId;
302+
private final TcpChannel channel;
263303
private final ActionListener<TransportVersion> listener;
264304
private final AtomicBoolean isDone = new AtomicBoolean(false);
265305

266-
private HandshakeResponseHandler(long requestId, ActionListener<TransportVersion> listener) {
306+
private HandshakeResponseHandler(long requestId, TcpChannel channel, ActionListener<TransportVersion> listener) {
267307
this.requestId = requestId;
308+
this.channel = channel;
268309
this.listener = listener;
269310
}
270311

@@ -281,20 +322,13 @@ public Executor executor() {
281322
@Override
282323
public void handleResponse(HandshakeResponse response) {
283324
if (isDone.compareAndSet(false, true)) {
284-
TransportVersion responseVersion = response.transportVersion;
285-
if (TransportVersion.isCompatible(responseVersion) == false) {
286-
listener.onFailure(
287-
new IllegalStateException(
288-
"Received message from unsupported version: ["
289-
+ responseVersion
290-
+ "] minimal compatible version is: ["
291-
+ TransportVersions.MINIMUM_COMPATIBLE
292-
+ "]"
293-
)
294-
);
295-
} else {
296-
listener.onResponse(TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion()));
297-
}
325+
ActionListener.completeWith(listener, () -> {
326+
ensureCompatibleVersion(version, response.getTransportVersion(), response.getReleaseVersion(), channel);
327+
final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion());
328+
assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes
329+
|| resultVersion.isKnown() : "negotiated unknown version " + resultVersion;
330+
return resultVersion;
331+
});
298332
}
299333
}
300334

server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88
*/
99
package org.elasticsearch.transport;
1010

11+
import org.apache.logging.log4j.Level;
12+
import org.elasticsearch.Build;
1113
import org.elasticsearch.TransportVersion;
14+
import org.elasticsearch.TransportVersions;
15+
import org.elasticsearch.action.ActionListener;
1216
import org.elasticsearch.action.support.PlainActionFuture;
1317
import org.elasticsearch.cluster.node.DiscoveryNode;
1418
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -17,13 +21,17 @@
1721
import org.elasticsearch.core.TimeValue;
1822
import org.elasticsearch.tasks.TaskId;
1923
import org.elasticsearch.test.ESTestCase;
24+
import org.elasticsearch.test.MockLog;
2025
import org.elasticsearch.test.TransportVersionUtils;
26+
import org.elasticsearch.test.junit.annotations.TestLogging;
2127
import org.elasticsearch.threadpool.TestThreadPool;
2228

2329
import java.io.IOException;
2430
import java.util.Collections;
31+
import java.util.concurrent.ExecutionException;
2532
import java.util.concurrent.TimeUnit;
2633

34+
import static org.hamcrest.Matchers.allOf;
2735
import static org.hamcrest.Matchers.containsString;
2836
import static org.mockito.Mockito.doThrow;
2937
import static org.mockito.Mockito.mock;
@@ -90,6 +98,42 @@ public void testHandshakeRequestAndResponse() throws IOException {
9098
assertEquals(TransportVersion.current(), versionFuture.actionGet());
9199
}
92100

101+
@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
102+
public void testIncompatibleHandshakeRequest() throws IOException {
103+
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(
104+
getRandomIncompatibleTransportVersion(),
105+
randomIdentifier()
106+
);
107+
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
108+
bytesStreamOutput.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
109+
handshakeRequest.writeTo(bytesStreamOutput);
110+
StreamInput input = bytesStreamOutput.bytes().streamInput();
111+
input.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
112+
final TestTransportChannel channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete")));
113+
114+
MockLog.assertThatLogger(
115+
() -> assertThat(
116+
expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input))
117+
.getMessage(),
118+
allOf(
119+
containsString("Rejecting unreadable transport handshake"),
120+
containsString(
121+
"[" + handshakeRequest.transportVersion.toReleaseVersion() + "/" + handshakeRequest.transportVersion + "]"
122+
),
123+
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
124+
containsString("which has an incompatible wire format")
125+
)
126+
),
127+
TransportHandshaker.class,
128+
new MockLog.SeenEventExpectation(
129+
"warning",
130+
TransportHandshaker.class.getCanonicalName(),
131+
Level.WARN,
132+
"Rejecting unreadable transport handshake * incompatible wire format."
133+
)
134+
);
135+
}
136+
93137
public void testHandshakeResponseFromOlderNode() throws Exception {
94138
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
95139
final long reqId = randomNonNegativeLong();
@@ -105,6 +149,54 @@ public void testHandshakeResponseFromOlderNode() throws Exception {
105149
assertEquals(remoteVersion, versionFuture.result());
106150
}
107151

152+
@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
153+
public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() {
154+
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
155+
final long reqId = randomNonNegativeLong();
156+
handshaker.sendHandshake(reqId, node, channel, SAFE_AWAIT_TIMEOUT, versionFuture);
157+
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
158+
159+
assertFalse(versionFuture.isDone());
160+
161+
final var handshakeResponse = new TransportHandshaker.HandshakeResponse(
162+
getRandomIncompatibleTransportVersion(),
163+
randomIdentifier()
164+
);
165+
166+
MockLog.assertThatLogger(
167+
() -> handler.handleResponse(handshakeResponse),
168+
TransportHandshaker.class,
169+
new MockLog.SeenEventExpectation(
170+
"warning",
171+
TransportHandshaker.class.getCanonicalName(),
172+
Level.WARN,
173+
"Rejecting unreadable transport handshake * incompatible wire format."
174+
)
175+
);
176+
177+
assertTrue(versionFuture.isDone());
178+
assertThat(
179+
expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(),
180+
allOf(
181+
containsString("Rejecting unreadable transport handshake"),
182+
containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"),
183+
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
184+
containsString("which has an incompatible wire format")
185+
)
186+
);
187+
}
188+
189+
private static TransportVersion getRandomIncompatibleTransportVersion() {
190+
return randomBoolean()
191+
// either older than MINIMUM_COMPATIBLE
192+
? new TransportVersion(between(1, TransportVersions.MINIMUM_COMPATIBLE.id() - 1))
193+
// or between MINIMUM_COMPATIBLE and current but not known
194+
: randomValueOtherThanMany(
195+
TransportVersion::isKnown,
196+
() -> new TransportVersion(between(TransportVersions.MINIMUM_COMPATIBLE.id(), TransportVersion.current().id()))
197+
);
198+
}
199+
108200
public void testHandshakeResponseFromNewerNode() throws Exception {
109201
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
110202
final long reqId = randomNonNegativeLong();

0 commit comments

Comments
 (0)