- 
                Notifications
    You must be signed in to change notification settings 
- Fork 25.6k
Reshard add shard count #133985
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reshard add shard count #133985
Changes from 14 commits
02ec564
              87528bc
              865aff5
              1b0e1b4
              e3ddf06
              3ae25bf
              cab853d
              418b4ff
              5c496d0
              84a96b8
              c49ccb1
              fcfdae2
              4caafb6
              3dc489d
              eff86a7
              112a8b2
              1a32024
              cd83303
              9ae8428
              f07291d
              b569c40
              ade5904
              ed33595
              3b68cf5
              f9ca11d
              bcb3c85
              849dd5d
              8b0c5f0
              f689986
              18a2489
              d1aa4ce
              13d6538
              4ea8aeb
              88fe793
              d165abb
              15a7ae5
              6034275
              75998fa
              761ac7d
              f35138a
              fddc0bf
              ff82490
              2be982b
              59378c1
              6c8f204
              7c20e53
              9c8a8e7
              3c2d7af
              10dd203
              028eb0d
              cebb052
              eba523f
              f8b8bf5
              f392eb5
              585cf2b
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -49,6 +49,19 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ | |
| protected TimeValue timeout; | ||
| protected String index; | ||
|  | ||
| /** | ||
| * The reshardSplitShardCount has been added to accommodate the Resharding project. | ||
|         
                  ankikuma marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| * This is populated when the coordinator is deciding which shards a request applies to. | ||
| * For example, {@link org.elasticsearch.action.bulk.BulkOperation} splits | ||
| * an incoming bulk request into shard level {@link org.elasticsearch.action.bulk.BulkShardRequest} | ||
| * based on its' cluster state view of the number of shards that are ready for indexing. | ||
|         
                  ankikuma marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| * The purpose of this metadata is to reconcile the cluster state visible at the coordinating | ||
| * node with that visible at the source shard node. (w.r.t resharding). | ||
|         
                  bcully marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| * Note that we are able to get away with a single number, instead of an array of target shard states, | ||
| * because we only allow splits in increments of 2x. | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm tempted to want concrete examples here. I know this makes for a long comment, but I think the field is unintuitive enough to warrant it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that examples will be helpful. Its semantics really is confusing. Also, I was thinking if I rename the field in the  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we referred to this value as a "checksum of resharding state" in the design document. I wonder if calling it some kind of checksum will resolve the naming dispute. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ya I like reshardSplitShardCountChecksum. Because otherwise it sounds like it's a count that you can actually use for something. It's really just a value to be used not as is, but in the context of reconciling cluster state between 2 different nodes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. bikeshedding, how about "summary" instead of "checksum"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I kind of like checksum better but happy change it ... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I won't push hard on this. To me checksum implies an integrity check - you have some data, and alongside it you have a provided checksum, and if you recompute the checksum and it doesn't match the provided checksum you have an integrity problem. It sort of applies here - we have our own view of the routing table, and the coordinator provides a "checksum", and if we don't produce the same value over our view then we don't trust the coordinator's binning (which kind of looks like a response to an integrity error if you squint). I felt that summary (i.e., a lossily compressed form that still has relevant info) was maybe a little more accurate, but I fully admit I'm bikeshedding. Carry on as you see fit. | ||
| */ | ||
| private final int reshardSplitShardCount; | ||
|  | ||
| /** | ||
| * The number of shard copies that must be active before proceeding with the replication action. | ||
| */ | ||
|  | @@ -57,11 +70,12 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ | |
| private long routedBasedOnClusterVersion = 0; | ||
|  | ||
| public ReplicationRequest(StreamInput in) throws IOException { | ||
| this(null, in); | ||
| this(null, 0, in); | ||
| } | ||
|  | ||
| public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException { | ||
| public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCount, StreamInput in) throws IOException { | ||
|          | ||
| super(in); | ||
| this.reshardSplitShardCount = reshardSplitShardCount; | ||
| final boolean thinRead = shardId != null; | ||
| if (thinRead) { | ||
| this.shardId = shardId; | ||
|  | @@ -86,9 +100,17 @@ public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOEx | |
| * Creates a new request with resolved shard id | ||
| */ | ||
| public ReplicationRequest(@Nullable ShardId shardId) { | ||
| this(shardId, 0); | ||
| } | ||
|  | ||
| /** | ||
| * Creates a new request with resolved shard id and reshardSplitShardCount | ||
| */ | ||
| public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCount) { | ||
| this.index = shardId == null ? null : shardId.getIndexName(); | ||
| this.shardId = shardId; | ||
| this.timeout = DEFAULT_TIMEOUT; | ||
| this.reshardSplitShardCount = reshardSplitShardCount; | ||
| } | ||
|  | ||
| /** | ||
|  | @@ -137,6 +159,14 @@ public ShardId shardId() { | |
| return shardId; | ||
| } | ||
|  | ||
| /** | ||
| * @return The effective shard count as seen by the coordinator when creating this request. | ||
| * can be 0 if this has not yet been resolved. | ||
| */ | ||
| public int reshardSplitShardCount() { | ||
| return reshardSplitShardCount; | ||
| } | ||
|  | ||
| /** | ||
| * Sets the number of shard copies that must be active before proceeding with the replication | ||
| * operation. Defaults to {@link ActiveShardCount#DEFAULT}, which requires one shard copy | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -28,6 +28,7 @@ | |
| import org.elasticsearch.cluster.block.ClusterBlockException; | ||
| import org.elasticsearch.cluster.block.ClusterBlockLevel; | ||
| import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
| import org.elasticsearch.cluster.metadata.IndexReshardingState; | ||
| import org.elasticsearch.cluster.metadata.ProjectId; | ||
| import org.elasticsearch.cluster.metadata.ProjectMetadata; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
|  | @@ -459,14 +460,22 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere | |
| try { | ||
| final ClusterState clusterState = clusterService.state(); | ||
| final Index index = primaryShardReference.routingEntry().index(); | ||
| final ProjectId projectId = clusterState.metadata().projectFor(index).id(); | ||
| final ProjectMetadata project = clusterState.metadata().projectFor(index); | ||
| final ProjectId projectId = project.id(); | ||
| final IndexMetadata indexMetadata = project.index(index); | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can this be null? I know we have a shard reference here but I'm not sure whether that ensures that an index isn't deleted in cluster state. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Index delete/close are supposed to acquire all permits before proceeding. I think TransportVerifyShardBeforeCloseAction#acquirePrimaryOperationPermit is responsible for this ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, thanks | ||
|  | ||
| final ClusterBlockException blockException = blockExceptions(clusterState, projectId, index.getName()); | ||
| if (blockException != null) { | ||
| logger.trace("cluster is blocked, action failed on primary", blockException); | ||
| throw blockException; | ||
| } | ||
|  | ||
| int reshardSplitShardCount = primaryRequest.getRequest().reshardSplitShardCount(); | ||
| assert (reshardSplitShardCount == 0 | ||
| || reshardSplitShardCount == indexMetadata.getReshardSplitShardCount( | ||
| primaryRequest.getRequest().shardId().getId(), | ||
| IndexReshardingState.Split.TargetShardState.HANDOFF | ||
| )); | ||
|         
                  ankikuma marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| if (primaryShardReference.isRelocated()) { | ||
| primaryShardReference.close(); // release shard operation lock as soon as possible | ||
| setPhase(replicationTask, "primary_delegation"); | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -1150,6 +1150,33 @@ public int getNumberOfShards() { | |
| return numberOfShards; | ||
| } | ||
|  | ||
| /** | ||
| * The reshardSplitShardCount tells us weather requests are being routed to the source shard or | ||
|         
                  ankikuma marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| * to both source and target shards. Requests are routed to both source and target shards | ||
| * once the target shards are ready for an operation. | ||
| * @param shardId Input shardId for which we want to calculate the effective shard count | ||
| * @return Effective shard count as seen by an operation using this IndexMetadata | ||
| */ | ||
| public int getReshardSplitShardCount(int shardId, IndexReshardingState.Split.TargetShardState minShardState) { | ||
| assert shardId >= 0 && shardId < getNumberOfShards() : "shardId is out of bounds"; | ||
| int shardCount = getNumberOfShards(); | ||
| if (reshardingMetadata != null) { | ||
| if (reshardingMetadata.getSplit().isTargetShard(shardId)) { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if this is being called for a target shard, does that mean that the coordinator has already determined that the target shard is ready? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. The reason is that we already have target state checks for routing requests at the coordinator level. The fact that a request is routed to the target shard means that it must be ready. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have added more details to the method description | ||
| // TODO: Assert that target state is atleast minShardState | ||
| int sourceShardId = reshardingMetadata.getSplit().sourceShard(shardId); | ||
| assert reshardingMetadata.getSplit().allTargetStatesAtLeast(sourceShardId, minShardState) : "unexpected target state"; | ||
| shardCount = reshardingMetadata.getSplit().shardCountAfter(); | ||
| } else if (reshardingMetadata.getSplit().isSourceShard(shardId)) { | ||
| if (reshardingMetadata.getSplit().allTargetStatesAtLeast(shardId, minShardState)) { | ||
| shardCount = reshardingMetadata.getSplit().shardCountAfter(); | ||
| } else { | ||
| shardCount = reshardingMetadata.getSplit().shardCountBefore(); | ||
| } | ||
| } | ||
| } | ||
| return shardCount; | ||
| } | ||
|  | ||
| public int getNumberOfReplicas() { | ||
| return numberOfReplicas; | ||
| } | ||
|  | @@ -2985,7 +3012,7 @@ public IndexMetadata fromXContent(XContentParser parser) throws IOException { | |
| * Returns the number of shards that should be used for routing. This basically defines the hash space we use in | ||
| * {@link IndexRouting#indexShard} to route documents | ||
| * to shards based on their ID or their specific routing value. The default value is {@link #getNumberOfShards()}. This value only | ||
| * changes if and index is shrunk. | ||
| * changes if an index is shrunk. | ||
| */ | ||
| public int getRoutingNumShards() { | ||
| return routingNumShards; | ||
|  | @@ -3005,7 +3032,7 @@ public int getRoutingFactor() { | |
| * @param shardId the id of the target shard to split into | ||
| * @param sourceIndexMetadata the source index metadata | ||
| * @param numTargetShards the total number of shards in the target index | ||
| * @return a the source shard ID to split off from | ||
| * @return the source shard ID to split off from | ||
| */ | ||
| public static ShardId selectSplitShard(int shardId, IndexMetadata sourceIndexMetadata, int numTargetShards) { | ||
| int numSourceShards = sourceIndexMetadata.getNumberOfShards(); | ||
|  | ||
Uh oh!
There was an error while loading. Please reload this page.