Skip to content

Commit f38f4ad

Browse files
authored
Reshard add shard count (#133985)
Add a new field to ReplicationRequest to be used once the resharding feature is enabled. This is the reshardSplitShardCountSummary - the effective shard count as seen by the coordinating node.
1 parent 5aebcce commit f38f4ad

File tree

11 files changed

+279
-8
lines changed

11 files changed

+279
-8
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,13 +401,20 @@ private void executeBulkRequestsByShard(
401401
final ShardId shardId = entry.getKey();
402402
final List<BulkItemRequest> requests = entry.getValue();
403403

404+
// Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest
405+
var indexMetadata = project.index(shardId.getIndexName());
406+
int reshardSplitShardCountSummary = 0;
407+
if (indexMetadata != null) {
408+
reshardSplitShardCountSummary = indexMetadata.getReshardSplitShardCountSummaryForIndexing(shardId.getId());
409+
}
404410
BulkShardRequest bulkShardRequest = new BulkShardRequest(
405411
shardId,
412+
reshardSplitShardCountSummary,
406413
bulkRequest.getRefreshPolicy(),
407414
requests.toArray(new BulkItemRequest[0]),
408415
bulkRequest.isSimulated()
409416
);
410-
var indexMetadata = project.index(shardId.getIndexName());
417+
411418
if (indexMetadata != null && indexMetadata.getInferenceFields().isEmpty() == false) {
412419
bulkShardRequest.setInferenceFieldMap(indexMetadata.getInferenceFields());
413420
}

server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,26 @@ public BulkShardRequest(StreamInput in) throws IOException {
5050
}
5151
}
5252

53+
public BulkShardRequest(ShardId shardId, int reshardSplitShardCountSummary, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
    // Convenience overload: forwards to the five-argument constructor with isSimulated = false.
    this(shardId, reshardSplitShardCountSummary, refreshPolicy, items, /* isSimulated */ false);
}
56+
5357
public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
54-
this(shardId, refreshPolicy, items, false);
58+
this(shardId, 0, refreshPolicy, items, false);
5559
}
5660

