From ddefea5450a00fc11d9bf218fa4d221f8d08025d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 24 Mar 2025 18:06:39 -0600 Subject: [PATCH 1/3] Changes --- .../cluster/metadata/IndexMetadata.java | 12 ++++++- .../metadata/IndexReshardingMetadata.java | 14 +++++++++ .../allocation/IndexMetadataUpdater.java | 31 ++++++++++++++----- 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 4138a6d4cebe5..1b50384ad4585 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -893,8 +893,18 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set inSyncSet) * @return updated instance with incremented primary term */ public IndexMetadata withIncrementedPrimaryTerm(int shardId) { + return withSetPrimaryTerm(shardId, this.primaryTerms[shardId] + 1); + } + + /** + * Creates a copy of this instance that has the primary term for the given shard id set to the value provided. + * @param shardId shard id to set primary term for + * @param primaryTerm primary term to set + * @return updated instance with set primary term + */ + public IndexMetadata withSetPrimaryTerm(int shardId, long primaryTerm) { final long[] incremented = this.primaryTerms.clone(); - incremented[shardId]++; + incremented[shardId] = primaryTerm; return new IndexMetadata( this.index, this.version, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java index 4d0c2bcb43912..4e73f80a0e131 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentFragment; @@ -19,6 +20,7 @@ import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; +import java.util.Arrays; import java.util.Objects; /** @@ -205,6 +207,18 @@ public static IndexReshardingMetadata newSplitByMultiple(int shardCount, int mul return new IndexReshardingMetadata(IndexReshardingState.Split.newSplitByMultiple(shardCount, multiple)); } + public IndexReshardingMetadata transitionSplitTargetToHandoff(ShardId shardId) { + assert state instanceof IndexReshardingState.Split; + IndexReshardingState.Split splitState = (IndexReshardingState.Split) state; + IndexReshardingState.Split.TargetShardState[] newTargets = Arrays.copyOf( + splitState.targetShards(), + splitState.targetShards().length + ); + int i = shardId.getId() - state.shardCountBefore(); + newTargets[i] = IndexReshardingState.Split.TargetShardState.HANDOFF; + return new IndexReshardingMetadata(new IndexReshardingState.Split(splitState.sourceShards(), newTargets)); + } + /** * @return the split state of this metadata block, or throw IllegalArgumentException if this metadata doesn't represent a split */ diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java index 8f3915ce586f2..b44106a567ae1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -12,9 +12,11 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.routing.GlobalRoutingTable; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingChangesObserver; @@ -127,15 +129,19 @@ public Metadata applyChanges(Metadata oldMetadata, GlobalRoutingTable newRouting for (Map.Entry shardEntry : indexChanges) { ShardId shardId = shardEntry.getKey(); Updates updates = shardEntry.getValue(); - updatedIndexMetadata = updateInSyncAllocations( - newRoutingTable.routingTable(projectMetadata.id()), - oldIndexMetadata, - updatedIndexMetadata, - shardId, - updates - ); + RoutingTable routingTable = newRoutingTable.routingTable(projectMetadata.id()); + updatedIndexMetadata = updateInSyncAllocations(routingTable, oldIndexMetadata, updatedIndexMetadata, shardId, updates); + IndexRoutingTable indexRoutingTable = routingTable.index(shardEntry.getKey().getIndex()); + RecoverySource recoverySource = indexRoutingTable.shard(shardEntry.getKey().id()).primaryShard().recoverySource(); + IndexReshardingMetadata reshardingMetadata = updatedIndexMetadata.getReshardingMetadata(); + boolean split = recoverySource != null && reshardingMetadata != null; updatedIndexMetadata = updates.increaseTerm - ? updatedIndexMetadata.withIncrementedPrimaryTerm(shardId.id()) + ? split + ? updatedIndexMetadata.withSetPrimaryTerm( + shardId.id(), + splitPrimaryTerm(updatedIndexMetadata, reshardingMetadata, shardId) + ) + : updatedIndexMetadata.withIncrementedPrimaryTerm(shardId.id()) : updatedIndexMetadata; } if (updatedIndexMetadata != oldIndexMetadata) { @@ -147,6 +153,15 @@ public Metadata applyChanges(Metadata oldMetadata, GlobalRoutingTable newRouting return updatedMetadata.build(); } + private static long splitPrimaryTerm(IndexMetadata updatedIndexMetadata, IndexReshardingMetadata reshardingMetadata, ShardId shardId) { + // We take the max of the source and target primary terms. This guarantees that the target primary term stays + // greater than or equal to the source. + return Math.max( + updatedIndexMetadata.primaryTerm(shardId.getId() % reshardingMetadata.shardCountBefore()), + updatedIndexMetadata.primaryTerm(shardId.id()) + 1 + ); + } + /** * Updates in-sync allocations with routing changes that were made to the routing table. */ From e87a142daf14b5bea131279e818be8f36dab4edf Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 26 Mar 2025 16:24:34 -0600 Subject: [PATCH 2/3] Change --- .../metadata/IndexReshardingMetadata.java | 4 ++++ .../metadata/IndexReshardingState.java | 8 ++++++++ .../allocation/IndexMetadataUpdater.java | 20 +++++++++++-------- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java index 4e73f80a0e131..3fd00dd7449bd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java @@ -229,6 +229,10 @@ public IndexReshardingState.Split getSplit() { }; } + public boolean isSplit() { + return state instanceof IndexReshardingState.Split; + } + /** * @return the number of shards the index has at the start of this operation */ diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java index ce71a4ed6a417..3dbadff3a1c58 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java @@ -250,6 +250,10 @@ TargetShardState[] targetShards() { return targetShards.clone(); } + public int sourceShard(int targetShard) { + return targetShard % shardCountBefore(); + } + /** * Create resharding metadata representing a new split operation * Split only supports updating an index to a multiple of its current shard count @@ -345,6 +349,10 @@ public SourceShardState getSourceShardState(int shardNum) { return sourceShards[shardNum]; } + public boolean isTargetShard(int shardId) { + return shardId >= shardCountBefore(); + } + /** * Get the current target state of a shard * @param shardNum an index into shards greater than or equal to the old shard count and less than the new shard count diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java index b44106a567ae1..12cd6f7d451d7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -16,7 +16,6 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.routing.GlobalRoutingTable; -import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingChangesObserver; @@ -129,14 +128,19 @@ public Metadata applyChanges(Metadata oldMetadata, GlobalRoutingTable newRouting for (Map.Entry shardEntry : indexChanges) { ShardId shardId = shardEntry.getKey(); Updates updates = shardEntry.getValue(); - RoutingTable routingTable = newRoutingTable.routingTable(projectMetadata.id()); - updatedIndexMetadata = updateInSyncAllocations(routingTable, oldIndexMetadata, updatedIndexMetadata, shardId, updates); - IndexRoutingTable indexRoutingTable = routingTable.index(shardEntry.getKey().getIndex()); - RecoverySource recoverySource = indexRoutingTable.shard(shardEntry.getKey().id()).primaryShard().recoverySource(); + updatedIndexMetadata = updateInSyncAllocations( + newRoutingTable.routingTable(projectMetadata.id()), + oldIndexMetadata, + updatedIndexMetadata, + shardId, + updates + ); IndexReshardingMetadata reshardingMetadata = updatedIndexMetadata.getReshardingMetadata(); - boolean split = recoverySource != null && reshardingMetadata != null; + boolean splitTarget = reshardingMetadata != null + && reshardingMetadata.isSplit() + && reshardingMetadata.getSplit().isTargetShard(shardId.id()); updatedIndexMetadata = updates.increaseTerm - ? split + ? splitTarget ? updatedIndexMetadata.withSetPrimaryTerm( shardId.id(), splitPrimaryTerm(updatedIndexMetadata, reshardingMetadata, shardId) @@ -157,7 +161,7 @@ private static long splitPrimaryTerm(IndexMetadata updatedIndexMetadata, IndexRe // We take the max of the source and target primary terms. This guarantees that the target primary term stays // greater than or equal to the source. return Math.max( - updatedIndexMetadata.primaryTerm(shardId.getId() % reshardingMetadata.shardCountBefore()), + updatedIndexMetadata.primaryTerm(reshardingMetadata.getSplit().sourceShard(shardId.id())), updatedIndexMetadata.primaryTerm(shardId.id()) + 1 ); } From 0d3d7241a87469f649339f1477fa4dfcbb5ecbb0 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 27 Mar 2025 12:13:19 -0600 Subject: [PATCH 3/3] Renname --- .../org/elasticsearch/cluster/metadata/IndexMetadata.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 1b50384ad4585..f88961071b4c8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -903,15 +903,15 @@ public IndexMetadata withIncrementedPrimaryTerm(int shardId) { * @return updated instance with set primary term */ public IndexMetadata withSetPrimaryTerm(int shardId, long primaryTerm) { - final long[] incremented = this.primaryTerms.clone(); - incremented[shardId] = primaryTerm; + final long[] newPrimaryTerms = this.primaryTerms.clone(); + newPrimaryTerms[shardId] = primaryTerm; return new IndexMetadata( this.index, this.version, this.mappingVersion, this.settingsVersion, this.aliasesVersion, - incremented, + newPrimaryTerms, this.state, this.numberOfShards, this.numberOfReplicas,