diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 4138a6d4cebe5..f88961071b4c8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -893,15 +893,25 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set inSyncSet) * @return updated instance with incremented primary term */ public IndexMetadata withIncrementedPrimaryTerm(int shardId) { - final long[] incremented = this.primaryTerms.clone(); - incremented[shardId]++; + return withSetPrimaryTerm(shardId, this.primaryTerms[shardId] + 1); + } + + /** + * Creates a copy of this instance that has the primary term for the given shard id set to the value provided. + * @param shardId shard id to set primary term for + * @param primaryTerm primary term to set + * @return updated instance with set primary term + */ + public IndexMetadata withSetPrimaryTerm(int shardId, long primaryTerm) { + final long[] newPrimaryTerms = this.primaryTerms.clone(); + newPrimaryTerms[shardId] = primaryTerm; return new IndexMetadata( this.index, this.version, this.mappingVersion, this.settingsVersion, this.aliasesVersion, - incremented, + newPrimaryTerms, this.state, this.numberOfShards, this.numberOfReplicas, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java index 4d0c2bcb43912..3fd00dd7449bd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentFragment; @@ -19,6 +20,7 @@ import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; +import java.util.Arrays; import java.util.Objects; /** @@ -205,6 +207,18 @@ public static IndexReshardingMetadata newSplitByMultiple(int shardCount, int mul return new IndexReshardingMetadata(IndexReshardingState.Split.newSplitByMultiple(shardCount, multiple)); } + public IndexReshardingMetadata transitionSplitTargetToHandoff(ShardId shardId) { + assert state instanceof IndexReshardingState.Split; + IndexReshardingState.Split splitState = (IndexReshardingState.Split) state; + IndexReshardingState.Split.TargetShardState[] newTargets = Arrays.copyOf( + splitState.targetShards(), + splitState.targetShards().length + ); + int i = shardId.getId() - state.shardCountBefore(); + newTargets[i] = IndexReshardingState.Split.TargetShardState.HANDOFF; + return new IndexReshardingMetadata(new IndexReshardingState.Split(splitState.sourceShards(), newTargets)); + } + /** * @return the split state of this metadata block, or throw IllegalArgumentException if this metadata doesn't represent a split */ @@ -215,6 +229,10 @@ public IndexReshardingState.Split getSplit() { }; } + public boolean isSplit() { + return state instanceof IndexReshardingState.Split; + } + /** * @return the number of shards the index has at the start of this operation */ diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java index ce71a4ed6a417..3dbadff3a1c58 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java @@ -250,6 +250,10 @@ TargetShardState[] targetShards() { return targetShards.clone(); } + public int sourceShard(int targetShard) { + return targetShard % shardCountBefore(); + } + /** * Create resharding metadata representing a new split operation * Split only supports updating an index to a multiple of its current shard count @@ -345,6 +349,10 @@ public SourceShardState getSourceShardState(int shardNum) { return sourceShards[shardNum]; } + public boolean isTargetShard(int shardId) { + return shardId >= shardCountBefore(); + } + /** * Get the current target state of a shard * @param shardNum an index into shards greater than or equal to the old shard count and less than the new shard count diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java index 8f3915ce586f2..12cd6f7d451d7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.routing.GlobalRoutingTable; @@ -134,8 +135,17 @@ public Metadata applyChanges(Metadata oldMetadata, GlobalRoutingTable newRouting shardId, updates ); + IndexReshardingMetadata reshardingMetadata = updatedIndexMetadata.getReshardingMetadata(); + boolean splitTarget = reshardingMetadata != null + && reshardingMetadata.isSplit() + && reshardingMetadata.getSplit().isTargetShard(shardId.id()); updatedIndexMetadata = updates.increaseTerm - ? updatedIndexMetadata.withIncrementedPrimaryTerm(shardId.id()) + ? splitTarget + ? updatedIndexMetadata.withSetPrimaryTerm( + shardId.id(), + splitPrimaryTerm(updatedIndexMetadata, reshardingMetadata, shardId) + ) + : updatedIndexMetadata.withIncrementedPrimaryTerm(shardId.id()) : updatedIndexMetadata; } if (updatedIndexMetadata != oldIndexMetadata) { @@ -147,6 +157,15 @@ public Metadata applyChanges(Metadata oldMetadata, GlobalRoutingTable newRouting return updatedMetadata.build(); } + private static long splitPrimaryTerm(IndexMetadata updatedIndexMetadata, IndexReshardingMetadata reshardingMetadata, ShardId shardId) { + // We take the max of the source and target primary terms. This guarantees that the target primary term stays + // greater than or equal to the source. + return Math.max( + updatedIndexMetadata.primaryTerm(reshardingMetadata.getSplit().sourceShard(shardId.id())), + updatedIndexMetadata.primaryTerm(shardId.id()) + 1 + ); + } + /** * Updates in-sync allocations with routing changes that were made to the routing table. */