5761
public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated) {
58-
super(shardId);
62+
this(shardId, 0, refreshPolicy, items, isSimulated);
63+
}
64+
65+
public BulkShardRequest(
66+
ShardId shardId,
67+
int reshardSplitShardCountSummary,
68+
RefreshPolicy refreshPolicy,
69+
BulkItemRequest[] items,
70+
boolean isSimulated
71+
) {
72+
super(shardId, reshardSplitShardCountSummary);
5973
this.items = items;
6074
setRefreshPolicy(refreshPolicy);
6175
this.isSimulated = isSimulated;

server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ public ReplicatedWriteRequest(@Nullable ShardId shardId) {
4747
super(shardId);
4848
}
4949

50+
public ReplicatedWriteRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary) {
    // Both arguments are handed to the parent constructor unchanged.
    super(shardId, reshardSplitShardCountSummary);
}
53+
5054
@Override
5155
@SuppressWarnings("unchecked")
5256
public R setRefreshPolicy(RefreshPolicy refreshPolicy) {

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.support.replication;
1111

12+
import org.elasticsearch.TransportVersion;
1213
import org.elasticsearch.action.ActionRequestValidationException;
1314
import org.elasticsearch.action.IndicesRequest;
1415
import org.elasticsearch.action.LegacyActionRequest;
@@ -37,6 +38,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
3738
implements
3839
IndicesRequest {
3940

41+
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");
42+
4043
public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(1);
4144

4245
/**
@@ -49,6 +52,45 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
4952
protected TimeValue timeout;
5053
protected String index;
5154

55+
/**
56+
* The reshardSplitShardCountSummary has been added to accommodate the Resharding feature.
57+
* This is populated when the coordinator is deciding which shards a request applies to.
58+
* For example, {@link org.elasticsearch.action.bulk.BulkOperation} splits
59+
* an incoming bulk request into shard level {@link org.elasticsearch.action.bulk.BulkShardRequest}
60+
* based on its cluster state view of the number of shards that are ready for indexing.
61+
* The purpose of this metadata is to reconcile the cluster state visible at the coordinating
62+
* node with that visible at the source shard node (w.r.t. resharding).
63+
* When an index is being split, there is a point in time when the newly created shard (target shard)
64+
* takes over its portion of the document space from the original shard (source shard).
65+
* Although the handoff is atomic at the original (source shard) and new shards (target shard),
66+
* there is a window of time between the coordinating node creating a shard request and the shard receiving and processing it.
67+
* This field is used by the original shard (source shard) when it processes the request to detect whether
68+
* the coordinator's view of the new shard's state when it created the request matches the shard's current state,
69+
* or whether the request must be reprocessed taking into account the current shard states.
70+
*
71+
* Note that we are able to get away with a single number, instead of an array of target shard states,
72+
* because we only allow splits in increments of 2x.
73+
*
74+
* Example 1:
75+
* Suppose we are resharding an index from 2 -> 4 shards. While splitting a bulk request, the coordinator observes
76+
* that target shards are not ready for indexing. So requests that are meant for shard 0 and 2 are bundled together,
77+
* sent to shard 0 with "reshardSplitShardCountSummary" 2 in the request.
78+
* Requests that are meant for shard 1 and 3 are bundled together,
79+
* sent to shard 1 with "reshardSplitShardCountSummary" 2 in the request.
80+
*
81+
* Example 2:
82+
* Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes
83+
* that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not completed handoff.
84+
* So, the shard-bulk-request it sends to shard 0 and 4 has the "reshardSplitShardCountSummary" 8,
85+
* while the shard-bulk-request it sends to shard 1,2,3 has the "reshardSplitShardCountSummary" 4.
86+
* Note that in this case no shard-bulk-request is sent to shards 5, 6, 7 and the requests that were meant for these target shards
87+
* are bundled together with and sent to their source shards.
88+
*
89+
* A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with INVALID reshardSplitShardCountSummary
90+
* will be treated as a Summary mismatch on the source shard node.
91+
*/
92+
protected final int reshardSplitShardCountSummary;
93+
5294
/**
5395
* The number of shard copies that must be active before proceeding with the replication action.
5496
*/
@@ -61,6 +103,10 @@ public ReplicationRequest(StreamInput in) throws IOException {
61103
}
62104

63105
public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
106+
this(shardId, 0, in);
107+
}
108+
109+
public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary, StreamInput in) throws IOException {
64110
super(in);
65111
final boolean thinRead = shardId != null;
66112
if (thinRead) {
@@ -80,15 +126,32 @@ public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOEx
80126
index = in.readString();
81127
}
82128
routedBasedOnClusterVersion = in.readVLong();
129+
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
130+
if (thinRead) {
131+
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
132+
} else {
133+
this.reshardSplitShardCountSummary = in.readInt();
134+
}
135+
} else {
136+
this.reshardSplitShardCountSummary = 0;
137+
}
83138
}
84139

85140
/**
86141
* Creates a new request with resolved shard id
87142
*/
88143
public ReplicationRequest(@Nullable ShardId shardId) {
    // No shard-count summary is supplied by this overload, so 0 is passed on.
    this(shardId, 0);
}
146+
147+
/**
148+
* Creates a new request with resolved shard id and reshardSplitShardCountSummary
149+
*/
150+
public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary) {
    this.shardId = shardId;
    this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
    this.timeout = DEFAULT_TIMEOUT;
    // The index name is taken from the shard id when one is supplied; otherwise it is left null.
    if (shardId != null) {
        this.index = shardId.getIndexName();
    } else {
        this.index = null;
    }
}
93156

