diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 7e7a5e0fcd8b1..2b89ddc31aa70 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.support.replication; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.LegacyActionRequest; @@ -40,6 +41,11 @@ public abstract class ReplicationRequest 4 shards. While splitting a bulk request, the coordinator observes - * that target shards are not ready for indexing. So requests that are meant for shard 0 and 2 are bundled together, - * sent to shard 0 with “reshardSplitShardCountSummary” 2 in the request. - * Requests that are meant for shard 1 and 3 are bundled together, - * sent to shard 1 with “reshardSplitShardCountSummary” 2 in the request. - * - * Example 2: - * Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes - * that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not completed handoff. - * So, the shard-bulk-request it sends to shard 0 and 4 has the "reshardSplitShardCountSummary" 8, - * while the shard-bulk-request it sends to shard 1,2,3 has the "reshardSplitShardCountSummary" 4. - * Note that in this case no shard-bulk-request is sent to shards 5, 6, 7 and the requests that were meant for these target shards - * are bundled together with and sent to their source shards. - * - * A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with INVALID reshardSplitShardCountSummary - * will be treated as a Summary mismatch on the source shard node. + * The reshardSplitShardCountSummary has been added to support in-place resharding. + * See {@link SplitShardCountSummary} for details. */ protected final SplitShardCountSummary reshardSplitShardCountSummary; @@ -128,7 +101,13 @@ public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary resh if (thinRead) { this.reshardSplitShardCountSummary = reshardSplitShardCountSummary; } else { - this.reshardSplitShardCountSummary = new SplitShardCountSummary(in); + if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) { + this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt()); + } else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) { + this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readInt()); + } else { + this.reshardSplitShardCountSummary = SplitShardCountSummary.UNSET; + } } } @@ -257,7 +236,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeTimeValue(timeout); out.writeString(index); out.writeVLong(routedBasedOnClusterVersion); - reshardSplitShardCountSummary.writeTo(out); + if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) { + out.writeVInt(reshardSplitShardCountSummary.asInt()); + } else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) { + out.writeInt(reshardSplitShardCountSummary.asInt()); + } } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java b/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java index 1d68bba8f338f..70596cd3b926f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java @@ -9,15 +9,9 @@ package org.elasticsearch.cluster.routing; -import org.elasticsearch.TransportVersion; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; import org.elasticsearch.cluster.metadata.IndexReshardingState; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; - -import java.io.IOException; /** * The SplitShardCountSummary has been added to accommodate in-place index resharding. @@ -57,12 +51,7 @@ * will be treated as a Summary mismatch on the source shard node. */ -public class SplitShardCountSummary implements Writeable { - // superseded - private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary"); - // bumped to use VInt instead of Int - private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small"); - +public class SplitShardCountSummary { public static final SplitShardCountSummary UNSET = new SplitShardCountSummary(0); /** @@ -137,16 +126,22 @@ private static SplitShardCountSummary getReshardSplitShardCountSummary( return new SplitShardCountSummary(shardCount); } + /** + * Construct a SplitShardCountSummary from an integer + * Used for deserialization. + */ + public static SplitShardCountSummary fromInt(int payload) { + return new SplitShardCountSummary(payload); + } + private final int shardCountSummary; - public SplitShardCountSummary(StreamInput in) throws IOException { - if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) { - this.shardCountSummary = in.readVInt(); - } else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) { - this.shardCountSummary = in.readInt(); - } else { - this.shardCountSummary = UNSET.shardCountSummary; - } + /** + * Return an integer representation of this summary + * Used for serialization. + */ + public int asInt() { + return shardCountSummary; } /** @@ -161,15 +156,6 @@ public boolean isUnset() { this.shardCountSummary = shardCountSummary; } - @Override - public void writeTo(StreamOutput out) throws IOException { - if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) { - out.writeVInt(shardCountSummary); - } else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) { - out.writeInt(shardCountSummary); - } - } - @Override public boolean equals(Object other) { if (this == other) {