diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
index a9f0146ab03d3..7220d4c8dd468 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
@@ -401,13 +401,20 @@ private void executeBulkRequestsByShard(
             final ShardId shardId = entry.getKey();
             final List<BulkItemRequest> requests = entry.getValue();
 
+            // Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest
+            var indexMetadata = project.index(shardId.getIndexName());
+            int reshardSplitShardCountSummary = 0;
+            if (indexMetadata != null) {
+                reshardSplitShardCountSummary = indexMetadata.getReshardSplitShardCountSummaryForIndexing(shardId.getId());
+            }
             BulkShardRequest bulkShardRequest = new BulkShardRequest(
                 shardId,
+                reshardSplitShardCountSummary,
                 bulkRequest.getRefreshPolicy(),
                 requests.toArray(new BulkItemRequest[0]),
                 bulkRequest.isSimulated()
             );
-            var indexMetadata = project.index(shardId.getIndexName());
+
             if (indexMetadata != null && indexMetadata.getInferenceFields().isEmpty() == false) {
                 bulkShardRequest.setInferenceFieldMap(indexMetadata.getInferenceFields());
             }
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java
index 750c56461135d..20ab490a38abe 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java
@@ -50,12 +50,26 @@ public BulkShardRequest(StreamInput in) throws IOException {
         }
     }
 
+    public BulkShardRequest(ShardId shardId, int reshardSplitShardCountSummary, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
+        this(shardId, reshardSplitShardCountSummary, refreshPolicy, items, false);
+    }
+
     public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
-        this(shardId, refreshPolicy, items, false);
+        this(shardId, 0, refreshPolicy, items, false);
     }
 
     public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated) {
-        super(shardId);
+        this(shardId, 0, refreshPolicy, items, isSimulated);
+    }
+
+    public BulkShardRequest(
+        ShardId shardId,
+        int reshardSplitShardCountSummary,
+        RefreshPolicy refreshPolicy,
+        BulkItemRequest[] items,
+        boolean isSimulated
+    ) {
+        super(shardId, reshardSplitShardCountSummary);
         this.items = items;
         setRefreshPolicy(refreshPolicy);
         this.isSimulated = isSimulated;
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java
index 94f4275614dae..8a26adbe13b3a 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java
@@ -47,6 +47,10 @@ public ReplicatedWriteRequest(@Nullable ShardId shardId) {
         super(shardId);
     }
 
+    public ReplicatedWriteRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary) {
+        super(shardId, reshardSplitShardCountSummary);
+    }
+
     @Override
     @SuppressWarnings("unchecked")
     public R setRefreshPolicy(RefreshPolicy refreshPolicy) {
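To make the coordinator-side computation concrete, here is a self-contained sketch in plain Java (not Elasticsearch code; the class, its names, and the shard-numbering assumption that a split gives source shard `s` the targets `s + k * shardsBefore` are illustrative stand-ins for `IndexMetadata.getReshardSplitShardCountSummaryForIndexing` and `IndexReshardingState.Split`):

```java
// Illustrative stand-in for the summary computation: the effective shard count
// flips from the pre-split to the post-split value only once every target of a
// given source shard has reached the required state.
public final class ReshardSummarySketch {
    public enum TargetShardState { CLONE, HANDOFF, SPLIT, DONE }

    /** targetStates[i] is the state of target shard (shardsBefore + i). */
    public static int summary(int shardId, int shardsBefore, int shardsAfter,
                              TargetShardState[] targetStates, TargetShardState minState) {
        if (shardId >= shardsBefore) {
            // A target shard is only addressed once it is ready; the real code
            // asserts that and returns the post-split count.
            return shardsAfter;
        }
        for (int t = shardId + shardsBefore; t < shardsAfter; t += shardsBefore) {
            if (targetStates[t - shardsBefore].ordinal() < minState.ordinal()) {
                return shardsBefore; // a target lags: requests stay on the source shard
            }
        }
        return shardsAfter; // all targets ready: route to source and target shards
    }

    public static void main(String[] args) {
        // 2 -> 4 split: while the targets are still in CLONE, indexing keeps summary 2.
        TargetShardState[] states = { TargetShardState.CLONE, TargetShardState.CLONE };
        System.out.println(summary(0, 2, 4, states, TargetShardState.HANDOFF)); // 2
        states[0] = states[1] = TargetShardState.HANDOFF;
        System.out.println(summary(0, 2, 4, states, TargetShardState.HANDOFF)); // 4
    }
}
```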
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
index b64f9e4081247..f5cd42fd4d524 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
@@ -9,6 +9,7 @@
 
 package org.elasticsearch.action.support.replication;
 
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.LegacyActionRequest;
@@ -37,6 +38,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Request>> extends LegacyActionRequest
     implements
         IndicesRequest {
 
+    private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");
+
@@ -55,3 +58,30 @@
+    /**
+     * The reshardSplitShardCountSummary is the "effective" shard count of the index that this request targets,
+     * as observed by the coordinator when it split a bulk request by shard. While an index is being resharded,
+     * requests are routed to target shards only once those shards are ready, so this value records whether the
+     * coordinator routed on the pre-split or the post-split shard count. The source shard node performs the same
+     * calculation against its own metadata to detect a mismatch between the two views of the resharding state.
+     *
+     * Example 1:
+     * Suppose we are resharding an index from 2 -> 4 shards. While splitting a bulk request, the coordinator observes
+     * that target shards are not ready for indexing. So requests that are meant for shard 0 and 2 are bundled together,
+     * sent to shard 0 with "reshardSplitShardCountSummary" 2 in the request.
+     * Requests that are meant for shard 1 and 3 are bundled together,
+     * sent to shard 1 with "reshardSplitShardCountSummary" 2 in the request.
+     *
+     * Example 2:
+     * Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes
+     * that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not completed handoff.
+     * So, the shard-bulk-request it sends to shards 0 and 4 has the "reshardSplitShardCountSummary" 8,
+     * while the shard-bulk-requests it sends to shards 1, 2, 3 have the "reshardSplitShardCountSummary" 4.
+     * Note that in this case no shard-bulk-request is sent to shards 5, 6, 7; the requests that were meant for these
+     * target shards are bundled together with, and sent to, their source shards.
+     *
+     * A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with an INVALID
+     * reshardSplitShardCountSummary will be treated as a Summary mismatch on the source shard node.
+     */
+    protected final int reshardSplitShardCountSummary;
+
     /**
      * The number of shard copies that must be active before proceeding with the replication action.
      */
@@ -61,6 +103,10 @@ public ReplicationRequest(StreamInput in) throws IOException {
     }
 
     public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
+        this(shardId, 0, in);
+    }
+
+    public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary, StreamInput in) throws IOException {
         super(in);
         final boolean thinRead = shardId != null;
         if (thinRead) {
@@ -80,15 +126,32 @@ public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOEx
             index = in.readString();
         }
         routedBasedOnClusterVersion = in.readVLong();
+        if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
+            if (thinRead) {
+                this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
+            } else {
+                this.reshardSplitShardCountSummary = in.readInt();
+            }
+        } else {
+            this.reshardSplitShardCountSummary = 0;
+        }
     }
 
     /**
      * Creates a new request with resolved shard id
      */
     public ReplicationRequest(@Nullable ShardId shardId) {
+        this(shardId, 0);
+    }
+
+    /**
+     * Creates a new request with resolved shard id and reshardSplitShardCountSummary
+     */
+    public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary) {
         this.index = shardId == null ? null : shardId.getIndexName();
         this.shardId = shardId;
         this.timeout = DEFAULT_TIMEOUT;
+        this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
     }
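The read path above defaults the summary to 0 (INVALID) when the stream comes from a node that predates the new transport version (id 9172000, per `index_reshard_shardcount_summary.csv` later in this diff). A minimal sketch of that version-gating pattern, using plain `java.io` streams rather than Elasticsearch's `StreamInput`/`StreamOutput` (names are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of a version-gated wire field: writers only emit the extra int on new
// enough streams, and readers of older streams fall back to 0 (INVALID).
final class VersionGatedSummary {
    static final int SUMMARY_VERSION = 9_172_000;

    static void write(DataOutputStream out, int wireVersion, int summary) throws IOException {
        out.writeLong(123L); // stands in for the fields every version serializes
        if (wireVersion >= SUMMARY_VERSION) {
            out.writeInt(summary);
        }
    }

    static int read(DataInputStream in, int wireVersion) throws IOException {
        in.readLong();
        return wireVersion >= SUMMARY_VERSION ? in.readInt() : 0; // 0 == INVALID summary
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        write(new DataOutputStream(buf), SUMMARY_VERSION, 4);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(read(in, SUMMARY_VERSION)); // 4
    }
}
```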
@@ -137,6 +200,14 @@ public ShardId shardId() {
         return shardId;
     }
 
+    /**
+     * @return The effective shard count as seen by the coordinator when creating this request.
+     * Can be 0 if this has not yet been resolved.
+     */
+    public int reshardSplitShardCountSummary() {
+        return reshardSplitShardCountSummary;
+    }
+
     /**
      * Sets the number of shard copies that must be active before proceeding with the replication
      * operation. Defaults to {@link ActiveShardCount#DEFAULT}, which requires one shard copy
@@ -191,11 +262,14 @@ public void writeTo(StreamOutput out) throws IOException {
         out.writeTimeValue(timeout);
         out.writeString(index);
         out.writeVLong(routedBasedOnClusterVersion);
+        if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
+            out.writeInt(reshardSplitShardCountSummary);
+        }
     }
 
     /**
      * Thin serialization that does not write {@link #shardId} and will only write {@link #index} if it is different from the index name in
-     * {@link #shardId}.
+     * {@link #shardId}. Since we do not write {@link #shardId}, we also do not write {@link #reshardSplitShardCountSummary}.
      */
     public void writeThin(StreamOutput out) throws IOException {
         super.writeTo(out);
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index 27d5327711634..5278bb8a049ef 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -459,7 +459,9 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
             try {
                 final ClusterState clusterState = clusterService.state();
                 final Index index = primaryShardReference.routingEntry().index();
-                final ProjectId projectId = clusterState.metadata().projectFor(index).id();
+                final ProjectMetadata project = clusterState.metadata().projectFor(index);
+                final ProjectId projectId = project.id();
+                final IndexMetadata indexMetadata = project.index(index);
 
                 final ClusterBlockException blockException = blockExceptions(clusterState, projectId, index.getName());
                 if (blockException != null) {
@@ -467,6 +469,11 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
                     throw blockException;
                 }
 
+                int reshardSplitShardCountSummary = primaryRequest.getRequest().reshardSplitShardCountSummary();
+                assert (reshardSplitShardCountSummary == 0
+                    || reshardSplitShardCountSummary == indexMetadata.getReshardSplitShardCountSummaryForIndexing(
+                        primaryRequest.getRequest().shardId().getId()
+                    ));
                 if (primaryShardReference.isRelocated()) {
                     primaryShardReference.close(); // release shard operation lock as soon as possible
                     setPhase(replicationTask, "primary_delegation");
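The assertion above only verifies that a non-zero summary agrees with what the primary recomputes from its own `IndexMetadata`; per the field's javadoc, an INVALID (0) summary is meant to be treated as a mismatch by the eventual handling path, which is not part of this diff. A hypothetical sketch of that check:

```java
// Hypothetical mismatch predicate (not ES code): requestSummary is the
// coordinator's view, primarySummary is recomputed locally on the primary;
// disagreement means the two nodes saw different resharding states.
final class SummaryCheck {
    static boolean summaryMatches(int requestSummary, int primarySummary) {
        // 0 is INVALID (e.g. sent by an older node) and always counts as a mismatch.
        return requestSummary != 0 && requestSummary == primarySummary;
    }

    public static void main(String[] args) {
        System.out.println(summaryMatches(4, 4)); // true: views agree
        System.out.println(summaryMatches(2, 4)); // false: coordinator had a stale view
        System.out.println(summaryMatches(0, 4)); // false: INVALID summary
    }
}
```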
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
index 5c082153e1712..25edec3e855e4 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
@@ -1181,6 +1181,71 @@ public int getNumberOfShards() {
         return numberOfShards;
     }
 
+    /**
+     * This method is used in the context of the resharding feature.
+     * Given a {@code shardId} and {@code minShardState}, i.e. the minimum target shard state required for
+     * an operation to be routed to target shards,
+     * this method returns the "effective" shard count as seen by this IndexMetadata.
+     *
+     * The reshardSplitShardCountSummary tells us whether the coordinator routed requests to the source shard only or
+     * to both source and target shards. Requests are routed to both source and target shards
+     * once the target shards are ready for an operation.
+     *
+     * The coordinator routes requests to source and target shards based on its cluster state view of the shards
+     * undergoing a resharding operation. This method is used to populate a field in the shard-level requests sent to
+     * source and target shards, as a proxy for the cluster state version. The same calculation is then done at the source shard
+     * to verify whether the coordinator's and the source node's views of the resharding state disagree.
+     * See {@link org.elasticsearch.action.support.replication.ReplicationRequest#reshardSplitShardCountSummary}
+     * for a detailed description of how this value is used.
+     *
+     * @param shardId Input shardId for which we want to calculate the effective shard count
+     * @param minShardState Minimum target shard state required for the target to be considered ready
+     * @return Effective shard count as seen by an operation using this IndexMetadata
+     */
+    private int getReshardSplitShardCountSummary(int shardId, IndexReshardingState.Split.TargetShardState minShardState) {
+        assert shardId >= 0 && shardId < getNumberOfShards() : "shardId is out of bounds";
+        int shardCount = getNumberOfShards();
+        if (reshardingMetadata != null) {
+            if (reshardingMetadata.getSplit().isTargetShard(shardId)) {
+                int sourceShardId = reshardingMetadata.getSplit().sourceShard(shardId);
+                // Requests cannot be routed to target shards until they are ready
+                assert reshardingMetadata.getSplit().allTargetStatesAtLeast(sourceShardId, minShardState) : "unexpected target state";
+                shardCount = reshardingMetadata.getSplit().shardCountAfter();
+            } else if (reshardingMetadata.getSplit().isSourceShard(shardId)) {
+                if (reshardingMetadata.getSplit().allTargetStatesAtLeast(shardId, minShardState)) {
+                    shardCount = reshardingMetadata.getSplit().shardCountAfter();
+                } else {
+                    shardCount = reshardingMetadata.getSplit().shardCountBefore();
+                }
+            }
+        }
+        return shardCount;
+    }
+
+    /**
+     * This method is used in the context of the resharding feature.
+     * Given a {@code shardId}, this method returns the "effective" shard count
+     * as seen by this IndexMetadata, for indexing operations.
+     *
+     * See {@code getReshardSplitShardCountSummary} for more details.
+     * @param shardId Input shardId for which we want to calculate the effective shard count
+     */
+    public int getReshardSplitShardCountSummaryForIndexing(int shardId) {
+        return getReshardSplitShardCountSummary(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF);
+    }
+
+    /**
+     * This method is used in the context of the resharding feature.
+     * Given a {@code shardId}, this method returns the "effective" shard count
+     * as seen by this IndexMetadata, for search operations.
+     *
+     * See {@code getReshardSplitShardCountSummary} for more details.
+     * @param shardId Input shardId for which we want to calculate the effective shard count
+     */
+    public int getReshardSplitShardCountSummaryForSearch(int shardId) {
+        return getReshardSplitShardCountSummary(shardId, IndexReshardingState.Split.TargetShardState.SPLIT);
+    }
+
     public int getNumberOfReplicas() {
         return numberOfReplicas;
     }
@@ -3022,7 +3087,7 @@ public IndexMetadata fromXContent(XContentParser parser) throws IOException {
      * Returns the number of shards that should be used for routing. This basically defines the hash space we use in
      * {@link IndexRouting#indexShard} to route documents
      * to shards based on their ID or their specific routing value. The default value is {@link #getNumberOfShards()}. This value only
-     * changes if and index is shrunk.
+     * changes if an index is shrunk.
      */
     public int getRoutingNumShards() {
         return routingNumShards;
@@ -3042,7 +3107,7 @@ public int getRoutingFactor() {
      * @param shardId the id of the target shard to split into
      * @param sourceIndexMetadata the source index metadata
      * @param numTargetShards the total number of shards in the target index
-     * @return a the source shard ID to split off from
+     * @return the source shard ID to split off from
      */
     public static ShardId selectSplitShard(int shardId, IndexMetadata sourceIndexMetadata, int numTargetShards) {
         int numSourceShards = sourceIndexMetadata.getNumberOfShards();
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java
index 434d339509112..e1e586f153c4b 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java
@@ -378,6 +378,16 @@ public boolean targetStateAtLeast(int shardNum, TargetShardState targetShardStat
             return getTargetShardState(shardNum).ordinal() >= targetShardState.ordinal();
         }
 
+        public boolean allTargetStatesAtLeast(int sourceShardId, TargetShardState targetShardState) {
+            var targets = getTargetStatesFor(sourceShardId);
+            for (TargetShardState state : targets) {
+                if (state.ordinal() < targetShardState.ordinal()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
         public Stream<TargetShardState> targetStates() {
             return Arrays.stream(targetShards);
         }
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java
index b7d69cc9ff54c..9a79349f697c6 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java
@@ -72,6 +72,7 @@ public static IndexRouting fromIndexMetadata(IndexMetadata metadata) {
     protected final String indexName;
     private final int routingNumShards;
     private final int routingFactor;
+    @Nullable
     private final IndexReshardingMetadata indexReshardingMetadata;
 
     private IndexRouting(IndexMetadata metadata) {
diff --git a/server/src/main/resources/transport/definitions/referable/index_reshard_shardcount_summary.csv b/server/src/main/resources/transport/definitions/referable/index_reshard_shardcount_summary.csv
new file mode 100644
index 0000000000000..42d1df167e3b9
--- /dev/null
+++ b/server/src/main/resources/transport/definitions/referable/index_reshard_shardcount_summary.csv
@@ -0,0 +1 @@
+9172000
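Replaying Example 2 from the `ReplicationRequest` javadoc through the earlier `ReshardSummarySketch` (again illustrative: the sketch's shard numbering mirrors the `sourceShard` mapping described above, and both classes are hypothetical):

```java
// 4 -> 8 split where only source shard 0's target (shard 4) has reached HANDOFF:
// shard 0 gets summary 8, shards 1..3 stay at 4, matching Example 2.
public final class Example2Replay {
    public static void main(String[] args) {
        ReshardSummarySketch.TargetShardState[] states = {
            ReshardSummarySketch.TargetShardState.HANDOFF, // shard 4, target of source 0
            ReshardSummarySketch.TargetShardState.CLONE,   // shard 5, target of source 1
            ReshardSummarySketch.TargetShardState.CLONE,   // shard 6, target of source 2
            ReshardSummarySketch.TargetShardState.CLONE    // shard 7, target of source 3
        };
        for (int shard = 0; shard < 4; shard++) {
            int summary = ReshardSummarySketch.summary(shard, 4, 8, states, ReshardSummarySketch.TargetShardState.HANDOFF);
            System.out.println("shard " + shard + " -> summary " + summary);
        }
    }
}
```

Run together with the earlier sketch, this prints summary 8 for shard 0 and summary 4 for shards 1 through 3.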
diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv
index f69c2d7fac9a6..e4c91df18cda8 100644
--- a/server/src/main/resources/transport/upper_bounds/9.2.csv
+++ b/server/src/main/resources/transport/upper_bounds/9.2.csv
@@ -1 +1 @@
-esql_fuse_linear_operator_status,9171000
+index_reshard_shardcount_summary,9172000
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataTests.java
index 1c7e3c415db0a..d15e864f957ab 100644
--- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataTests.java
@@ -10,6 +10,11 @@
 package org.elasticsearch.cluster.metadata;
 
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.index.IndexVersionUtils;
+
+import java.util.Locale;
+
+import static org.hamcrest.Matchers.equalTo;
 
 public class IndexReshardingMetadataTests extends ESTestCase {
     // test that we can drive a split through all valid state transitions in random order and terminate
@@ -77,4 +82,87 @@ public void testSplit() {
             assertTrue(split.isTargetShard(i));
         }
     }
+
+    // Test that the ReshardSplitShardCountSummary is calculated correctly w.r.t. the current state of the resharding-split operation
+    public void testReshardShardCountCalculation() {
+        final var numShards = 1;
+        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        var settings = indexSettings(IndexVersionUtils.randomCompatibleVersion(random()), numShards, 0).build();
+        IndexMetadata indexMetadata = IndexMetadata.builder(indexName).settings(settings).build();
+        IndexReshardingMetadata reshardingMetadata = indexMetadata.getReshardingMetadata();
+
+        assertNull(reshardingMetadata);
+
+        // Create IndexMetadata with 2 shards. This is to build the right value of numRoutingShards for an index with 2 shards.
+        // There is no resharding yet.
+        final var numSourceShards = 2;
+        indexMetadata = IndexMetadata.builder(indexMetadata).reshardAddShards(numSourceShards).build();
+        reshardingMetadata = indexMetadata.getReshardingMetadata();
+
+        assertNull(reshardingMetadata);
+
+        // When there is no resharding going on, the ReshardSplitShardCountSummary is the same as the number of shards in the index
+        for (int i = 0; i < numSourceShards; i++) {
+            assertThat(indexMetadata.getReshardSplitShardCountSummaryForIndexing(i), equalTo(numSourceShards));
+            assertThat(indexMetadata.getReshardSplitShardCountSummaryForSearch(i), equalTo(numSourceShards));
+        }
+
+        // Now reshard-split from 2 shards to 4 shards
+        final int multiple = 2;
+        var indexMetadataAfterReshard = IndexMetadata.builder(indexMetadata)
+            .reshardingMetadata(IndexReshardingMetadata.newSplitByMultiple(numSourceShards, multiple))
+            .reshardAddShards(numSourceShards * multiple)
+            .build();
+
+        reshardingMetadata = indexMetadataAfterReshard.getReshardingMetadata();
+
+        // starting state is as expected
+        assertEquals(numSourceShards, reshardingMetadata.shardCountBefore());
+        assertEquals(numSourceShards * multiple, reshardingMetadata.shardCountAfter());
+        final int numShardsAfterReshard = reshardingMetadata.shardCountAfter();
+
+        // All target shards in CLONE state
+        for (int i = 0; i < numSourceShards; i++) {
+            assertTrue(reshardingMetadata.getSplit().allTargetStatesAtLeast(i, IndexReshardingState.Split.TargetShardState.CLONE));
+
+            assertThat(indexMetadataAfterReshard.getReshardSplitShardCountSummaryForIndexing(i), equalTo(numSourceShards));
+            assertThat(indexMetadataAfterReshard.getReshardSplitShardCountSummaryForSearch(i), equalTo(numSourceShards));
+        }
+
+        IndexReshardingState.Split.Builder builder = new IndexReshardingState.Split.Builder(reshardingMetadata.getSplit());
+        for (int i = numSourceShards; i < numShardsAfterReshard; i++) {
+            builder.setTargetShardState(i, IndexReshardingState.Split.TargetShardState.HANDOFF);
+        }
+        var indexReshardingMetadataHandoff = new IndexReshardingMetadata(builder.build());
+        var indexMetadataHandoff = IndexMetadata.builder(indexMetadataAfterReshard)
+            .reshardingMetadata(indexReshardingMetadataHandoff)
+            .build();
+
+        // All target shards in HANDOFF state
+        for (int i = 0; i < numSourceShards; i++) {
+            assertTrue(
+                indexReshardingMetadataHandoff.getSplit().allTargetStatesAtLeast(i, IndexReshardingState.Split.TargetShardState.HANDOFF)
+            );
+
+            assertThat(indexMetadataHandoff.getReshardSplitShardCountSummaryForIndexing(i), equalTo(numShardsAfterReshard));
+            assertThat(indexMetadataHandoff.getReshardSplitShardCountSummaryForSearch(i), equalTo(numSourceShards));
+        }
+
+        builder = new IndexReshardingState.Split.Builder(indexReshardingMetadataHandoff.getSplit());
+        for (int i = numSourceShards; i < numShardsAfterReshard; i++) {
+            builder.setTargetShardState(i, IndexReshardingState.Split.TargetShardState.SPLIT);
+        }
+        var indexReshardingMetadataSplit = new IndexReshardingMetadata(builder.build());
+        var indexMetadataSplit = IndexMetadata.builder(indexMetadataAfterReshard).reshardingMetadata(indexReshardingMetadataSplit).build();
+
+        // All target shards in SPLIT state
+        for (int i = 0; i < numSourceShards; i++) {
+            assertTrue(
+                indexReshardingMetadataSplit.getSplit().allTargetStatesAtLeast(i, IndexReshardingState.Split.TargetShardState.SPLIT)
+            );
+
+            assertThat(indexMetadataSplit.getReshardSplitShardCountSummaryForIndexing(i), equalTo(numShardsAfterReshard));
+            assertThat(indexMetadataSplit.getReshardSplitShardCountSummaryForSearch(i), equalTo(numShardsAfterReshard));
+        }
+    }
 }
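In sketch form, the state/summary matrix the test above walks through (assuming a 2 -> 4 split; indexing flips at HANDOFF, search at SPLIT; the class below is illustrative, not part of the patch):

```java
// Compact replay of the expectations asserted in testReshardShardCountCalculation.
public final class SummaryExpectations {
    record Row(String targetState, int indexingSummary, int searchSummary) {}

    public static void main(String[] args) {
        Row[] rows = {
            new Row("CLONE",   2, 2), // targets exist but are not ready for anything yet
            new Row("HANDOFF", 4, 2), // indexing moves to 4 shards, search still sees 2
            new Row("SPLIT",   4, 4), // both indexing and search see 4 shards
        };
        for (Row r : rows) {
            System.out.printf("%-8s indexing=%d search=%d%n", r.targetState, r.indexingSummary, r.searchSummary);
        }
    }
}
```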