Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@
*/
package org.elasticsearch.action.admin.cluster.allocation;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.allocator.ClusterBalanceStats;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceStats;
Expand All @@ -38,9 +35,6 @@

public class DesiredBalanceResponse extends ActionResponse implements ChunkedToXContentObject {

private static final TransportVersion CLUSTER_BALANCE_STATS_VERSION = TransportVersions.V_8_7_0;
private static final TransportVersion CLUSTER_INFO_VERSION = TransportVersions.V_8_8_0;

private final DesiredBalanceStats stats;
private final ClusterBalanceStats clusterBalanceStats;
private final Map<String, Map<Integer, DesiredShards>> routingTable;
Expand All @@ -61,27 +55,21 @@ public DesiredBalanceResponse(
public static DesiredBalanceResponse from(StreamInput in) throws IOException {
return new DesiredBalanceResponse(
DesiredBalanceStats.readFrom(in),
in.getTransportVersion().onOrAfter(CLUSTER_BALANCE_STATS_VERSION)
? ClusterBalanceStats.readFrom(in)
: ClusterBalanceStats.EMPTY,
ClusterBalanceStats.readFrom(in),
in.readImmutableMap(v -> v.readImmutableMap(StreamInput::readVInt, DesiredShards::from)),
in.getTransportVersion().onOrAfter(CLUSTER_INFO_VERSION) ? new ClusterInfo(in) : ClusterInfo.EMPTY
new ClusterInfo(in)
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
stats.writeTo(out);
if (out.getTransportVersion().onOrAfter(CLUSTER_BALANCE_STATS_VERSION)) {
out.writeWriteable(clusterBalanceStats);
}
out.writeWriteable(clusterBalanceStats);
out.writeMap(
routingTable,
(shardsOut, shards) -> shardsOut.writeMap(shards, StreamOutput::writeVInt, StreamOutput::writeWriteable)
);
if (out.getTransportVersion().onOrAfter(CLUSTER_INFO_VERSION)) {
out.writeWriteable(clusterInfo);
}
out.writeWriteable(clusterInfo);
}

@Override
Expand Down Expand Up @@ -192,10 +180,6 @@ public record ShardView(
List<String> tierPreference
) implements Writeable, ToXContentObject {

private static final TransportVersion ADD_FORECASTS_VERSION = TransportVersions.V_8_7_0;
private static final TransportVersion ADD_TIER_PREFERENCE = TransportVersions.V_8_8_0;
private static final TransportVersion NULLABLE_RELOCATING_NODE_IS_DESIRED = TransportVersions.V_8_8_0;

public ShardView {
assert (relocatingNode == null) == (relocatingNodeIsDesired == null)
: "relocatingNodeIsDesired should only be set when relocatingNode is set";
Expand All @@ -208,22 +192,12 @@ public static ShardView from(StreamInput in) throws IOException {
boolean nodeIsDesired = in.readBoolean();
String relocatingNode = in.readOptionalString();
Boolean relocatingNodeIsDesired;
if (in.getTransportVersion().onOrAfter(NULLABLE_RELOCATING_NODE_IS_DESIRED)) {
relocatingNodeIsDesired = in.readOptionalBoolean();
} else {
boolean wireRelocatingNodeIsDesired = in.readBoolean();
relocatingNodeIsDesired = relocatingNode == null ? null : wireRelocatingNodeIsDesired;
}
relocatingNodeIsDesired = in.readOptionalBoolean();
int shardId = in.readVInt();
String index = in.readString();
Double forecastWriteLoad = in.getTransportVersion().onOrAfter(ADD_FORECASTS_VERSION) ? in.readOptionalDouble() : null;
Long forecastShardSizeInBytes = in.getTransportVersion().onOrAfter(ADD_FORECASTS_VERSION) ? in.readOptionalLong() : null;
if (in.getTransportVersion().onOrAfter(ADD_FORECASTS_VERSION) == false) {
in.readOptionalWriteable(AllocationId::new);
}
List<String> tierPreference = in.getTransportVersion().onOrAfter(ADD_TIER_PREFERENCE)
? in.readStringCollectionAsList()
: List.of();
Double forecastWriteLoad = in.readOptionalDouble();
Long forecastShardSizeInBytes = in.readOptionalLong();
List<String> tierPreference = in.readStringCollectionAsList();
return new ShardView(
state,
primary,
Expand All @@ -246,22 +220,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(node);
out.writeBoolean(nodeIsDesired);
out.writeOptionalString(relocatingNode);
if (out.getTransportVersion().onOrAfter(NULLABLE_RELOCATING_NODE_IS_DESIRED)) {
out.writeOptionalBoolean(relocatingNodeIsDesired);
} else {
out.writeBoolean(relocatingNodeIsDesired != null && relocatingNodeIsDesired);
}
out.writeOptionalBoolean(relocatingNodeIsDesired);
out.writeVInt(shardId);
out.writeString(index);
if (out.getTransportVersion().onOrAfter(ADD_FORECASTS_VERSION)) {
out.writeOptionalDouble(forecastWriteLoad);
out.writeOptionalLong(forecastShardSizeInBytes);
} else {
out.writeMissingWriteable(AllocationId.class);
}
if (out.getTransportVersion().onOrAfter(ADD_TIER_PREFERENCE)) {
out.writeStringCollection(tierPreference);
}
out.writeOptionalDouble(forecastWriteLoad);
out.writeOptionalLong(forecastShardSizeInBytes);
out.writeStringCollection(tierPreference);
}

@Override
Expand Down
13 changes: 2 additions & 11 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.cluster;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -49,8 +48,6 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {

public static final ClusterInfo EMPTY = new ClusterInfo();

public static final TransportVersion DATA_PATH_NEW_KEY_VERSION = TransportVersions.V_8_6_0;

private final Map<String, DiskUsage> leastAvailableSpaceUsage;
private final Map<String, DiskUsage> mostAvailableSpaceUsage;
final Map<String, Long> shardSizes;
Expand Down Expand Up @@ -105,9 +102,7 @@ public ClusterInfo(StreamInput in) throws IOException {
this.mostAvailableSpaceUsage = in.readImmutableMap(DiskUsage::new);
this.shardSizes = in.readImmutableMap(StreamInput::readLong);
this.shardDataSetSizes = in.readImmutableMap(ShardId::new, StreamInput::readLong);
this.dataPath = in.getTransportVersion().onOrAfter(DATA_PATH_NEW_KEY_VERSION)
? in.readImmutableMap(NodeAndShard::new, StreamInput::readString)
: in.readImmutableMap(nested -> NodeAndShard.from(new ShardRouting(nested)), StreamInput::readString);
this.dataPath = in.readImmutableMap(NodeAndShard::new, StreamInput::readString);
this.reservedSpace = in.readImmutableMap(NodeAndPath::new, ReservedSpace::new);
if (in.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
this.estimatedHeapUsages = in.readImmutableMap(EstimatedHeapUsage::new);
Expand All @@ -132,11 +127,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.mostAvailableSpaceUsage, StreamOutput::writeWriteable);
out.writeMap(this.shardSizes, (o, v) -> o.writeLong(v == null ? -1 : v));
out.writeMap(this.shardDataSetSizes, StreamOutput::writeWriteable, StreamOutput::writeLong);
if (out.getTransportVersion().onOrAfter(DATA_PATH_NEW_KEY_VERSION)) {
out.writeMap(this.dataPath, StreamOutput::writeWriteable, StreamOutput::writeString);
} else {
out.writeMap(this.dataPath, (o, k) -> createFakeShardRoutingFromNodeAndShard(k).writeTo(o), StreamOutput::writeString);
}
out.writeMap(this.dataPath, StreamOutput::writeWriteable, StreamOutput::writeString);
out.writeMap(this.reservedSpace);
if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,17 @@ public static ClusterBalanceStats createFrom(

public static ClusterBalanceStats readFrom(StreamInput in) throws IOException {
return new ClusterBalanceStats(
in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0) ? in.readVInt() : -1,
in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0) ? in.readVInt() : -1,
in.readVInt(),
in.readVInt(),
in.readImmutableMap(TierBalanceStats::readFrom),
in.readImmutableMap(NodeBalanceStats::readFrom)
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
out.writeVInt(shards);
out.writeVInt(undesiredShardAllocations);
}
out.writeVInt(shards);
out.writeVInt(undesiredShardAllocations);
out.writeMap(tiers, StreamOutput::writeWriteable);
out.writeMap(nodes, StreamOutput::writeWriteable);
}
Expand Down Expand Up @@ -125,9 +123,7 @@ private static TierBalanceStats createFrom(List<NodeBalanceStats> nodes) {
public static TierBalanceStats readFrom(StreamInput in) throws IOException {
return new TierBalanceStats(
MetricStats.readFrom(in),
in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)
? MetricStats.readFrom(in)
: new MetricStats(0.0, 0.0, 0.0, 0.0, 0.0),
MetricStats.readFrom(in),
MetricStats.readFrom(in),
MetricStats.readFrom(in),
MetricStats.readFrom(in)
Expand All @@ -137,9 +133,7 @@ public static TierBalanceStats readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
shardCount.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
undesiredShardAllocations.writeTo(out);
}
undesiredShardAllocations.writeTo(out);
forecastWriteLoad.writeTo(out);
forecastShardSize.writeTo(out);
actualShardSize.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -37,8 +35,6 @@ public record DesiredBalanceStats(
long undesiredAllocations
) implements Writeable, ToXContentObject {

private static final TransportVersion COMPUTED_SHARD_MOVEMENTS_VERSION = TransportVersions.V_8_8_0;

public DesiredBalanceStats {
if (lastConvergedIndex < 0) {
assert false : lastConvergedIndex;
Expand All @@ -54,7 +50,7 @@ public static DesiredBalanceStats readFrom(StreamInput in) throws IOException {
in.readVLong(),
in.readVLong(),
in.readVLong(),
in.getTransportVersion().onOrAfter(COMPUTED_SHARD_MOVEMENTS_VERSION) ? in.readVLong() : -1,
in.readVLong(),
in.readVLong(),
in.readVLong(),
in.getTransportVersion().onOrAfter(V_8_12_0) ? in.readVLong() : -1,
Expand All @@ -71,16 +67,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(computationExecuted);
out.writeVLong(computationConverged);
out.writeVLong(computationIterations);
if (out.getTransportVersion().onOrAfter(COMPUTED_SHARD_MOVEMENTS_VERSION)) {
out.writeVLong(computedShardMovements);
}
out.writeVLong(computedShardMovements);
out.writeVLong(cumulativeComputationTime);
out.writeVLong(cumulativeReconciliationTime);
if (out.getTransportVersion().onOrAfter(V_8_12_0)) {
out.writeVLong(unassignedShards);
out.writeVLong(totalAllocations);
out.writeVLong(undesiredAllocations);
}
out.writeVLong(unassignedShards);
out.writeVLong(totalAllocations);
out.writeVLong(undesiredAllocations);
}

@Override
Expand Down