Skip to content

Commit 90fc6d3

Browse files
committed
Validate transport handshake from known version
With parallel releases on multiple branches it's possible that an older branch sees a transport version update that is not known to a numerically newer but chronologically older version. In that case the two nodes cannot intercommunicate, so with this commit we reject such connection attempts at the version negotiation stage.
1 parent 09bc343 commit 90fc6d3

File tree

3 files changed

+155
-19
lines changed

3 files changed

+155
-19
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,13 @@ public static List<TransportVersion> getAllVersions() {
118118
return VersionsHolder.ALL_VERSIONS;
119119
}
120120

121+
/**
122+
* @return whether the given {@code id} corresponds to a known {@link TransportVersion}.
123+
*/
124+
public static boolean isKnownVersionId(int id) {
125+
return VersionsHolder.ALL_VERSIONS_MAP.containsKey(id);
126+
}
127+
121128
public static TransportVersion fromString(String str) {
122129
return TransportVersion.fromId(Integer.parseInt(str));
123130
}

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,18 @@
1111

1212
import org.elasticsearch.Build;
1313
import org.elasticsearch.TransportVersion;
14-
import org.elasticsearch.TransportVersions;
1514
import org.elasticsearch.action.ActionListener;
1615
import org.elasticsearch.cluster.node.DiscoveryNode;
1716
import org.elasticsearch.common.bytes.BytesReference;
1817
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1918
import org.elasticsearch.common.io.stream.StreamInput;
2019
import org.elasticsearch.common.io.stream.StreamOutput;
2120
import org.elasticsearch.common.metrics.CounterMetric;
21+
import org.elasticsearch.core.Strings;
2222
import org.elasticsearch.core.TimeValue;
2323
import org.elasticsearch.core.UpdateForV9;
24+
import org.elasticsearch.logging.LogManager;
25+
import org.elasticsearch.logging.Logger;
2426
import org.elasticsearch.threadpool.ThreadPool;
2527

