Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00);
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_X = def(8_840_0_01);
public static final TransportVersion ELASTICSEARCH_9_0 = def(9_000_0_00);
public static final TransportVersion REMOVE_SNAPSHOT_FAILURES_90 = def(9_001_0_01);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why this one is a patch off of 9_001 but the others are patches from 9_000?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gah, this is a typo.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED_90 = def(9_000_0_02);
public static final TransportVersion REMOVE_DESIRED_NODE_VERSION_90 = def(9_000_0_03);
public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION_90 = def(9_000_0_04);
public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_90 = def(9_000_0_05);
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED = def(9_001_0_00);
public static final TransportVersion REMOVE_SNAPSHOT_FAILURES = def(9_002_0_00);
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED = def(9_003_0_00);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public GetSnapshotsResponse(List<SnapshotInfo> snapshots, @Nullable String next,

public GetSnapshotsResponse(StreamInput in) throws IOException {
this.snapshots = in.readCollectionAsImmutableList(SnapshotInfo::readFrom);
if (in.getTransportVersion().before(TransportVersions.REMOVE_SNAPSHOT_FAILURES)) {
if (in.getTransportVersion().before(TransportVersions.REMOVE_SNAPSHOT_FAILURES) &&
in.getTransportVersion().isPatchFrom(TransportVersions.REMOVE_SNAPSHOT_FAILURES_90) == false) {
// Deprecated `failures` field
in.readMap(StreamInput::readException);
}
Expand Down Expand Up @@ -83,7 +84,8 @@ public int remaining() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(snapshots);
if (out.getTransportVersion().before(TransportVersions.REMOVE_SNAPSHOT_FAILURES)) {
if (out.getTransportVersion().before(TransportVersions.REMOVE_SNAPSHOT_FAILURES) &&
out.getTransportVersion().isPatchFrom(TransportVersions.REMOVE_SNAPSHOT_FAILURES_90) == false) {
// Deprecated `failures` field
out.writeMap(Map.of(), StreamOutput::writeException);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ public static DesiredNode readFrom(StreamInput in) throws IOException {
}
final var memory = ByteSizeValue.readFrom(in);
final var storage = ByteSizeValue.readFrom(in);
if (in.getTransportVersion().before(TransportVersions.REMOVE_DESIRED_NODE_VERSION)) {
if (in.getTransportVersion().before(TransportVersions.REMOVE_DESIRED_NODE_VERSION) &&
in.getTransportVersion().isPatchFrom(TransportVersions.REMOVE_DESIRED_NODE_VERSION_90) == false) {
in.readOptionalString();
}
return new DesiredNode(settings, processors, processorsRange, memory, storage);
Expand All @@ -180,7 +181,8 @@ public void writeTo(StreamOutput out) throws IOException {
}
memory.writeTo(out);
storage.writeTo(out);
if (out.getTransportVersion().before(TransportVersions.REMOVE_DESIRED_NODE_VERSION)) {
if (out.getTransportVersion().before(TransportVersions.REMOVE_DESIRED_NODE_VERSION) &&
out.getTransportVersion().isPatchFrom(TransportVersions.REMOVE_DESIRED_NODE_VERSION_90) == false) {
out.writeOptionalString(null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public TransportStats(StreamInput in) throws IOException {
rxSize = in.readVLong();
txCount = in.readVLong();
txSize = in.readVLong();
if (in.getTransportVersion().before(TransportVersions.TRANSPORT_STATS_HANDLING_TIME_REQUIRED)) {
if (in.getTransportVersion().before(TransportVersions.TRANSPORT_STATS_HANDLING_TIME_REQUIRED) &&
in.getTransportVersion().isPatchFrom(TransportVersions.TRANSPORT_STATS_HANDLING_TIME_REQUIRED_90) == false) {
in.readBoolean();
}
inboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
Expand Down Expand Up @@ -98,7 +99,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(txSize);
assert inboundHandlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
assert outboundHandlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
if (out.getTransportVersion().before(TransportVersions.TRANSPORT_STATS_HANDLING_TIME_REQUIRED)) {
if (out.getTransportVersion().before(TransportVersions.TRANSPORT_STATS_HANDLING_TIME_REQUIRED) &&
out.getTransportVersion().isPatchFrom(TransportVersions.TRANSPORT_STATS_HANDLING_TIME_REQUIRED_90) == false) {
out.writeBoolean(true);
}
for (long handlingTimeBucketFrequency : inboundHandlingTimeBucketFrequencies) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ public DriverProfile(
}

public DriverProfile(StreamInput in) throws IOException {
this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION) ? in.readString() : "";
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION) ||
in.getTransportVersion().isPatchFrom(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90)) {
this.taskDescription = in.readString();
} else {
this.taskDescription = "";
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
this.startMillis = in.readVLong();
this.stopMillis = in.readVLong();
Expand All @@ -112,7 +117,8 @@ public DriverProfile(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION)) {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION) ||
out.getTransportVersion().isPatchFrom(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90)) {
out.writeString(taskDescription);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ public class DriverStatus implements Task.Status {

public DriverStatus(StreamInput in) throws IOException {
this.sessionId = in.readString();
this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION) ? in.readString() : "";
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION) ||
in.getTransportVersion().isPatchFrom(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90)) {
this.taskDescription = in.readString();
} else {
this.taskDescription = "";
}
this.started = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readLong() : 0;
this.lastUpdated = in.readLong();
this.cpuNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0;
Expand All @@ -130,7 +135,8 @@ public DriverStatus(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(sessionId);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION)) {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION) ||
out.getTransportVersion().isPatchFrom(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90)) {
out.writeString(taskDescription);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
request.runNodeLevelReduction()
);
// the sender doesn't support retry on shard failures, so we need to fail fast here.
final boolean failFastOnShardFailures = channel.getVersion().before(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE);
final boolean failFastOnShardFailures = channel.getVersion().before(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE) &&
channel.getVersion().isPatchFrom(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_90) == false;
runComputeOnDataNode((CancellableTask) task, sessionId, reductionPlan, request, failFastOnShardFailures, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ final class DataNodeComputeResponse extends TransportResponse {
}

DataNodeComputeResponse(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE)) {
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE) ||
in.getTransportVersion().isPatchFrom(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_90)) {
this.profiles = in.readCollectionAsImmutableList(DriverProfile::new);
this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException);
} else {
Expand All @@ -43,7 +44,8 @@ final class DataNodeComputeResponse extends TransportResponse {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE)) {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE) ||
out.getTransportVersion().isPatchFrom(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_90)) {
out.writeCollection(profiles, (o, v) -> v.writeTo(o));
out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException);
} else {
Expand Down