|
11 | 11 |
|
12 | 12 | import org.elasticsearch.Build; |
13 | 13 | import org.elasticsearch.TransportVersion; |
| 14 | +import org.elasticsearch.TransportVersions; |
14 | 15 | import org.elasticsearch.action.ActionListener; |
15 | 16 | import org.elasticsearch.cluster.node.DiscoveryNode; |
16 | 17 | import org.elasticsearch.common.bytes.BytesReference; |
@@ -246,26 +247,50 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea |
246 | 247 | assert ignoreDeserializationErrors : exception; |
247 | 248 | throw exception; |
248 | 249 | } |
249 | | - ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel); |
250 | | - channel.sendResponse(new HandshakeResponse(this.version, Build.current().version())); |
| 250 | + channel.sendResponse( |
| 251 | + new HandshakeResponse( |
| 252 | + ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel), |
| 253 | + Build.current().version() |
| 254 | + ) |
| 255 | + ); |
251 | 256 | } |
252 | 257 |
|
253 | | - static void ensureCompatibleVersion( |
| 258 | + private static TransportVersion ensureCompatibleVersion( |
254 | 259 | TransportVersion localTransportVersion, |
255 | 260 | TransportVersion remoteTransportVersion, |
256 | 261 | String releaseVersion, |
257 | 262 | Object channel |
258 | 263 | ) { |
259 | 264 | if (TransportVersion.isCompatible(remoteTransportVersion)) { |
260 | 265 | if (remoteTransportVersion.onOrAfter(localTransportVersion)) { |
261 | | - // Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it |
262 | | - // knows how to do that. |
263 | | - return; |
| 266 | + // Remote is semantically newer than us (i.e. has a greater transport protocol version), so we propose using our current |
| 267 | + // transport protocol version. If we're initiating the connection then that's the version we'll use; if the other end is |
| 268 | + // initiating the connection then it's up to the other end to decide whether to use this version (if it knows it) or |
| 269 | + // an earlier one. |
| 270 | + return localTransportVersion; |
264 | 271 | } |
265 | | - if (remoteTransportVersion.isKnown()) { |
266 | | - // Remote is older than us, so we will be using its transport protocol, which we can only do if and only if its protocol |
267 | | - // version is known to us. |
268 | | - return; |
| 272 | + final var bestKnownVersion = remoteTransportVersion.bestKnownVersion(); |
| 273 | + if (bestKnownVersion.equals(TransportVersions.ZERO) == false) { |
| 274 | + if (bestKnownVersion.equals(remoteTransportVersion) == false) { |
| 275 | + // Remote is semantically older than us (i.e. has a lower transport protocol version), but we do not know its exact |
| 276 | + // transport protocol version so it must be chronologically newer. We recommend not doing this, it implies an upgrade |
| 277 | + // that goes backwards in time and therefore may regress in some way, so we emit a warning. But we carry on with the |
| 278 | + // best known version anyway since both ends will know it. |
| 279 | + logger.warn( |
| 280 | + """ |
| 281 | + Negotiating transport handshake with remote node with version [{}/{}] received on [{}] which appears to be \ |
| 282 | + from a chronologically-older release with a numerically-newer version compared to this node's version [{}/{}]. \ |
| 283 | + Upgrading to a chronologically-older release may not work reliably and is not recommended. \ |
| 284 | + Falling back to transport protocol version [{}].""", |
| 285 | + releaseVersion, |
| 286 | + remoteTransportVersion, |
| 287 | + channel, |
| 288 | + Build.current().version(), |
| 289 | + localTransportVersion, |
| 290 | + bestKnownVersion |
| 291 | + ); |
| 292 | + } // else remote is semantically older and we _do_ know its version, so we just use that without further fuss. |
| 293 | + return bestKnownVersion; |
269 | 294 | } |
270 | 295 | } |
271 | 296 |
|
@@ -323,8 +348,12 @@ public Executor executor() { |
323 | 348 | public void handleResponse(HandshakeResponse response) { |
324 | 349 | if (isDone.compareAndSet(false, true)) { |
325 | 350 | ActionListener.completeWith(listener, () -> { |
326 | | - ensureCompatibleVersion(version, response.getTransportVersion(), response.getReleaseVersion(), channel); |
327 | | - final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion()); |
| 351 | + final var resultVersion = ensureCompatibleVersion( |
| 352 | + version, |
| 353 | + response.getTransportVersion(), |
| 354 | + response.getReleaseVersion(), |
| 355 | + channel |
| 356 | + ); |
328 | 357 | assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes |
329 | 358 | || resultVersion.isKnown() : "negotiated unknown version " + resultVersion; |
330 | 359 | return resultVersion; |
|
0 commit comments