diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 57a24562344de..703e5c7af19ea 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -176,6 +176,7 @@ static TransportVersion def(int id) { public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00); public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_X = def(8_840_0_01); public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_X = def(8_840_0_02); + public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_BACKPORT_8_19 = def(8_840_0_03); public static final TransportVersion ELASTICSEARCH_9_0 = def(9_000_0_00); public static final TransportVersion REMOVE_SNAPSHOT_FAILURES_90 = def(9_000_0_01); public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED_90 = def(9_000_0_02); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index 45048c4a142f1..3106a7b5a43cd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plugin; +import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -466,7 +467,12 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T request.runNodeLevelReduction() ); // the sender doesn't support retry on shard failures, so we need to fail fast here. - final boolean failFastOnShardFailures = channel.getVersion().before(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE); + final boolean failFastOnShardFailures = supportShardLevelRetryFailure(channel.getVersion()) == false; runComputeOnDataNode((CancellableTask) task, sessionId, reductionPlan, request, failFastOnShardFailures, listener); } + + static boolean supportShardLevelRetryFailure(TransportVersion transportVersion) { + return transportVersion.onOrAfter(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE) + || transportVersion.isPatchFrom(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_BACKPORT_8_19); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java index 34a92fb135277..b1eb41ffc99d8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.esql.plugin; -import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.compute.operator.DriverProfile; @@ -32,7 +31,7 @@ final class DataNodeComputeResponse extends TransportResponse { } DataNodeComputeResponse(StreamInput in) throws IOException { - if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE)) { + if (DataNodeComputeHandler.supportShardLevelRetryFailure(in.getTransportVersion())) { this.profiles = in.readCollectionAsImmutableList(DriverProfile::new); this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException); } else { @@ -43,7 +42,7 @@ final class DataNodeComputeResponse extends TransportResponse { @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE)) { + if (DataNodeComputeHandler.supportShardLevelRetryFailure(out.getTransportVersion())) { out.writeCollection(profiles, (o, v) -> v.writeTo(o)); out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException); } else {