Skip to content

Commit ba57820

Browse files
authored
[8.x] Move TransportVersion negotiation to handshake (#120261) (#120510)
* Move `TransportVersion` negotiation to handshake (#120261) Today the `TransportHandshaker` returns the remote node's actual `TransportVersion`, even if this is an unknown version from some future release. This is exposed by `Transport.Connection#getTransportVersion` despite that method's docs saying that the return value is the `TransportVersion` in use for the connection. The actual version negotiation doesn't happen until we get around to sending an outbound message in `OutboundHandler`. This doesn't matter much today since we only compare versions against known constants, ordering by ID, so all unknown future versions are treated equivalently to the current version. But still it's not correct, and we may need to make this more refined in future. * Fix
1 parent 2e12bbb commit ba57820

File tree

3 files changed

+62
-11
lines changed

3 files changed

+62
-11
lines changed

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.lucene.store.AlreadyClosedException;
1616
import org.apache.lucene.util.BytesRef;
1717
import org.elasticsearch.TransportVersion;
18+
import org.elasticsearch.TransportVersions;
1819
import org.elasticsearch.action.ActionListener;
1920
import org.elasticsearch.cluster.node.DiscoveryNode;
2021
import org.elasticsearch.common.bytes.BytesReference;
@@ -38,7 +39,9 @@ final class OutboundHandler {
3839
private static final Logger logger = LogManager.getLogger(OutboundHandler.class);
3940

4041
private final String nodeName;
42+
4143
private final TransportVersion version;
44+
4245
private final StatsTracker statsTracker;
4346
private final ThreadPool threadPool;
4447
private final Recycler<BytesRef> recycler;
@@ -98,11 +101,11 @@ void sendRequest(
98101
final Compression.Scheme compressionScheme,
99102
final boolean isHandshake
100103
) throws IOException, TransportException {
101-
TransportVersion version = TransportVersion.min(this.version, transportVersion);
102-
OutboundMessage.Request message = new OutboundMessage.Request(
104+
assert assertValidTransportVersion(transportVersion);
105+
final OutboundMessage.Request message = new OutboundMessage.Request(
103106
threadPool.getThreadContext(),
104107
request,
105-
version,
108+
transportVersion,
106109
action,
107110
requestId,
108111
isHandshake,
@@ -137,11 +140,11 @@ void sendResponse(
137140
final boolean isHandshake,
138141
final ResponseStatsConsumer responseStatsConsumer
139142
) {
140-
TransportVersion version = TransportVersion.min(this.version, transportVersion);
143+
assert assertValidTransportVersion(transportVersion);
141144
OutboundMessage.Response message = new OutboundMessage.Response(
142145
threadPool.getThreadContext(),
143146
response,
144-
version,
147+
transportVersion,
145148
requestId,
146149
isHandshake,
147150
compressionScheme
@@ -158,7 +161,11 @@ void sendResponse(
158161
} catch (Exception ex) {
159162
if (isHandshake) {
160163
logger.error(
161-
() -> format("Failed to send handshake response version [%s] received on [%s], closing channel", version, channel),
164+
() -> format(
165+
"Failed to send handshake response version [%s] received on [%s], closing channel",
166+
transportVersion,
167+
channel
168+
),
162169
ex
163170
);
164171
channel.close();
@@ -179,9 +186,15 @@ void sendErrorResponse(
179186
final ResponseStatsConsumer responseStatsConsumer,
180187
final Exception error
181188
) {
182-
TransportVersion version = TransportVersion.min(this.version, transportVersion);
183-
RemoteTransportException tx = new RemoteTransportException(nodeName, channel.getLocalAddress(), action, error);
184-
OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), tx, version, requestId, false, null);
189+
assert assertValidTransportVersion(transportVersion);
190+
OutboundMessage.Response message = new OutboundMessage.Response(
191+
threadPool.getThreadContext(),
192+
new RemoteTransportException(nodeName, channel.getLocalAddress(), action, error),
193+
transportVersion,
194+
requestId,
195+
false,
196+
null
197+
);
185198
try {
186199
sendMessage(channel, message, responseStatsConsumer, () -> messageListener.onResponseSent(requestId, action, error));
187200
} catch (Exception sendException) {
@@ -297,4 +310,10 @@ public boolean rstOnClose() {
297310
return rstOnClose;
298311
}
299312

313+
private boolean assertValidTransportVersion(TransportVersion transportVersion) {
314+
assert this.version.before(TransportVersions.MINIMUM_COMPATIBLE) // running an incompatible-version test
315+
|| this.version.onOrAfter(transportVersion) : this.version + " vs " + transportVersion;
316+
return true;
317+
}
318+
300319
}

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ public void handleResponse(HandshakeResponse response) {
257257
)
258258
);
259259
} else {
260-
listener.onResponse(responseVersion);
260+
listener.onResponse(TransportVersion.min(TransportHandshaker.this.version, response.getResponseVersion()));
261261
}
262262
}
263263
}

server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.core.TimeValue;
1919
import org.elasticsearch.tasks.TaskId;
2020
import org.elasticsearch.test.ESTestCase;
21+
import org.elasticsearch.test.TransportVersionUtils;
2122
import org.elasticsearch.threadpool.TestThreadPool;
2223

2324
import java.io.IOException;
@@ -51,7 +52,7 @@ public void setUp() throws Exception {
5152
.address("host", "host_address", buildNewFakeTransportAddress())
5253
.roles(Collections.emptySet())
5354
.build();
54-
threadPool = new TestThreadPool("thread-poll");
55+
threadPool = new TestThreadPool(getTestName());
5556
handshaker = new TransportHandshaker(TransportVersion.current(), threadPool, requestSender, false);
5657
}
5758

@@ -85,6 +86,37 @@ public void testHandshakeRequestAndResponse() throws IOException {
8586
assertEquals(TransportVersion.current(), versionFuture.actionGet());
8687
}
8788

89+
public void testHandshakeResponseFromOlderNode() throws Exception {
90+
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
91+
final long reqId = randomNonNegativeLong();
92+
handshaker.sendHandshake(reqId, node, channel, SAFE_AWAIT_TIMEOUT, versionFuture);
93+
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
94+
95+
assertFalse(versionFuture.isDone());
96+
97+
final var remoteVersion = TransportVersionUtils.randomCompatibleVersion(random());
98+
handler.handleResponse(new TransportHandshaker.HandshakeResponse(remoteVersion));
99+
100+
assertTrue(versionFuture.isDone());
101+
assertEquals(remoteVersion, versionFuture.result());
102+
}
103+
104+
public void testHandshakeResponseFromNewerNode() throws Exception {
105+
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
106+
final long reqId = randomNonNegativeLong();
107+
handshaker.sendHandshake(reqId, node, channel, SAFE_AWAIT_TIMEOUT, versionFuture);
108+
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
109+
110+
assertFalse(versionFuture.isDone());
111+
112+
handler.handleResponse(
113+
new TransportHandshaker.HandshakeResponse(TransportVersion.fromId(TransportVersion.current().id() + between(0, 10)))
114+
);
115+
116+
assertTrue(versionFuture.isDone());
117+
assertEquals(TransportVersion.current(), versionFuture.result());
118+
}
119+
88120
public void testHandshakeRequestFutureVersionsCompatibility() throws IOException {
89121
long reqId = randomLongBetween(1, 10);
90122
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), new PlainActionFuture<>());

0 commit comments

Comments
 (0)