Reshard add shard count #133985
@@ -9,6 +9,7 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.LegacyActionRequest;
@@ -37,6 +38,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
    implements
        IndicesRequest {

    private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");

    public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(1);

    /**
@@ -49,6 +52,45 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
    protected TimeValue timeout;
    protected String index;

    /**
     * The reshardSplitShardCountSummary has been added to accommodate the resharding feature.
     * It is populated when the coordinator decides which shards a request applies to.
     * For example, {@link org.elasticsearch.action.bulk.BulkOperation} splits
     * an incoming bulk request into shard-level {@link org.elasticsearch.action.bulk.BulkShardRequest}s
     * based on its cluster state view of the number of shards that are ready for indexing.
     * The purpose of this metadata is to reconcile the cluster state visible to the coordinating
     * node with that visible to the source shard node (w.r.t. resharding).
     *
     * When an index is being split, there is a point in time when the newly created shard (target shard)
     * takes over its portion of the document space from the original shard (source shard).
     * Although the handoff is atomic at the source and target shards,
     * there is a window of time between the coordinating node creating a shard request and the shard receiving and processing it.
     * This field is used by the source shard when it processes the request, to detect whether
     * the coordinator's view of the target shard's state when it created the request matches the shard's current state,
     * or whether the request must be reprocessed taking the current shard states into account.
     *
     * Note that we are able to get away with a single number, instead of an array of target shard states,
     * because we only allow splits in increments of 2x.

[Inline review thread on the line above]
- I'm tempted to want concrete examples here. I know this makes for a long comment, but I think the field is unintuitive enough to warrant it.
- I agree that examples will be helpful. Its semantics really is confusing. Also, I was thinking if I rename the field in the
- I think we referred to this value as a "checksum of resharding state" in the design document. I wonder if calling it some kind of checksum will resolve the naming dispute.
- Ya I like reshardSplitShardCountChecksum. Because otherwise it sounds like it's a count that you can actually use for something. It's really just a value to be used not as is, but in the context of reconciling cluster state between 2 different nodes.
- Bikeshedding, how about "summary" instead of "checksum"?
- I kind of like checksum better but happy to change it ...
- Ok, I won't push hard on this. To me checksum implies an integrity check - you have some data, and alongside it you have a provided checksum, and if you recompute the checksum and it doesn't match the provided checksum you have an integrity problem. It sort of applies here - we have our own view of the routing table, and the coordinator provides a "checksum", and if we don't produce the same value over our view then we don't trust the coordinator's binning (which kind of looks like a response to an integrity error if you squint). I felt that summary (i.e., a lossily compressed form that still has relevant info) was maybe a little more accurate, but I fully admit I'm bikeshedding. Carry on as you see fit.

     * Example 1:
     * Suppose we are resharding an index from 2 -> 4 shards. While splitting a bulk request, the coordinator observes
     * that the target shards are not ready for indexing. So requests that are meant for shards 0 and 2 are bundled together and
     * sent to shard 0 with "reshardSplitShardCountSummary" 2 in the request.
     * Requests that are meant for shards 1 and 3 are bundled together and
     * sent to shard 1 with "reshardSplitShardCountSummary" 2 in the request.
     *
     * Example 2:
     * Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes
     * that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not.
     * So the shard bulk request it sends to shards 0 and 4 has "reshardSplitShardCountSummary" 8,
     * while the shard bulk requests it sends to shards 1, 2, 3 have "reshardSplitShardCountSummary" 4.
     * Note that in this case no shard bulk request is sent to shards 5, 6, 7; the requests that were meant for these target shards
     * are bundled together with, and sent to, their source shards.
     *
     * A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with an INVALID
     * reshardSplitShardCountSummary will be treated as a summary mismatch on the source shard node.
     */
    protected final int reshardSplitShardCountSummary;
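The two examples in the javadoc above can be condensed into a small routing sketch. This is illustrative only, written against assumed names (Destination, sourceShardOf, route, and handoffCompleteBySource are not the PR's API); the real logic lives in BulkOperation and IndexMetadata:

    // Illustrative sketch, not the PR's code: where a request aimed at a given
    // shard is sent during a 2x split, and which summary value it carries.
    class ReshardRoutingSketch {
        record Destination(int shardId, int reshardSplitShardCountSummary) {}

        // Splits are always 2x, so each target shard maps back to exactly one source shard.
        static int sourceShardOf(int shard, int shardCountBefore) {
            return shard % shardCountBefore;
        }

        static Destination route(int shard, int shardCountBefore, int shardCountAfter, boolean[] handoffCompleteBySource) {
            int source = sourceShardOf(shard, shardCountBefore);
            if (handoffCompleteBySource[source]) {
                // Example 2: source shard 0 completed HANDOFF, so shards 0 and 4
                // each receive their own request with summary == shardCountAfter (8).
                return new Destination(shard, shardCountAfter);
            }
            // Example 1: targets not ready, so requests for shards 0 and 2 are
            // bundled and sent to shard 0 with summary == shardCountBefore (2).
            return new Destination(source, shardCountBefore);
        }
    }

For instance, route(2, 2, 4, new boolean[] { false, false }) yields Destination(0, 2), matching Example 1.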
    /**
     * The number of shard copies that must be active before proceeding with the replication action.
     */
@@ -61,6 +103,10 @@ public ReplicationRequest(StreamInput in) throws IOException {
    }

    public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
        this(shardId, 0, in);
    }

    public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary, StreamInput in) throws IOException {
        super(in);
        final boolean thinRead = shardId != null;
        if (thinRead) {
@@ -80,15 +126,32 @@ public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOEx
            index = in.readString();
        }
        routedBasedOnClusterVersion = in.readVLong();
        if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
            if (thinRead) {
                this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
            } else {
                this.reshardSplitShardCountSummary = in.readInt();
            }
        } else {
            this.reshardSplitShardCountSummary = 0;
        }
    }
    /**
     * Creates a new request with a resolved shard id
     */
    public ReplicationRequest(@Nullable ShardId shardId) {
        this(shardId, 0);
    }

    /**
     * Creates a new request with a resolved shard id and reshardSplitShardCountSummary
     */
    public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary) {
        this.index = shardId == null ? null : shardId.getIndexName();
        this.shardId = shardId;
        this.timeout = DEFAULT_TIMEOUT;
        this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
    }

    /**
@@ -137,6 +200,14 @@ public ShardId shardId() {
        return shardId;
    }

    /**
     * @return The effective shard count as seen by the coordinator when creating this request.
     *         Can be 0 if it has not yet been resolved.
     */
    public int reshardSplitShardCountSummary() {
        return reshardSplitShardCountSummary;
    }

    /**
     * Sets the number of shard copies that must be active before proceeding with the replication
     * operation. Defaults to {@link ActiveShardCount#DEFAULT}, which requires one shard copy
@@ -191,11 +262,14 @@ public void writeTo(StreamOutput out) throws IOException {
        out.writeTimeValue(timeout);
        out.writeString(index);
        out.writeVLong(routedBasedOnClusterVersion);
        if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
            out.writeInt(reshardSplitShardCountSummary);
        }
    }

    /**
     * Thin serialization that does not write {@link #shardId} and will only write {@link #index} if it is different from the index name in
     * {@link #shardId}. Since we do not write {@link #shardId}, we also do not write {@link #reshardSplitShardCountSummary}.
     */
    public void writeThin(StreamOutput out) throws IOException {
        super.writeTo(out);
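Since writeThin omits the summary along with the shardId, a container that serializes requests thinly has to re-supply both on the read side; that is what the thin-read constructor overload above is for. A hypothetical sketch of such a read path (SomeReplicationRequest is an assumed subclass, not code from this PR):

    // Hypothetical container read path: shardId and the summary were written
    // once at the container level, so each inner request can be read "thin".
    ShardId shardId = new ShardId(in);
    int summary = in.readInt();
    // The thin-read constructor injects both values instead of reading them from the stream:
    SomeReplicationRequest request = new SomeReplicationRequest(shardId, summary, in);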
@@ -459,14 +459,21 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
        try {
            final ClusterState clusterState = clusterService.state();
            final Index index = primaryShardReference.routingEntry().index();
-           final ProjectId projectId = clusterState.metadata().projectFor(index).id();
+           final ProjectMetadata project = clusterState.metadata().projectFor(index);
+           final ProjectId projectId = project.id();
+           final IndexMetadata indexMetadata = project.index(index);

[Inline review thread on the indexMetadata line]
- can this be null? I know we have a shard reference here but I'm not sure whether that ensures that an index isn't deleted in cluster state.
- Index delete/close are supposed to acquire all permits before proceeding. I think TransportVerifyShardBeforeCloseAction#acquirePrimaryOperationPermit is responsible for this?
- ok, thanks
            final ClusterBlockException blockException = blockExceptions(clusterState, projectId, index.getName());
            if (blockException != null) {
                logger.trace("cluster is blocked, action failed on primary", blockException);
                throw blockException;
            }

            int reshardSplitShardCountSummary = primaryRequest.getRequest().reshardSplitShardCountSummary();
            assert (reshardSplitShardCountSummary == 0
                || reshardSplitShardCountSummary == indexMetadata.getReshardSplitShardCountSummaryForIndexing(
                    primaryRequest.getRequest().shardId().getId()
                ));

            if (primaryShardReference.isRelocated()) {
                primaryShardReference.close(); // release shard operation lock as soon as possible
                setPhase(replicationTask, "primary_delegation");
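Per the field's javadoc, a value of 0 is INVALID and always counts as a mismatch. A hedged sketch of the comparison the source shard node performs (summaryMatches is an assumed name, not the PR's API):

    // Sketch only: does the coordinator's view of the resharding state match this node's view?
    static boolean summaryMatches(int requestSummary, int localSummary) {
        // 0 means the summary was never resolved (for example the request came from
        // an older node), so it is treated as a mismatch and the request must be
        // reprocessed against the current shard states.
        return requestSummary != 0 && requestSummary == localSummary;
    }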
@@ -1181,6 +1181,71 @@ public int getNumberOfShards() {
        return numberOfShards;
    }
    /**
     * This method is used in the context of the resharding feature.
     * Given a {@code shardId} and {@code minShardState}, i.e. the minimum target shard state required for
     * an operation to be routed to target shards,
     * this method returns the "effective" shard count as seen by this IndexMetadata.
     *
     * The reshardSplitShardCountSummary tells us whether the coordinator routed requests to the source shard only or
     * to both source and target shards. Requests are routed to both source and target shards
     * once the target shards are ready for an operation.
     *
     * The coordinator routes requests to source and target shards based on its cluster state view of the shards
     * undergoing a resharding operation. This method is used to populate a field in the shard-level requests sent to
     * source and target shards, as a proxy for the cluster state version. The same calculation is then done at the source shard
     * to verify whether the coordinator's and the source node's views of the resharding state match.
     * See {@link org.elasticsearch.action.support.replication.ReplicationRequest#reshardSplitShardCountSummary}
     * for a detailed description of how this value is used.
     *
     * @param shardId Input shardId for which we want to calculate the effective shard count
     * @param minShardState Minimum target shard state required for the target to be considered ready
     * @return Effective shard count as seen by an operation using this IndexMetadata
     */
    private int getReshardSplitShardCountSummary(int shardId, IndexReshardingState.Split.TargetShardState minShardState) {
        assert shardId >= 0 && shardId < getNumberOfShards() : "shardId is out of bounds";
        int shardCount = getNumberOfShards();
        if (reshardingMetadata != null) {
            if (reshardingMetadata.getSplit().isTargetShard(shardId)) {

[Inline review thread on the isTargetShard line]
- if this is being called for a target shard, does that mean that the coordinator has already determined that the target shard is ready?
- Yes. The reason is that we already have target state checks for routing requests at the coordinator level. The fact that a request is routed to the target shard means that it must be ready.
- I have added more details to the method description.
                int sourceShardId = reshardingMetadata.getSplit().sourceShard(shardId);
                // Requests cannot be routed to target shards until they are ready
                assert reshardingMetadata.getSplit().allTargetStatesAtLeast(sourceShardId, minShardState) : "unexpected target state";
                shardCount = reshardingMetadata.getSplit().shardCountAfter();
            } else if (reshardingMetadata.getSplit().isSourceShard(shardId)) {
                if (reshardingMetadata.getSplit().allTargetStatesAtLeast(shardId, minShardState)) {
                    shardCount = reshardingMetadata.getSplit().shardCountAfter();
                } else {
                    shardCount = reshardingMetadata.getSplit().shardCountBefore();
                }
            }
        }
        return shardCount;
    }
    /**
     * This method is used in the context of the resharding feature.
     * Given a {@code shardId}, this method returns the "effective" shard count
     * as seen by this IndexMetadata, for indexing operations.
     *
     * See {@code getReshardSplitShardCountSummary} for more details.
     * @param shardId Input shardId for which we want to calculate the effective shard count
     */
    public int getReshardSplitShardCountSummaryForIndexing(int shardId) {
        return getReshardSplitShardCountSummary(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF);
    }

    /**
     * This method is used in the context of the resharding feature.
     * Given a {@code shardId}, this method returns the "effective" shard count
     * as seen by this IndexMetadata, for search operations.
     *
     * See {@code getReshardSplitShardCountSummary} for more details.
     * @param shardId Input shardId for which we want to calculate the effective shard count
     */
    public int getReshardSplitShardCountSummaryForSearch(int shardId) {
        return getReshardSplitShardCountSummary(shardId, IndexReshardingState.Split.TargetShardState.SPLIT);
    }

    public int getNumberOfReplicas() {
        return numberOfReplicas;
    }
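A usage sketch under the semantics above: indexing readiness is gated on HANDOFF while search readiness is gated on SPLIT, so during a 2 -> 4 split whose target shards have reached HANDOFF but not SPLIT, the two getters disagree (the scenario and values are assumed for illustration):

    // Hypothetical scenario: index resharding 2 -> 4; shard 0's targets are at HANDOFF.
    static void summariesDuringSplit(IndexMetadata indexMetadata) {
        int forIndexing = indexMetadata.getReshardSplitShardCountSummaryForIndexing(0); // 4: targets accept writes
        int forSearch = indexMetadata.getReshardSplitShardCountSummaryForSearch(0);     // 2: targets not searchable yet
        assert forIndexing == 4 && forSearch == 2;
    }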
@@ -3022,7 +3087,7 @@ public IndexMetadata fromXContent(XContentParser parser) throws IOException {
     * Returns the number of shards that should be used for routing. This basically defines the hash space we use in
     * {@link IndexRouting#indexShard} to route documents
     * to shards based on their ID or their specific routing value. The default value is {@link #getNumberOfShards()}. This value only
-    * changes if and index is shrunk.
+    * changes if an index is shrunk.
     */
    public int getRoutingNumShards() {
        return routingNumShards;
@@ -3042,7 +3107,7 @@ public int getRoutingFactor() {
     * @param shardId the id of the target shard to split into
     * @param sourceIndexMetadata the source index metadata
     * @param numTargetShards the total number of shards in the target index
-    * @return a the source shard ID to split off from
+    * @return the source shard ID to split off from
     */
    public static ShardId selectSplitShard(int shardId, IndexMetadata sourceIndexMetadata, int numTargetShards) {
        int numSourceShards = sourceIndexMetadata.getNumberOfShards();
@@ -0,0 +1 @@
+9172000
@@ -1 +1 @@
-esql_fuse_linear_operator_status,9171000
+index_reshard_shardcount_summary,9172000
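The two one-line CSV changes above register the named transport version used throughout this PR: the first file defines index_reshard_shardcount_summary with id 9172000, and the second appears to bump the latest referable version from esql_fuse_linear_operator_status to it. The hunks in ReplicationRequest then look the name up once and gate the new field on the wire; condensed from the diff above:

    private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY =
        TransportVersion.fromName("index_reshard_shardcount_summary");

    // Write side: only serialize the new field to peers that support it.
    if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
        out.writeInt(reshardSplitShardCountSummary);
    }
    // Read side: older peers never send it, so the summary falls back to 0 (INVALID).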