Skip to content

Commit e3ddf06

Browse files
committed
Read reshardShardCount
1 parent 1b0e1b4 commit e3ddf06

File tree

2 files changed

+18
-1
lines changed

2 files changed

+18
-1
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,14 @@ public ShardId shardId() {
159159
return shardId;
160160
}
161161

162+
/**
163+
* @return The effective shard count as seen by the coordinator when creating this request.
164+
* can be 0 if this has not yet been resolved.
165+
*/
166+
public int reshardSplitShardCount() {
167+
return reshardSplitShardCount;
168+
}
169+
162170
/**
163171
* Sets the number of shard copies that must be active before proceeding with the replication
164172
* operation. Defaults to {@link ActiveShardCount#DEFAULT}, which requires one shard copy

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.cluster.block.ClusterBlockException;
2929
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3030
import org.elasticsearch.cluster.metadata.IndexMetadata;
31+
import org.elasticsearch.cluster.metadata.IndexReshardingState;
3132
import org.elasticsearch.cluster.metadata.ProjectId;
3233
import org.elasticsearch.cluster.metadata.ProjectMetadata;
3334
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -459,14 +460,22 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
459460
try {
460461
final ClusterState clusterState = clusterService.state();
461462
final Index index = primaryShardReference.routingEntry().index();
462-
final ProjectId projectId = clusterState.metadata().projectFor(index).id();
463+
final ProjectMetadata project = clusterState.metadata().projectFor(index);
464+
final ProjectId projectId = project.id();
465+
final IndexMetadata indexMetadata = project.index(index);
463466

464467
final ClusterBlockException blockException = blockExceptions(clusterState, projectId, index.getName());
465468
if (blockException != null) {
466469
logger.trace("cluster is blocked, action failed on primary", blockException);
467470
throw blockException;
468471
}
469472

473+
int reshardSplitShardCount = primaryRequest.getRequest().reshardSplitShardCount();
474+
assert (reshardSplitShardCount == 0
475+
|| reshardSplitShardCount == indexMetadata.getReshardSplitShardCount(
476+
primaryRequest.getRequest().shardId().getId(),
477+
IndexReshardingState.Split.TargetShardState.HANDOFF
478+
));
470479
if (primaryShardReference.isRelocated()) {
471480
primaryShardReference.close(); // release shard operation lock as soon as possible
472481
setPhase(replicationTask, "primary_delegation");

0 commit comments

Comments
 (0)