2628
import java.io.EOFException;
@@ -126,6 +128,8 @@ final class TransportHandshaker {
126128
* [3] Parent task ID should be empty; see org.elasticsearch.tasks.TaskId.writeTo for its structure.
127129
*/
128130

131+
private static final Logger logger = LogManager.getLogger(TransportHandshaker.class);
132+
129133
static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion.fromId(7_17_00_99);
130134
static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion.fromId(8_800_00_0);
131135
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(V8_HANDSHAKE_VERSION, V9_HANDSHAKE_VERSION);
@@ -159,7 +163,7 @@ void sendHandshake(
159163
ActionListener<TransportVersion> listener
160164
) {
161165
numHandshakes.inc();
162-
final HandshakeResponseHandler handler = new HandshakeResponseHandler(requestId, listener);
166+
final HandshakeResponseHandler handler = new HandshakeResponseHandler(requestId, channel, listener);
163167
pendingHandshakes.put(requestId, handler);
164168
channel.addCloseListener(
165169
ActionListener.running(() -> handler.handleLocalException(new TransportException("handshake failed because connection reset")))
@@ -185,9 +189,9 @@ void sendHandshake(
185189
}
186190

187191
void handleHandshake(TransportChannel channel, long requestId, StreamInput stream) throws IOException {
192+
final HandshakeRequest handshakeRequest;
188193
try {
189-
// Must read the handshake request to exhaust the stream
190-
new HandshakeRequest(stream);
194+
handshakeRequest = new HandshakeRequest(stream);
191195
} catch (Exception e) {
192196
assert ignoreDeserializationErrors : e;
193197
throw e;
@@ -206,9 +210,44 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
206210
assert ignoreDeserializationErrors : exception;
207211
throw exception;
208212
}
213+
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel);
209214
channel.sendResponse(new HandshakeResponse(this.version, Build.current().version()));
210215
}
211216

217+
static void ensureCompatibleVersion(
218+
TransportVersion localTransportVersion,
219+
TransportVersion remoteTransportVersion,
220+
String releaseVersion,
221+
Object channel
222+
) {
223+
if (TransportVersion.isCompatible(remoteTransportVersion)) {
224+
if (remoteTransportVersion.onOrAfter(localTransportVersion)) {
225+
// Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
226+
// knows how to do that.
227+
return;
228+
}
229+
if (TransportVersion.isKnownVersionId(remoteTransportVersion.id())) {
230+
// Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol
231+
// version is known to us.
232+
return;
233+
}
234+
}
235+
236+
final var message = Strings.format(
237+
"""
238+
Rejecting unreadable transport handshake from remote node with version [%s/%s] received on [%s] since this node has \
239+
version [%s/%s] which has an incompatible wire format.""",
240+
releaseVersion,
241+
remoteTransportVersion,
242+
channel,
243+
Build.current().version(),
244+
localTransportVersion
245+
);
246+
logger.warn(message);
247+
throw new IllegalStateException(message);
248+
249+
}
250+
212251
TransportResponseHandler<HandshakeResponse> removeHandlerForHandshake(long requestId) {
213252
return pendingHandshakes.remove(requestId);
214253
}
@@ -224,11 +263,13 @@ long getNumHandshakes() {
224263
private class HandshakeResponseHandler implements TransportResponseHandler<HandshakeResponse> {
225264

226265
private final long requestId;
266+
private final TcpChannel channel;
227267
private final ActionListener<TransportVersion> listener;
228268
private final AtomicBoolean isDone = new AtomicBoolean(false);
229269

230-
private HandshakeResponseHandler(long requestId, ActionListener<TransportVersion> listener) {
270+
private HandshakeResponseHandler(long requestId, TcpChannel channel, ActionListener<TransportVersion> listener) {
231271
this.requestId = requestId;
272+
this.channel = channel;
232273
this.listener = listener;
233274
}
234275

@@ -245,20 +286,13 @@ public Executor executor() {
245286
@Override
246287
public void handleResponse(HandshakeResponse response) {
247288
if (isDone.compareAndSet(false, true)) {
248-
TransportVersion responseVersion = response.transportVersion;
249-
if (TransportVersion.isCompatible(responseVersion) == false) {
250-
listener.onFailure(
251-
new IllegalStateException(
252-
"Received message from unsupported version: ["
253-
+ responseVersion
254-
+ "] minimal compatible version is: ["
255-
+ TransportVersions.MINIMUM_COMPATIBLE
256-
+ "]"
257-
)
258-
);
259-
} else {
260-
listener.onResponse(TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion()));
261-
}
289+
ActionListener.completeWith(listener, () -> {
290+
ensureCompatibleVersion(version, response.getTransportVersion(), response.getReleaseVersion(), channel);
291+
final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion());
292+
assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes
293+
|| TransportVersion.isKnownVersionId(resultVersion.id()) : "negotiated unknown version " + resultVersion;
294+
return resultVersion;
295+
});
262296
}
263297
}
264298

server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@
88
*/
99
package org.elasticsearch.transport;
1010

11+
import org.apache.logging.log4j.Level;
12+
import org.elasticsearch.Build;
1113
import org.elasticsearch.TransportVersion;
14+
import org.elasticsearch.TransportVersions;
1215
import org.elasticsearch.Version;
16+
import org.elasticsearch.action.ActionListener;
1317
import org.elasticsearch.action.support.PlainActionFuture;
1418
import org.elasticsearch.cluster.node.DiscoveryNode;
1519
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -19,13 +23,17 @@
1923
import org.elasticsearch.core.UpdateForV10;
2024
import org.elasticsearch.tasks.TaskId;
2125
import org.elasticsearch.test.ESTestCase;
26+
import org.elasticsearch.test.MockLog;
2227
import org.elasticsearch.test.TransportVersionUtils;
28+
import org.elasticsearch.test.junit.annotations.TestLogging;
2329
import org.elasticsearch.threadpool.TestThreadPool;
2430

2531
import java.io.IOException;
2632
import java.util.Collections;
33+
import java.util.concurrent.ExecutionException;
2734
import java.util.concurrent.TimeUnit;
2835

36+
import static org.hamcrest.Matchers.allOf;
2937
import static org.hamcrest.Matchers.containsString;
3038
import static org.mockito.Mockito.doThrow;
3139
import static org.mockito.Mockito.mock;
@@ -93,6 +101,40 @@ public void testHandshakeRequestAndResponse() throws IOException {
93101
assertEquals(TransportVersion.current(), versionFuture.actionGet());
94102
}
95103

104+
@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
105+
public void testIncompatibleHandshakeRequest() throws IOException {
106+
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(
107+
getRandomIncompatibleTransportVersion(),
108+
randomIdentifier()
109+
);
110+
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
111+
bytesStreamOutput.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
112+
handshakeRequest.writeTo(bytesStreamOutput);
113+
StreamInput input = bytesStreamOutput.bytes().streamInput();
114+
input.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
115+
final TestTransportChannel channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete")));
116+
117+
MockLog.assertThatLogger(
118+
() -> assertThat(
119+
expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input))
120+
.getMessage(),
121+
allOf(
122+
containsString("Rejecting unreadable transport handshake"),
123+
containsString("[" + handshakeRequest.releaseVersion + "/" + handshakeRequest.transportVersion + "]"),
124+
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
125+
containsString("which has an incompatible wire format")
126+
)
127+
),
128+
TransportHandshaker.class,
129+
new MockLog.SeenEventExpectation(
130+
"warning",
131+
TransportHandshaker.class.getCanonicalName(),
132+
Level.WARN,
133+
"Rejecting unreadable transport handshake * incompatible wire format."
134+
)
135+
);
136+
}
137+
96138
public void testHandshakeResponseFromOlderNode() throws Exception {
97139
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
98140
final long reqId = randomNonNegativeLong();
@@ -108,6 +150,59 @@ public void testHandshakeResponseFromOlderNode() throws Exception {
108150
assertEquals(remoteVersion, versionFuture.result());
109151
}
110152

153+
@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
154+
public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() {
155+
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
156+
final long reqId = randomNonNegativeLong();
157+
handshaker.sendHandshake(reqId, node, channel, SAFE_AWAIT_TIMEOUT, versionFuture);
158+
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
159+
160+
assertFalse(versionFuture.isDone());
161+
162+
final var handshakeResponse = new TransportHandshaker.HandshakeResponse(
163+
randomValueOtherThanMany(
164+
v -> TransportVersion.isKnownVersionId(v.id()),
165+
TransportHandshakerTests::getRandomIncompatibleTransportVersion
166+
),
167+
randomIdentifier()
168+
);
169+
170+
MockLog.assertThatLogger(
171+
() -> handler.handleResponse(handshakeResponse),
172+
TransportHandshaker.class,
173+
new MockLog.SeenEventExpectation(
174+
"warning",
175+
TransportHandshaker.class.getCanonicalName(),
176+
Level.WARN,
177+
"Rejecting unreadable transport handshake * incompatible wire format."
178+
)
179+
);
180+
181+
assertTrue(versionFuture.isDone());
182+
assertThat(
183+
expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(),
184+
allOf(
185+
containsString("Rejecting unreadable transport handshake"),
186+
containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"),
187+
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
188+
containsString("which has an incompatible wire format")
189+
)
190+
);
191+
}
192+
193+
private static TransportVersion getRandomIncompatibleTransportVersion() {
194+
return new TransportVersion(
195+
randomBoolean()
196+
// either older than MINIMUM_COMPATIBLE
197+
? between(1, TransportVersions.MINIMUM_COMPATIBLE.id() - 1)
198+
// or between MINIMUM_COMPATIBLE and current but not known
199+
: randomValueOtherThanMany(
200+
TransportVersion::isKnownVersionId,
201+
() -> between(TransportVersions.MINIMUM_COMPATIBLE.id(), TransportVersion.current().id())
202+
)
203+
);
204+
}
205+
111206
public void testHandshakeResponseFromNewerNode() throws Exception {
112207
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
113208
final long reqId = randomNonNegativeLong();

0 commit comments

Comments
 (0)