Skip to content

Commit 6bd583a

Browse files
authored
[9.0] Validate transport handshake from known version (elastic#121747) (elastic#121801)
* Validate transport handshake from known version (elastic#121747): With parallel releases on multiple branches, it's possible that an older branch sees a transport version update that is not known to a numerically newer but chronologically older version. In that case the two nodes cannot communicate with each other, so with this commit we reject such connection attempts at the version negotiation stage.
* Fix version/transportversion confusion
* CI poke
1 parent e0dac2e commit 6bd583a

File tree

3 files changed

+151
-19
lines changed

3 files changed

+151
-19
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,14 @@ public static List<TransportVersion> getAllVersions() {
118118
return VersionsHolder.ALL_VERSIONS;
119119
}
120120

121+
/**
122+
* @return whether this is a known {@link TransportVersion}, i.e. one declared in {@link TransportVersions}. Other versions may exist
123+
* in the wild (they're sent over the wire by numeric ID) but we don't know how to communicate using such versions.
124+
*/
125+
public boolean isKnown() {
126+
return VersionsHolder.ALL_VERSIONS_MAP.containsKey(id);
127+
}
128+
121129
public static TransportVersion fromString(String str) {
122130
return TransportVersion.fromId(Integer.parseInt(str));
123131
}

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,18 @@
1111

1212
import org.elasticsearch.Build;
1313
import org.elasticsearch.TransportVersion;
14-
import org.elasticsearch.TransportVersions;
1514
import org.elasticsearch.action.ActionListener;
1615
import org.elasticsearch.cluster.node.DiscoveryNode;
1716
import org.elasticsearch.common.bytes.BytesReference;
1817
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1918
import org.elasticsearch.common.io.stream.StreamInput;
2019
import org.elasticsearch.common.io.stream.StreamOutput;
2120
import org.elasticsearch.common.metrics.CounterMetric;
21+
import org.elasticsearch.core.Strings;
2222
import org.elasticsearch.core.TimeValue;
2323
import org.elasticsearch.core.UpdateForV9;
24+
import org.elasticsearch.logging.LogManager;
25+
import org.elasticsearch.logging.Logger;
2426
import org.elasticsearch.threadpool.ThreadPool;
2527

2628
import java.io.EOFException;
@@ -126,6 +128,8 @@ final class TransportHandshaker {
126128
* [3] Parent task ID should be empty; see org.elasticsearch.tasks.TaskId.writeTo for its structure.
127129
*/
128130

131+
private static final Logger logger = LogManager.getLogger(TransportHandshaker.class);
132+
129133
static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion.fromId(7_17_00_99);
130134
static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion.fromId(8_800_00_0);
131135
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(V8_HANDSHAKE_VERSION, V9_HANDSHAKE_VERSION);
@@ -159,7 +163,7 @@ void sendHandshake(
159163
ActionListener<TransportVersion> listener
160164
) {
161165
numHandshakes.inc();
162-
final HandshakeResponseHandler handler = new HandshakeResponseHandler(requestId, listener);
166+
final HandshakeResponseHandler handler = new HandshakeResponseHandler(requestId, channel, listener);
163167
pendingHandshakes.put(requestId, handler);
164168
channel.addCloseListener(
165169
ActionListener.running(() -> handler.handleLocalException(new TransportException("handshake failed because connection reset")))
@@ -185,9 +189,9 @@ void sendHandshake(
185189
}
186190

187191
void handleHandshake(TransportChannel channel, long requestId, StreamInput stream) throws IOException {
192+
final HandshakeRequest handshakeRequest;
188193
try {
189-
// Must read the handshake request to exhaust the stream
190-
new HandshakeRequest(stream);
194+
handshakeRequest = new HandshakeRequest(stream);
191195
} catch (Exception e) {
192196
assert ignoreDeserializationErrors : e;
193197
throw e;
@@ -206,9 +210,44 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
206210
assert ignoreDeserializationErrors : exception;
207211
throw exception;
208212
}
213+
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel);
209214
channel.sendResponse(new HandshakeResponse(this.version, Build.current().version()));
210215
}
211216

217+
static void ensureCompatibleVersion(
218+
TransportVersion localTransportVersion,
219+
TransportVersion remoteTransportVersion,
220+
String releaseVersion,
221+
Object channel
222+
) {
223+
if (TransportVersion.isCompatible(remoteTransportVersion)) {
224+
if (remoteTransportVersion.onOrAfter(localTransportVersion)) {
225+
// Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
226+
// knows how to do that.
227+
return;
228+
}
229+
if (remoteTransportVersion.isKnown()) {
230+
// Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol
231+
// version is known to us.
232+
return;
233+
}
234+
}
235+
236+
final var message = Strings.format(
237+
"""
238+
Rejecting unreadable transport handshake from remote node with version [%s/%s] received on [%s] since this node has \
239+
version [%s/%s] which has an incompatible wire format.""",
240+
releaseVersion,
241+
remoteTransportVersion,
242+
channel,
243+
Build.current().version(),
244+
localTransportVersion
245+
);
246+
logger.warn(message);
247+
throw new IllegalStateException(message);
248+
249+
}
250+
212251
TransportResponseHandler<HandshakeResponse> removeHandlerForHandshake(long requestId) {
213252
return pendingHandshakes.remove(requestId);
214253
}
@@ -224,11 +263,13 @@ long getNumHandshakes() {
224263
private class HandshakeResponseHandler implements TransportResponseHandler<HandshakeResponse> {
225264

226265
private final long requestId;
266+
private final TcpChannel channel;
227267
private final ActionListener<TransportVersion> listener;
228268
private final AtomicBoolean isDone = new AtomicBoolean(false);
229269

230-
private HandshakeResponseHandler(long requestId, ActionListener<TransportVersion> listener) {
270+
private HandshakeResponseHandler(long requestId, TcpChannel channel, ActionListener<TransportVersion> listener) {
231271
this.requestId = requestId;
272+
this.channel = channel;
232273
this.listener = listener;
233274
}
234275

@@ -245,20 +286,13 @@ public Executor executor() {
245286
@Override
246287
public void handleResponse(HandshakeResponse response) {
247288
if (isDone.compareAndSet(false, true)) {
248-
TransportVersion responseVersion = response.transportVersion;
249-
if (TransportVersion.isCompatible(responseVersion) == false) {
250-
listener.onFailure(
251-
new IllegalStateException(
252-
"Received message from unsupported version: ["
253-
+ responseVersion
254-
+ "] minimal compatible version is: ["
255-
+ TransportVersions.MINIMUM_COMPATIBLE
256-
+ "]"
257-
)
258-
);
259-
} else {
260-
listener.onResponse(TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion()));
261-
}
289+
ActionListener.completeWith(listener, () -> {
290+
ensureCompatibleVersion(version, response.getTransportVersion(), response.getReleaseVersion(), channel);
291+
final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion());
292+
assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes
293+
|| resultVersion.isKnown() : "negotiated unknown version " + resultVersion;
294+
return resultVersion;
295+
});
262296
}
263297
}
264298

server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88
*/
99
package org.elasticsearch.transport;
1010

11+
import org.apache.logging.log4j.Level;
12+
import org.elasticsearch.Build;
1113
import org.elasticsearch.TransportVersion;
14+
import org.elasticsearch.TransportVersions;
15+
import org.elasticsearch.action.ActionListener;
1216
import org.elasticsearch.action.support.PlainActionFuture;
1317
import org.elasticsearch.cluster.node.DiscoveryNode;
1418
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -18,13 +22,17 @@
1822
import org.elasticsearch.core.UpdateForV10;
1923
import org.elasticsearch.tasks.TaskId;
2024
import org.elasticsearch.test.ESTestCase;
25+
import org.elasticsearch.test.MockLog;
2126
import org.elasticsearch.test.TransportVersionUtils;
27+
import org.elasticsearch.test.junit.annotations.TestLogging;
2228
import org.elasticsearch.threadpool.TestThreadPool;
2329

2430
import java.io.IOException;
2531
import java.util.Collections;
32+
import java.util.concurrent.ExecutionException;
2633
import java.util.concurrent.TimeUnit;
2734

35+
import static org.hamcrest.Matchers.allOf;
2836
import static org.hamcrest.Matchers.containsString;
2937
import static org.mockito.Mockito.doThrow;
3038
import static org.mockito.Mockito.mock;
@@ -92,6 +100,40 @@ public void testHandshakeRequestAndResponse() throws IOException {
92100
assertEquals(TransportVersion.current(), versionFuture.actionGet());
93101
}
94102

103+
@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
104+
public void testIncompatibleHandshakeRequest() throws IOException {
105+
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(
106+
getRandomIncompatibleTransportVersion(),
107+
randomIdentifier()
108+
);
109+
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
110+
bytesStreamOutput.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
111+
handshakeRequest.writeTo(bytesStreamOutput);
112+
StreamInput input = bytesStreamOutput.bytes().streamInput();
113+
input.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
114+
final TestTransportChannel channel = new TestTransportChannel(ActionListener.running(() -> fail("should not complete")));
115+
116+
MockLog.assertThatLogger(
117+
() -> assertThat(
118+
expectThrows(IllegalStateException.class, () -> handshaker.handleHandshake(channel, randomNonNegativeLong(), input))
119+
.getMessage(),
120+
allOf(
121+
containsString("Rejecting unreadable transport handshake"),
122+
containsString("[" + handshakeRequest.releaseVersion + "/" + handshakeRequest.transportVersion + "]"),
123+
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
124+
containsString("which has an incompatible wire format")
125+
)
126+
),
127+
TransportHandshaker.class,
128+
new MockLog.SeenEventExpectation(
129+
"warning",
130+
TransportHandshaker.class.getCanonicalName(),
131+
Level.WARN,
132+
"Rejecting unreadable transport handshake * incompatible wire format."
133+
)
134+
);
135+
}
136+
95137
public void testHandshakeResponseFromOlderNode() throws Exception {
96138
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
97139
final long reqId = randomNonNegativeLong();
@@ -107,6 +149,54 @@ public void testHandshakeResponseFromOlderNode() throws Exception {
107149
assertEquals(remoteVersion, versionFuture.result());
108150
}
109151

152+
@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
153+
public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() {
154+
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
155+
final long reqId = randomNonNegativeLong();
156+
handshaker.sendHandshake(reqId, node, channel, SAFE_AWAIT_TIMEOUT, versionFuture);
157+
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
158+
159+
assertFalse(versionFuture.isDone());
160+
161+
final var handshakeResponse = new TransportHandshaker.HandshakeResponse(
162+
getRandomIncompatibleTransportVersion(),
163+
randomIdentifier()
164+
);
165+
166+
MockLog.assertThatLogger(
167+
() -> handler.handleResponse(handshakeResponse),
168+
TransportHandshaker.class,
169+
new MockLog.SeenEventExpectation(
170+
"warning",
171+
TransportHandshaker.class.getCanonicalName(),
172+
Level.WARN,
173+
"Rejecting unreadable transport handshake * incompatible wire format."
174+
)
175+
);
176+
177+
assertTrue(versionFuture.isDone());
178+
assertThat(
179+
expectThrows(ExecutionException.class, IllegalStateException.class, versionFuture::result).getMessage(),
180+
allOf(
181+
containsString("Rejecting unreadable transport handshake"),
182+
containsString("[" + handshakeResponse.getReleaseVersion() + "/" + handshakeResponse.getTransportVersion() + "]"),
183+
containsString("[" + Build.current().version() + "/" + TransportVersion.current() + "]"),
184+
containsString("which has an incompatible wire format")
185+
)
186+
);
187+
}
188+
189+
private static TransportVersion getRandomIncompatibleTransportVersion() {
190+
return randomBoolean()
191+
// either older than MINIMUM_COMPATIBLE
192+
? new TransportVersion(between(1, TransportVersions.MINIMUM_COMPATIBLE.id() - 1))
193+
// or between MINIMUM_COMPATIBLE and current but not known
194+
: randomValueOtherThanMany(
195+
TransportVersion::isKnown,
196+
() -> new TransportVersion(between(TransportVersions.MINIMUM_COMPATIBLE.id(), TransportVersion.current().id()))
197+
);
198+
}
199+
110200
public void testHandshakeResponseFromNewerNode() throws Exception {
111201
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
112202
final long reqId = randomNonNegativeLong();

0 commit comments

Comments
 (0)