Skip to content

Commit ced440a

Browse files
authored
Adjust bwc for ES|QL retry on failures (#122685)
Relates ##120774
1 parent 5b95480 commit ced440a

File tree

3 files changed

+10
-4
lines changed

3 files changed

+10
-4
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ static TransportVersion def(int id) {
176176
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00);
177177
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_X = def(8_840_0_01);
178178
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_X = def(8_840_0_02);
179+
public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_BACKPORT_8_19 = def(8_840_0_03);
179180
public static final TransportVersion ELASTICSEARCH_9_0 = def(9_000_0_00);
180181
public static final TransportVersion REMOVE_SNAPSHOT_FAILURES_90 = def(9_000_0_01);
181182
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED_90 = def(9_000_0_02);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10+
import org.elasticsearch.TransportVersion;
1011
import org.elasticsearch.TransportVersions;
1112
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.ActionListenerResponseHandler;
@@ -466,7 +467,12 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
466467
request.runNodeLevelReduction()
467468
);
468469
// the sender doesn't support retry on shard failures, so we need to fail fast here.
469-
final boolean failFastOnShardFailures = channel.getVersion().before(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE);
470+
final boolean failFastOnShardFailures = supportShardLevelRetryFailure(channel.getVersion()) == false;
470471
runComputeOnDataNode((CancellableTask) task, sessionId, reductionPlan, request, failFastOnShardFailures, listener);
471472
}
473+
474+
static boolean supportShardLevelRetryFailure(TransportVersion transportVersion) {
475+
return transportVersion.onOrAfter(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE)
476+
|| transportVersion.isPatchFrom(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_BACKPORT_8_19);
477+
}
472478
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10-
import org.elasticsearch.TransportVersions;
1110
import org.elasticsearch.common.io.stream.StreamInput;
1211
import org.elasticsearch.common.io.stream.StreamOutput;
1312
import org.elasticsearch.compute.operator.DriverProfile;
@@ -32,7 +31,7 @@ final class DataNodeComputeResponse extends TransportResponse {
3231
}
3332

3433
DataNodeComputeResponse(StreamInput in) throws IOException {
35-
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE)) {
34+
if (DataNodeComputeHandler.supportShardLevelRetryFailure(in.getTransportVersion())) {
3635
this.profiles = in.readCollectionAsImmutableList(DriverProfile::new);
3736
this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException);
3837
} else {
@@ -43,7 +42,7 @@ final class DataNodeComputeResponse extends TransportResponse {
4342

4443
@Override
4544
public void writeTo(StreamOutput out) throws IOException {
46-
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE)) {
45+
if (DataNodeComputeHandler.supportShardLevelRetryFailure(out.getTransportVersion())) {
4746
out.writeCollection(profiles, (o, v) -> v.writeTo(o));
4847
out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException);
4948
} else {

0 commit comments

Comments
 (0)