94157
/**
@@ -137,6 +200,14 @@ public ShardId shardId() {
137200
return shardId;
138201
}
139202

203+
/**
204+
* @return The effective shard count as seen by the coordinator when creating this request.
205+
* Can be 0 if this has not yet been resolved.
206+
*/
207+
public int reshardSplitShardCountSummary() {
208+
return reshardSplitShardCountSummary;
209+
}
210+
140211
/**
141212
* Sets the number of shard copies that must be active before proceeding with the replication
142213
* operation. Defaults to {@link ActiveShardCount#DEFAULT}, which requires one shard copy
@@ -191,11 +262,14 @@ public void writeTo(StreamOutput out) throws IOException {
191262
out.writeTimeValue(timeout);
192263
out.writeString(index);
193264
out.writeVLong(routedBasedOnClusterVersion);
265+
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
266+
out.writeInt(reshardSplitShardCountSummary);
267+
}
194268
}
195269

196270
/**
197271
* Thin serialization that does not write {@link #shardId} and will only write {@link #index} if it is different from the index name in
198-
* {@link #shardId}.
272+
* {@link #shardId}. Since we do not write {@link #shardId}, we also do not write {@link #reshardSplitShardCountSummary}.
199273
*/
200274
public void writeThin(StreamOutput out) throws IOException {
201275
super.writeTo(out);

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,14 +459,21 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
459459
try {
460460
final ClusterState clusterState = clusterService.state();
461461
final Index index = primaryShardReference.routingEntry().index();
462-
final ProjectId projectId = clusterState.metadata().projectFor(index).id();
462+
final ProjectMetadata project = clusterState.metadata().projectFor(index);
463+
final ProjectId projectId = project.id();
464+
final IndexMetadata indexMetadata = project.index(index);
463465

464466
final ClusterBlockException blockException = blockExceptions(clusterState, projectId, index.getName());
465467
if (blockException != null) {
466468
logger.trace("cluster is blocked, action failed on primary", blockException);
467469
throw blockException;
468470
}
469471

472+
int reshardSplitShardCountSummary = primaryRequest.getRequest().reshardSplitShardCountSummary();
473+
assert (reshardSplitShardCountSummary == 0
474+
|| reshardSplitShardCountSummary == indexMetadata.getReshardSplitShardCountSummaryForIndexing(
475+
primaryRequest.getRequest().shardId().getId()
476+
));
470477
if (primaryShardReference.isRelocated()) {
471478
primaryShardReference.close(); // release shard operation lock as soon as possible
472479
setPhase(replicationTask, "primary_delegation");

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,6 +1181,71 @@ public int getNumberOfShards() {
11811181
return numberOfShards;
11821182
}
11831183

1184+
/**
1185+
* This method is used in the context of the resharding feature.
1186+
* Given a {@code shardId} and {@code minShardState} i.e. the minimum target shard state required for
1187+
* an operation to be routed to target shards,
1188+
* this method returns the "effective" shard count as seen by this IndexMetadata.
1189+
*
1190+
* The reshardSplitShardCountSummary tells us whether the coordinator routed requests to the source shard or
1191+
* to both source and target shards. Requests are routed to both source and target shards
1192+
* once the target shards are ready for an operation.
1193+
*
1194+
* The coordinator routes requests to source and target shards, based on its cluster state view of the state of shards
1195+
* undergoing a resharding operation. This method is used to populate a field in the shard level requests sent to
1196+
* source and target shards, as a proxy for the cluster state version. The same calculation is then done at the source shard
1197+
* to verify if the coordinator and source node's view of the resharding state have a mismatch.
1198+
* See {@link org.elasticsearch.action.support.replication.ReplicationRequest#reshardSplitShardCountSummary}
1199+
* for a detailed description of how this value is used.
1200+
*
1201+
* @param shardId Input shardId for which we want to calculate the effective shard count
1202+
* @param minShardState Minimum target shard state required for the target to be considered ready
1203+
* @return Effective shard count as seen by an operation using this IndexMetadata
1204+
*/
1205+
private int getReshardSplitShardCountSummary(int shardId, IndexReshardingState.Split.TargetShardState minShardState) {
1206+
assert shardId >= 0 && shardId < getNumberOfShards() : "shardId is out of bounds";
1207+
int shardCount = getNumberOfShards();
1208+
if (reshardingMetadata != null) {
1209+
if (reshardingMetadata.getSplit().isTargetShard(shardId)) {
1210+
int sourceShardId = reshardingMetadata.getSplit().sourceShard(shardId);
1211+
// Requests cannot be routed to target shards until they are ready
1212+
assert reshardingMetadata.getSplit().allTargetStatesAtLeast(sourceShardId, minShardState) : "unexpected target state";
1213+
shardCount = reshardingMetadata.getSplit().shardCountAfter();
1214+
} else if (reshardingMetadata.getSplit().isSourceShard(shardId)) {
1215+
if (reshardingMetadata.getSplit().allTargetStatesAtLeast(shardId, minShardState)) {
1216+
shardCount = reshardingMetadata.getSplit().shardCountAfter();
1217+
} else {
1218+
shardCount = reshardingMetadata.getSplit().shardCountBefore();
1219+
}
1220+
}
1221+
}
1222+
return shardCount;
1223+
}
1224+
1225+
/**
1226+
* This method is used in the context of the resharding feature.
1227+
* Given a {@code shardId}, this method returns the "effective" shard count
1228+
* as seen by this IndexMetadata, for indexing operations.
1229+
*
1230+
* See {@code getReshardSplitShardCountSummary} for more details.
1231+
* @param shardId Input shardId for which we want to calculate the effective shard count
1232+
*/
1233+
public int getReshardSplitShardCountSummaryForIndexing(int shardId) {
1234+
return (getReshardSplitShardCountSummary(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF));
1235+
}
1236+
1237+
/**
1238+
* This method is used in the context of the resharding feature.
1239+
* Given a {@code shardId}, this method returns the "effective" shard count
1240+
* as seen by this IndexMetadata, for search operations.
1241+
*
1242+
* See {@code getReshardSplitShardCountSummary} for more details.
1243+
* @param shardId Input shardId for which we want to calculate the effective shard count
1244+
*/
1245+
public int getReshardSplitShardCountSummaryForSearch(int shardId) {
1246+
return (getReshardSplitShardCountSummary(shardId, IndexReshardingState.Split.TargetShardState.SPLIT));
1247+
}
1248+
11841249
public int getNumberOfReplicas() {
11851250
return numberOfReplicas;
11861251
}
@@ -3022,7 +3087,7 @@ public IndexMetadata fromXContent(XContentParser parser) throws IOException {
30223087
* Returns the number of shards that should be used for routing. This basically defines the hash space we use in
30233088
* {@link IndexRouting#indexShard} to route documents
30243089
* to shards based on their ID or their specific routing value. The default value is {@link #getNumberOfShards()}. This value only
3025-
* changes if and index is shrunk.
3090+
* changes if an index is shrunk.
30263091
*/
30273092
public int getRoutingNumShards() {
30283093
return routingNumShards;
@@ -3042,7 +3107,7 @@ public int getRoutingFactor() {
30423107
* @param shardId the id of the target shard to split into
30433108
* @param sourceIndexMetadata the source index metadata
30443109
* @param numTargetShards the total number of shards in the target index
3045-
* @return a the source shard ID to split off from
3110+
* @return the source shard ID to split off from
30463111
*/
30473112
public static ShardId selectSplitShard(int shardId, IndexMetadata sourceIndexMetadata, int numTargetShards) {
30483113
int numSourceShards = sourceIndexMetadata.getNumberOfShards();

server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,16 @@ public boolean targetStateAtLeast(int shardNum, TargetShardState targetShardStat
378378
return getTargetShardState(shardNum).ordinal() >= targetShardState.ordinal();
379379
}
380380

381+
public boolean allTargetStatesAtLeast(int sourceShardId, TargetShardState targetShardState) {
382+
var targets = getTargetStatesFor(sourceShardId);
383+
for (TargetShardState state : targets) {
384+
if (state.ordinal() < targetShardState.ordinal()) {
385+
return false;
386+
}
387+
}
388+
return true;
389+
}
390+
381391
public Stream<TargetShardState> targetStates() {
382392
return Arrays.stream(targetShards);
383393
}

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public static IndexRouting fromIndexMetadata(IndexMetadata metadata) {
7272
protected final String indexName;
7373
private final int routingNumShards;
7474
private final int routingFactor;
75+
@Nullable
7576
private final IndexReshardingMetadata indexReshardingMetadata;
7677

7778
private IndexRouting(IndexMetadata metadata) {
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9172000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
esql_fuse_linear_operator_status,9171000
1+
index_reshard_shardcount_summary,9172000

0 commit comments

Comments
 (0)