From 7d3013eb49c5d61856bd42657dc0d559b37956aa Mon Sep 17 00:00:00 2001 From: Brendan Cully Date: Fri, 3 Oct 2025 16:25:28 -0700 Subject: [PATCH] Promote the reshard split shard count summary to a type The semantics of this field are subtle and unintuitive enough that I think having it be a bare int is a little dangerous. Using a full type allows us to centralize logic and documentation to make it safer to use. --- .../action/bulk/BulkOperation.java | 5 +- .../action/bulk/BulkShardRequest.java | 14 +- .../replication/ReplicatedWriteRequest.java | 3 +- .../replication/ReplicationRequest.java | 34 +--- .../TransportReplicationAction.java | 11 +- .../cluster/metadata/IndexMetadata.java | 65 ------ .../routing/SplitShardCountSummary.java | 189 ++++++++++++++++++ .../IndexReshardingMetadataTests.java | 88 -------- .../routing/SplitShardCountSummaryTests.java | 111 ++++++++++ 9 files changed, 331 insertions(+), 189 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/routing/SplitShardCountSummaryTests.java diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index 7220d4c8dd468..69492c67f6ba1 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.IndexRouting; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -403,9 +404,9 @@ private void executeBulkRequestsByShard( // Get effective 
shardCount for shardId and pass it on as parameter to new BulkShardRequest var indexMetadata = project.index(shardId.getIndexName()); - int reshardSplitShardCountSummary = 0; + SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.UNSET; if (indexMetadata != null) { - reshardSplitShardCountSummary = indexMetadata.getReshardSplitShardCountSummaryForIndexing(shardId.getId()); + reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId()); } BulkShardRequest bulkShardRequest = new BulkShardRequest( shardId, diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index 20ab490a38abe..1ab570d7ffed6 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.metadata.InferenceFieldMetadata; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.set.Sets; @@ -50,21 +51,26 @@ public BulkShardRequest(StreamInput in) throws IOException { } } - public BulkShardRequest(ShardId shardId, int reshardSplitShardCountSummary, RefreshPolicy refreshPolicy, BulkItemRequest[] items) { + public BulkShardRequest( + ShardId shardId, + SplitShardCountSummary reshardSplitShardCountSummary, + RefreshPolicy refreshPolicy, + BulkItemRequest[] items + ) { this(shardId, reshardSplitShardCountSummary, refreshPolicy, items, false); } public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) { - this(shardId, 0, refreshPolicy, items, false); + this(shardId, 
SplitShardCountSummary.UNSET, refreshPolicy, items, false); } public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated) { - this(shardId, 0, refreshPolicy, items, isSimulated); + this(shardId, SplitShardCountSummary.UNSET, refreshPolicy, items, isSimulated); } public BulkShardRequest( ShardId shardId, - int reshardSplitShardCountSummary, + SplitShardCountSummary reshardSplitShardCountSummary, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java index 8a26adbe13b3a..38dc9a4515322 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; @@ -47,7 +48,7 @@ public ReplicatedWriteRequest(@Nullable ShardId shardId) { super(shardId); } - public ReplicatedWriteRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary) { + public ReplicatedWriteRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) { super(shardId, reshardSplitShardCountSummary); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 7cab8b6e19659..7e7a5e0fcd8b1 100644 --- 
a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -9,7 +9,6 @@ package org.elasticsearch.action.support.replication; -import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.LegacyActionRequest; @@ -17,6 +16,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; @@ -38,11 +38,6 @@ public abstract class ReplicationRequest= 0 && shardId < getNumberOfShards() : "shardId is out of bounds"; - int shardCount = getNumberOfShards(); - if (reshardingMetadata != null) { - if (reshardingMetadata.getSplit().isTargetShard(shardId)) { - int sourceShardId = reshardingMetadata.getSplit().sourceShard(shardId); - // Requests cannot be routed to target shards until they are ready - assert reshardingMetadata.getSplit().allTargetStatesAtLeast(sourceShardId, minShardState) : "unexpected target state"; - shardCount = reshardingMetadata.getSplit().shardCountAfter(); - } else if (reshardingMetadata.getSplit().isSourceShard(shardId)) { - if (reshardingMetadata.getSplit().allTargetStatesAtLeast(shardId, minShardState)) { - shardCount = reshardingMetadata.getSplit().shardCountAfter(); - } else { - shardCount = reshardingMetadata.getSplit().shardCountBefore(); - } - } - } - return shardCount; - } - - /** - * This method is used in the context of the resharding feature. 
- * Given a {@code shardId}, this method returns the "effective" shard count - * as seen by this IndexMetadata, for indexing operations. - * - * See {@code getReshardSplitShardCountSummary} for more details. - * @param shardId Input shardId for which we want to calculate the effective shard count - */ - public int getReshardSplitShardCountSummaryForIndexing(int shardId) { - return (getReshardSplitShardCountSummary(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF)); - } - - /** - * This method is used in the context of the resharding feature. - * Given a {@code shardId}, this method returns the "effective" shard count - * as seen by this IndexMetadata, for search operations. - * - * See {@code getReshardSplitShardCount} for more details. - * @param shardId Input shardId for which we want to calculate the effective shard count - */ - public int getReshardSplitShardCountSummaryForSearch(int shardId) { - return (getReshardSplitShardCountSummary(shardId, IndexReshardingState.Split.TargetShardState.SPLIT)); - } - public int getNumberOfReplicas() { return numberOfReplicas; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java b/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java new file mode 100644 index 0000000000000..1d68bba8f338f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java @@ -0,0 +1,189 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingState; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * The SplitShardCountSummary has been added to accommodate in-place index resharding. + * This is populated when the coordinator is deciding which shards a request applies to. + * For example, {@link org.elasticsearch.action.bulk.BulkOperation} splits + * an incoming bulk request into shard level {@link org.elasticsearch.action.bulk.BulkShardRequest} + * based on its cluster state view of the number of shards that are ready for indexing. + * The purpose of this metadata is to reconcile the cluster state visible at the coordinating + * node with that visible at the source shard node (w.r.t. resharding). + * When an index is being split, there is a point in time when the newly created shard (target shard) + * takes over its portion of the document space from the original shard (source shard). + * Although the handoff is atomic at the original (source shard) and new shards (target shard), + * there is a window of time between the coordinating node creating a shard request and the shard receiving and processing it. + * This field is used by the original shard (source shard) when it processes the request to detect whether + * the coordinator's view of the new shard's state when it created the request matches the shard's current state, + * or whether the request must be reprocessed taking into account the current shard states. + * + * Note that we are able to get away with a single number, instead of an array of target shard states, + * because we only allow splits in increments of 2x. 
+ * + * Example 1: + * Suppose we are resharding an index from 2 -> 4 shards. While splitting a bulk request, the coordinator observes + * that target shards are not ready for indexing. So requests that are meant for shard 0 and 2 are bundled together, + * sent to shard 0 with "reshardSplitShardCountSummary" 2 in the request. + * Requests that are meant for shard 1 and 3 are bundled together, + * sent to shard 1 with "reshardSplitShardCountSummary" 2 in the request. + * + * Example 2: + * Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes + * that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not completed handoff. + * So, the shard-bulk-request it sends to shard 0 and 4 has the "reshardSplitShardCountSummary" 8, + * while the shard-bulk-request it sends to shards 1, 2, 3 has the "reshardSplitShardCountSummary" 4. + * Note that in this case no shard-bulk-request is sent to shards 5, 6, 7 and the requests that were meant for these target shards + * are bundled together with and sent to their source shards. + * + * A value of 0 ({@link #UNSET}) indicates an unset reshardSplitShardCountSummary. Hence, a request with an UNSET + * reshardSplitShardCountSummary will be treated as a summary mismatch on the source shard node. + */ + +public class SplitShardCountSummary implements Writeable { + // superseded + private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary"); + // bumped to use VInt instead of Int + private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small"); + + public static final SplitShardCountSummary UNSET = new SplitShardCountSummary(0); + + /** + * Given {@code IndexMetadata} and a {@code shardId}, this method returns the "effective" shard count + * as seen by the given IndexMetadata, for indexing operations. 
+ * + * See {@code getReshardSplitShardCountSummary} for more details. + * @param indexMetadata IndexMetadata of the index containing the shard for which we want to calculate the effective shard count + * @param shardId Input shardId for which we want to calculate the effective shard count + */ + public static SplitShardCountSummary forIndexing(IndexMetadata indexMetadata, int shardId) { + return getReshardSplitShardCountSummary(indexMetadata, shardId, IndexReshardingState.Split.TargetShardState.HANDOFF); + } + + /** + * This method is used in the context of the resharding feature. + * Given {@code IndexMetadata} and a {@code shardId}, this method returns the "effective" shard count + * as seen by the given IndexMetadata, for search operations. + * + * See {@code getReshardSplitShardCountSummary} for more details. + * @param indexMetadata IndexMetadata of the index containing the shard for which we want to calculate the effective shard count + * @param shardId Input shardId for which we want to calculate the effective shard count + */ + public static SplitShardCountSummary forSearch(IndexMetadata indexMetadata, int shardId) { + return getReshardSplitShardCountSummary(indexMetadata, shardId, IndexReshardingState.Split.TargetShardState.SPLIT); + } + + /** + * This method is used in the context of the resharding feature. + * Given {@code indexMetadata}, a {@code shardId} and {@code minShardState}, i.e. the minimum target shard state required for + * an operation to be routed to target shards, + * this method returns the "effective" shard count as seen by the given IndexMetadata. + * + * The reshardSplitShardCountSummary tells us whether the coordinator routed requests to the source shard or + * to both source and target shards. Requests are routed to both source and target shards + * once the target shards are ready for an operation. + * + * The coordinator routes requests to source and target shards, based on its cluster state view of the state of shards + * undergoing a resharding operation. 
This method is used to populate a field in the shard level requests sent to + * source and target shards, as a proxy for the cluster state version. The same calculation is then done at the source shard + * to verify if the coordinator and source node's view of the resharding state have a mismatch. + * See {@link org.elasticsearch.action.support.replication.ReplicationRequest#reshardSplitShardCountSummary} + * for a detailed description of how this value is used. + * + * @param shardId Input shardId for which we want to calculate the effective shard count + * @param minShardState Minimum target shard state required for the target to be considered ready + * @return Effective shard count as seen by an operation using this IndexMetadata + */ + private static SplitShardCountSummary getReshardSplitShardCountSummary( + IndexMetadata indexMetadata, + int shardId, + IndexReshardingState.Split.TargetShardState minShardState + ) { + int numberOfShards = indexMetadata.getNumberOfShards(); + IndexReshardingMetadata reshardingMetadata = indexMetadata.getReshardingMetadata(); + assert shardId >= 0 && shardId < numberOfShards : "shardId is out of bounds"; + int shardCount = numberOfShards; + if (reshardingMetadata != null) { + if (reshardingMetadata.getSplit().isTargetShard(shardId)) { + int sourceShardId = reshardingMetadata.getSplit().sourceShard(shardId); + // Requests cannot be routed to target shards until they are ready + assert reshardingMetadata.getSplit().allTargetStatesAtLeast(sourceShardId, minShardState) : "unexpected target state"; + shardCount = reshardingMetadata.getSplit().shardCountAfter(); + } else if (reshardingMetadata.getSplit().isSourceShard(shardId)) { + if (reshardingMetadata.getSplit().allTargetStatesAtLeast(shardId, minShardState)) { + shardCount = reshardingMetadata.getSplit().shardCountAfter(); + } else { + shardCount = reshardingMetadata.getSplit().shardCountBefore(); + } + } + } + return new SplitShardCountSummary(shardCount); + } + + private final 
int shardCountSummary; + + public SplitShardCountSummary(StreamInput in) throws IOException { + if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) { + this.shardCountSummary = in.readVInt(); + } else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) { + this.shardCountSummary = in.readInt(); + } else { + this.shardCountSummary = UNSET.shardCountSummary; + } + } + + /** + * Returns whether this shard count summary is carrying an actual value or is UNSET + */ + public boolean isUnset() { + return this.shardCountSummary == UNSET.shardCountSummary; + } + + // visible for testing + SplitShardCountSummary(int shardCountSummary) { + this.shardCountSummary = shardCountSummary; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) { + out.writeVInt(shardCountSummary); + } else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) { + out.writeInt(shardCountSummary); + } + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + SplitShardCountSummary otherSummary = (SplitShardCountSummary) other; + return this.shardCountSummary == otherSummary.shardCountSummary; + } + + @Override + public int hashCode() { + return Integer.hashCode(shardCountSummary); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataTests.java index d15e864f957ab..1c7e3c415db0a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataTests.java @@ -10,11 +10,6 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.test.ESTestCase; -import 
org.elasticsearch.test.index.IndexVersionUtils; - -import java.util.Locale; - -import static org.hamcrest.Matchers.equalTo; public class IndexReshardingMetadataTests extends ESTestCase { // test that we can drive a split through all valid state transitions in random order and terminate @@ -82,87 +77,4 @@ public void testSplit() { assertTrue(split.isTargetShard(i)); } } - - // Test that the ReshardSplitShardCount is calculated correctly w.r.t the current state of resharding-split operation - public void testReshardShardCountCalculation() { - final var numShards = 1; - final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - var settings = indexSettings(IndexVersionUtils.randomCompatibleVersion(random()), 1, 0).build(); - IndexMetadata indexMetadata = IndexMetadata.builder(indexName).settings(settings).build(); - IndexReshardingMetadata reshardingMetadata = indexMetadata.getReshardingMetadata(); - - assertNull(reshardingMetadata); - - // Create IndexMetadata with 2 shards. This is to build the right value for numRoutingShards for an index with 2 shards. - // There is no resharding yet. 
- final var numSourceShards = 2; - indexMetadata = IndexMetadata.builder(indexMetadata).reshardAddShards(numSourceShards).build(); - reshardingMetadata = indexMetadata.getReshardingMetadata(); - - assertNull(reshardingMetadata); - - // When there is no resharding going on, the ReshardSplitShardCount is same as number of shards in the index - for (int i = 0; i < numSourceShards; i++) { - assertThat(indexMetadata.getReshardSplitShardCountSummaryForIndexing(i), equalTo(numSourceShards)); - assertThat(indexMetadata.getReshardSplitShardCountSummaryForSearch(i), equalTo(numSourceShards)); - } - - // Now reshard-split from 2 shards to 4 shards - final int multiple = 2; - var IndexMetadataAfterReshard = IndexMetadata.builder(indexMetadata) - .reshardingMetadata(IndexReshardingMetadata.newSplitByMultiple(numSourceShards, multiple)) - .reshardAddShards(numSourceShards * multiple) - .build(); - - reshardingMetadata = IndexMetadataAfterReshard.getReshardingMetadata(); - - // starting state is as expected - assertEquals(numSourceShards, reshardingMetadata.shardCountBefore()); - assertEquals(numSourceShards * multiple, reshardingMetadata.shardCountAfter()); - final int numShardsAfterReshard = reshardingMetadata.shardCountAfter(); - - // All target shards in CLONE state - for (int i = 0; i < numSourceShards; i++) { - assertTrue(reshardingMetadata.getSplit().allTargetStatesAtLeast(i, IndexReshardingState.Split.TargetShardState.CLONE)); - - assertThat(IndexMetadataAfterReshard.getReshardSplitShardCountSummaryForIndexing(i), equalTo(numSourceShards)); - assertThat(IndexMetadataAfterReshard.getReshardSplitShardCountSummaryForSearch(i), equalTo(numSourceShards)); - } - - IndexReshardingState.Split.Builder builder = new IndexReshardingState.Split.Builder(reshardingMetadata.getSplit()); - for (int i = numSourceShards; i < numShardsAfterReshard; i++) { - builder.setTargetShardState(i, IndexReshardingState.Split.TargetShardState.HANDOFF); - } - var indexReshardingMetadataHandoff = new 
IndexReshardingMetadata(builder.build()); - var indexMetadataHandoff = IndexMetadata.builder(IndexMetadataAfterReshard) - .reshardingMetadata(indexReshardingMetadataHandoff) - .build(); - - // All target shards in HANDOFF state - for (int i = 0; i < numSourceShards; i++) { - assertTrue( - indexReshardingMetadataHandoff.getSplit().allTargetStatesAtLeast(i, IndexReshardingState.Split.TargetShardState.HANDOFF) - ); - - assertThat(indexMetadataHandoff.getReshardSplitShardCountSummaryForIndexing(i), equalTo(numShardsAfterReshard)); - assertThat(indexMetadataHandoff.getReshardSplitShardCountSummaryForSearch(i), equalTo(numSourceShards)); - } - - builder = new IndexReshardingState.Split.Builder(indexReshardingMetadataHandoff.getSplit()); - for (int i = numSourceShards; i < numShardsAfterReshard; i++) { - builder.setTargetShardState(i, IndexReshardingState.Split.TargetShardState.SPLIT); - } - var indexReshardingMetadataSplit = new IndexReshardingMetadata(builder.build()); - var indexMetadataSplit = IndexMetadata.builder(IndexMetadataAfterReshard).reshardingMetadata(indexReshardingMetadataSplit).build(); - - // All target shards in SPLIT state - for (int i = 0; i < numSourceShards; i++) { - assertTrue( - indexReshardingMetadataSplit.getSplit().allTargetStatesAtLeast(i, IndexReshardingState.Split.TargetShardState.SPLIT) - ); - - assertThat(indexMetadataSplit.getReshardSplitShardCountSummaryForIndexing(i), equalTo(numShardsAfterReshard)); - assertThat(indexMetadataSplit.getReshardSplitShardCountSummaryForSearch(i), equalTo(numShardsAfterReshard)); - } - } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/SplitShardCountSummaryTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/SplitShardCountSummaryTests.java new file mode 100644 index 0000000000000..59d9b5008613d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/SplitShardCountSummaryTests.java @@ -0,0 +1,111 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingState; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.index.IndexVersionUtils; + +import java.util.Locale; + +import static org.hamcrest.Matchers.equalTo; + +public class SplitShardCountSummaryTests extends ESTestCase { + // Test that the ReshardSplitShardCount is calculated correctly w.r.t the current state of resharding-split operation + public void testReshardShardCountCalculation() { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + var settings = indexSettings(IndexVersionUtils.randomCompatibleVersion(random()), 1, 0).build(); + IndexMetadata indexMetadata = IndexMetadata.builder(indexName).settings(settings).build(); + IndexReshardingMetadata reshardingMetadata = indexMetadata.getReshardingMetadata(); + + assertNull(reshardingMetadata); + + // Create IndexMetadata with 2 shards. This is to build the right value for numRoutingShards for an index with 2 shards. + // There is no resharding yet. 
+ final var numSourceShards = 2; + indexMetadata = IndexMetadata.builder(indexMetadata).reshardAddShards(numSourceShards).build(); + reshardingMetadata = indexMetadata.getReshardingMetadata(); + + assertNull(reshardingMetadata); + + final var preSplitSummary = new SplitShardCountSummary(numSourceShards); + // When there is no resharding going on, the ReshardSplitShardCount is same as number of shards in the index + for (int i = 0; i < numSourceShards; i++) { + assertThat(SplitShardCountSummary.forIndexing(indexMetadata, i), equalTo(preSplitSummary)); + assertThat(SplitShardCountSummary.forSearch(indexMetadata, i), equalTo(preSplitSummary)); + } + + // Now reshard-split from 2 shards to 4 shards + final int multiple = 2; + var indexMetadataAfterReshard = IndexMetadata.builder(indexMetadata) + .reshardingMetadata(IndexReshardingMetadata.newSplitByMultiple(numSourceShards, multiple)) + .reshardAddShards(numSourceShards * multiple) + .build(); + + reshardingMetadata = indexMetadataAfterReshard.getReshardingMetadata(); + + // starting state is as expected + assertEquals(numSourceShards, reshardingMetadata.shardCountBefore()); + assertEquals(numSourceShards * multiple, reshardingMetadata.shardCountAfter()); + final int numShardsAfterReshard = reshardingMetadata.shardCountAfter(); + final var postSplitSummary = new SplitShardCountSummary(numShardsAfterReshard); + + // All target shards in CLONE state + for (int i = 0; i < numSourceShards; i++) { + assertTrue(reshardingMetadata.getSplit().allTargetStatesAtLeast(i, IndexReshardingState.Split.TargetShardState.CLONE)); + + assertThat(SplitShardCountSummary.forIndexing(indexMetadataAfterReshard, i), equalTo(preSplitSummary)); + assertThat(SplitShardCountSummary.forSearch(indexMetadataAfterReshard, i), equalTo(preSplitSummary)); + } + + var indexReshardingMetadataHandoff = reshardingMetadata; + for (int i = numSourceShards; i < numShardsAfterReshard; i++) { + indexReshardingMetadataHandoff = 
indexReshardingMetadataHandoff.transitionSplitTargetToNewState( + new ShardId("test", "na", i), + IndexReshardingState.Split.TargetShardState.HANDOFF + ); + } + var indexMetadataHandoff = IndexMetadata.builder(indexMetadataAfterReshard) + .reshardingMetadata(indexReshardingMetadataHandoff) + .build(); + + // All target shards in HANDOFF state + for (int i = 0; i < numSourceShards; i++) { + assertTrue( + indexReshardingMetadataHandoff.getSplit().allTargetStatesAtLeast(i, IndexReshardingState.Split.TargetShardState.HANDOFF) + ); + + assertThat(SplitShardCountSummary.forIndexing(indexMetadataHandoff, i), equalTo(postSplitSummary)); + assertThat(SplitShardCountSummary.forSearch(indexMetadataHandoff, i), equalTo(preSplitSummary)); + } + + var indexReshardingMetadataSplit = indexReshardingMetadataHandoff; + for (int i = numSourceShards; i < numShardsAfterReshard; i++) { + indexReshardingMetadataSplit = indexReshardingMetadataSplit.transitionSplitTargetToNewState( + new ShardId("test", "na", i), + IndexReshardingState.Split.TargetShardState.SPLIT + ); + } + var indexMetadataSplit = IndexMetadata.builder(indexMetadataAfterReshard).reshardingMetadata(indexReshardingMetadataSplit).build(); + + // All target shards in SPLIT state + for (int i = 0; i < numSourceShards; i++) { + assertTrue( + indexReshardingMetadataSplit.getSplit().allTargetStatesAtLeast(i, IndexReshardingState.Split.TargetShardState.SPLIT) + ); + + assertThat(SplitShardCountSummary.forIndexing(indexMetadataSplit, i), equalTo(postSplitSummary)); + assertThat(SplitShardCountSummary.forSearch(indexMetadataSplit, i), equalTo(postSplitSummary)); + } + } +}