Merged
Commits
55 commits
02ec564
Add shard count
ankikuma Aug 28, 2025
87528bc
Add reshard count
ankikuma Sep 2, 2025
865aff5
Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd…
ankikuma Sep 2, 2025
1b0e1b4
Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd…
ankikuma Sep 2, 2025
e3ddf06
Read reshardShardCount
ankikuma Sep 2, 2025
3ae25bf
Add test
ankikuma Sep 2, 2025
cab853d
[CI] Auto commit changes from spotless
Sep 2, 2025
418b4ff
Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd…
ankikuma Sep 3, 2025
5c496d0
Merge branch '08272025/ReshardAddShardCount' of github.com:ankikuma/e…
ankikuma Sep 3, 2025
84a96b8
change variable names
ankikuma Sep 3, 2025
c49ccb1
Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd…
ankikuma Sep 3, 2025
fcfdae2
[CI] Auto commit changes from spotless
Sep 3, 2025
4caafb6
Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd…
ankikuma Sep 3, 2025
3dc489d
Merge branch '08272025/ReshardAddShardCount' of github.com:ankikuma/e…
ankikuma Sep 3, 2025
eff86a7
serialize reshardAddShardCount on the wire
ankikuma Sep 12, 2025
112a8b2
refresh branch
ankikuma Sep 12, 2025
1a32024
Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd…
ankikuma Sep 12, 2025
cd83303
commit
ankikuma Sep 12, 2025
9ae8428
commit
ankikuma Sep 12, 2025
f07291d
Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd…
ankikuma Sep 12, 2025
b569c40
commit
ankikuma Sep 12, 2025
ade5904
Merge remote-tracking branch 'transport-version-resources-upstream/ma…
ankikuma Sep 12, 2025
ed33595
fix writeTo
ankikuma Sep 12, 2025
3b68cf5
[CI] Update transport version definitions
Sep 12, 2025
f9ca11d
serialization changewq
ankikuma Sep 15, 2025
bcb3c85
Merge branch '08272025/ReshardAddShardCount' of github.com:ankikuma/e…
ankikuma Sep 15, 2025
849dd5d
[CI] Update transport version definitions
Sep 15, 2025
8b0c5f0
serialize ReplicationRequest
ankikuma Sep 15, 2025
f689986
refresh
ankikuma Sep 15, 2025
18a2489
Merge branch '08272025/ReshardAddShardCount' of github.com:ankikuma/e…
ankikuma Sep 15, 2025
d1aa4ce
calls to super
ankikuma Sep 15, 2025
13d6538
calls to super
ankikuma Sep 15, 2025
4ea8aeb
fix bug
ankikuma Sep 15, 2025
88fe793
Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd…
ankikuma Sep 15, 2025
d165abb
Modify test
ankikuma Sep 16, 2025
15a7ae5
Replication Request change
ankikuma Sep 16, 2025
6034275
commit
ankikuma Sep 16, 2025
75998fa
[CI] Auto commit changes from spotless
Sep 16, 2025
761ac7d
address review comments
ankikuma Sep 17, 2025
f35138a
Merge branch '08272025/ReshardAddShardCount' of github.com:ankikuma/e…
ankikuma Sep 17, 2025
fddc0bf
transportversions
ankikuma Sep 18, 2025
ff82490
minor review changes
ankikuma Sep 19, 2025
2be982b
es
ankikuma Sep 19, 2025
59378c1
refresh
ankikuma Sep 19, 2025
6c8f204
commit
ankikuma Sep 22, 2025
7c20e53
tv changes
ankikuma Sep 22, 2025
9c8a8e7
commit
ankikuma Sep 22, 2025
3c2d7af
commit
ankikuma Sep 22, 2025
10dd203
Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd…
ankikuma Sep 22, 2025
028eb0d
commit
ankikuma Sep 22, 2025
cebb052
Rename reshardSplitShardCountChecksum to reshardSplitShardCountSummary
ankikuma Sep 25, 2025
eba523f
commit
ankikuma Sep 25, 2025
f8b8bf5
refresh
ankikuma Sep 25, 2025
f392eb5
refresh
ankikuma Sep 25, 2025
585cf2b
csv
ankikuma Sep 25, 2025
BulkOperation.java
@@ -401,13 +401,20 @@ private void executeBulkRequestsByShard(
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();

// Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest
var indexMetadata = project.index(shardId.getIndexName());
int reshardSplitShardCountChecksum = 0;
if (indexMetadata != null) {
reshardSplitShardCountChecksum = indexMetadata.getReshardSplitShardCountChecksumForIndexing(shardId.getId());
}
BulkShardRequest bulkShardRequest = new BulkShardRequest(
shardId,
reshardSplitShardCountChecksum,
bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[0]),
bulkRequest.isSimulated()
);
var indexMetadata = project.index(shardId.getIndexName());

if (indexMetadata != null && indexMetadata.getInferenceFields().isEmpty() == false) {
bulkShardRequest.setInferenceFieldMap(indexMetadata.getInferenceFields());
}
BulkShardRequest.java
@@ -50,12 +50,26 @@ public BulkShardRequest(StreamInput in) throws IOException {
}
}

public BulkShardRequest(ShardId shardId, int reshardSplitShardCountChecksum, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
this(shardId, reshardSplitShardCountChecksum, refreshPolicy, items, false);
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
this(shardId, refreshPolicy, items, false);
this(shardId, 0, refreshPolicy, items, false);
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated) {
super(shardId);
this(shardId, 0, refreshPolicy, items, isSimulated);
}

public BulkShardRequest(
ShardId shardId,
int reshardSplitShardCountChecksum,
RefreshPolicy refreshPolicy,
BulkItemRequest[] items,
boolean isSimulated
) {
super(shardId, reshardSplitShardCountChecksum);
this.items = items;
setRefreshPolicy(refreshPolicy);
this.isSimulated = isSimulated;
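The constructor chaining above keeps existing call sites source-compatible: the old signatures delegate with a checksum of 0, the INVALID value, while resharding-aware callers pass the value resolved from IndexMetadata. A minimal illustration, assuming the same imports as BulkShardRequest and that shardId, items and reshardSplitShardCountChecksum are in scope:

```java
// Pre-existing callers keep compiling and implicitly send checksum 0 (INVALID),
// which the source shard treats as a mismatch.
BulkShardRequest legacy = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

// Resharding-aware callers, such as BulkOperation above, pass the effective shard count
// computed from IndexMetadata.
BulkShardRequest reshardAware = new BulkShardRequest(
    shardId,
    reshardSplitShardCountChecksum,
    RefreshPolicy.NONE,
    items
);
```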
ReplicatedWriteRequest.java
@@ -47,6 +47,10 @@ public ReplicatedWriteRequest(@Nullable ShardId shardId) {
super(shardId);
}

public ReplicatedWriteRequest(@Nullable ShardId shardId, int reshardSplitShardCountChecksum) {
super(shardId, reshardSplitShardCountChecksum);
}

@Override
@SuppressWarnings("unchecked")
public R setRefreshPolicy(RefreshPolicy refreshPolicy) {
ReplicationRequest.java
@@ -9,6 +9,7 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.LegacyActionRequest;
@@ -37,6 +38,10 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
implements
IndicesRequest {

private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_CHECKSUM = TransportVersion.fromName(
"index_reshard_shardcount_checksum"
);

public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(1);

/**
@@ -49,6 +54,45 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
protected TimeValue timeout;
protected String index;

/**
* The reshardSplitShardCountChecksum has been added to accommodate the Resharding feature.
* This is populated when the coordinator is deciding which shards a request applies to.
* For example, {@link org.elasticsearch.action.bulk.BulkOperation} splits
* an incoming bulk request into shard level {@link org.elasticsearch.action.bulk.BulkShardRequest}
* based on its cluster state view of the number of shards that are ready for indexing.
* The purpose of this metadata is to reconcile the cluster state visible at the coordinating
* node with that visible at the source shard node (with respect to resharding).
* When an index is being split, there is a point in time when the newly created shard (target shard)
* takes over its portion of the document space from the original shard (source shard).
* Although the handoff is atomic on the original shard (source shard) and the new shards (target shards),
* there is a window of time between the coordinating node creating a shard request and the shard receiving and processing it.
* This field is used by the original shard (source shard) when it processes the request to detect whether
* the coordinator's view of the new shard's state when it created the request matches the shard's current state,
* or whether the request must be reprocessed taking into account the current shard states.
*
* Note that we are able to get away with a single number, instead of an array of target shard states,
* because we only allow splits in increments of 2x.
Contributor: I'm tempted to want concrete examples here. I know this makes for a long comment, but I think the field is unintuitive enough to warrant it.

Contributor Author: I agree that examples will be helpful. Its semantics really is confusing. Also, I was thinking if I rename the field in the ReplicationRequest to reshardSplitExpectedShardCount, will that be better?

Contributor: I think we referred to this value as a "checksum of resharding state" in the design document. I wonder if calling it some kind of checksum will resolve the naming dispute.

Contributor Author: Ya I like reshardSplitShardCountChecksum. Because otherwise it sounds like it's a count that you can actually use for something. It's really just a value to be used not as is, but in the context of reconciling cluster state between 2 different nodes.

Contributor: bikeshedding, how about "summary" instead of "checksum"?

Contributor Author: I kind of like checksum better but happy to change it ...

Contributor: Ok, I won't push hard on this. To me checksum implies an integrity check - you have some data, and alongside it you have a provided checksum, and if you recompute the checksum and it doesn't match the provided checksum you have an integrity problem. It sort of applies here - we have our own view of the routing table, and the coordinator provides a "checksum", and if we don't produce the same value over our view then we don't trust the coordinator's binning (which kind of looks like a response to an integrity error if you squint). I felt that summary (i.e., a lossily compressed form that still has relevant info) was maybe a little more accurate, but I fully admit I'm bikeshedding. Carry on as you see fit.

*
* Example 1:
* Suppose we are resharding an index from 2 -> 4 shards. While splitting a bulk request, the coordinator observes
* that target shards are not ready for indexing. So requests that are meant for shard 0 and 2 are bundled together,
* sent to shard 0 with “reshardSplitShardCountChecksum” 2 in the request.
* Requests that are meant for shard 1 and 3 are bundled together,
* sent to shard 1 with “reshardSplitShardCountChecksum” 2 in the request.
*
* Example 2:
* Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes
* that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not completed handoff.
* So, the shard-bulk-request it sends to shard 0 and 4 has the "reshardSplitShardCountChecksum" 8,
* while the shard-bulk-request it sends to shard 1,2,3 has the "reshardSplitShardCountChecksum" 4.
* Note that in this case no shard-bulk-request is sent to shards 5, 6, 7; the requests that were meant for these target shards
* are bundled together with, and sent to, their source shards.
*
* A value of 0 indicates an INVALID reshardSplitShardCountChecksum. Hence, a request with INVALID reshardSplitShardCountChecksum
* will be treated as a checksum mismatch on the source shard node.
*/
protected final int reshardSplitShardCountChecksum;

/**
* The number of shard copies that must be active before proceeding with the replication action.
*/
@@ -61,6 +105,10 @@ public ReplicationRequest(StreamInput in) throws IOException {
}

public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
this(shardId, 0, in);
}

public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountChecksum, StreamInput in) throws IOException {
super(in);
final boolean thinRead = shardId != null;
if (thinRead) {
@@ -80,15 +128,32 @@ public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOEx
index = in.readString();
}
routedBasedOnClusterVersion = in.readVLong();
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_CHECKSUM)) {
if (thinRead) {
this.reshardSplitShardCountChecksum = reshardSplitShardCountChecksum;
} else {
this.reshardSplitShardCountChecksum = in.readInt();
}
} else {
this.reshardSplitShardCountChecksum = 0;
}
}

/**
* Creates a new request with resolved shard id
*/
public ReplicationRequest(@Nullable ShardId shardId) {
this(shardId, 0);
}

/**
* Creates a new request with resolved shard id and reshardSplitShardCountChecksum
*/
public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountChecksum) {
this.index = shardId == null ? null : shardId.getIndexName();
this.shardId = shardId;
this.timeout = DEFAULT_TIMEOUT;
this.reshardSplitShardCountChecksum = reshardSplitShardCountChecksum;
}

/**
@@ -137,6 +202,14 @@ public ShardId shardId() {
return shardId;
}

/**
* @return the effective shard count as seen by the coordinator when creating this request;
* can be 0 if this has not yet been resolved.
*/
public int reshardSplitShardCountChecksum() {
return reshardSplitShardCountChecksum;
}

/**
* Sets the number of shard copies that must be active before proceeding with the replication
* operation. Defaults to {@link ActiveShardCount#DEFAULT}, which requires one shard copy
@@ -191,11 +264,14 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeTimeValue(timeout);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_CHECKSUM)) {
out.writeInt(reshardSplitShardCountChecksum);
}
}

/**
* Thin serialization that does not write {@link #shardId} and will only write {@link #index} if it is different from the index name in
* {@link #shardId}.
* {@link #shardId}. Since we do not write {@link #shardId}, we also do not write {@link #reshardSplitShardCountChecksum}.
*/
public void writeThin(StreamOutput out) throws IOException {
super.writeTo(out);
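To make the two Javadoc examples above concrete, here is a small, self-contained model of the value the coordinator attaches to each shard-level request. It is illustrative only and deliberately avoids Elasticsearch types; the method name and the per-source-shard readiness flags are invented for the example.

```java
public class ReshardSplitShardCountExample {

    /**
     * Simplified model of the effective shard count described above.
     * sourceReady[s] is true when all target shards split from source shard s have reached
     * the required state (HANDOFF for indexing).
     */
    static int effectiveShardCount(int shardId, int shardCountBefore, int shardCountAfter, boolean[] sourceReady) {
        if (shardId >= shardCountBefore) {
            // Target shard: requests are only routed here once its source shard has handed off.
            return shardCountAfter;
        }
        return sourceReady[shardId] ? shardCountAfter : shardCountBefore;
    }

    public static void main(String[] args) {
        // Example 1: resharding 2 -> 4, target shards not yet ready for indexing.
        boolean[] noneReady = { false, false };
        assert effectiveShardCount(0, 2, 4, noneReady) == 2; // requests for shards 0 and 2 go to shard 0
        assert effectiveShardCount(1, 2, 4, noneReady) == 2; // requests for shards 1 and 3 go to shard 1

        // Example 2: resharding 4 -> 8, only source shard 0 has completed HANDOFF.
        boolean[] onlyShardZeroReady = { true, false, false, false };
        assert effectiveShardCount(0, 4, 8, onlyShardZeroReady) == 8; // shard 0 ...
        assert effectiveShardCount(4, 4, 8, onlyShardZeroReady) == 8; // ... and its target, shard 4
        assert effectiveShardCount(1, 4, 8, onlyShardZeroReady) == 4; // shards 1, 2 and 3 keep the old count
    }
}
```

Run with assertions enabled (java -ea) to check the expectations.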
TransportReplicationAction.java
@@ -459,14 +459,21 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
try {
final ClusterState clusterState = clusterService.state();
final Index index = primaryShardReference.routingEntry().index();
final ProjectId projectId = clusterState.metadata().projectFor(index).id();
final ProjectMetadata project = clusterState.metadata().projectFor(index);
final ProjectId projectId = project.id();
final IndexMetadata indexMetadata = project.index(index);
Contributor: can this be null? I know we have a shard reference here but I'm not sure whether that ensures that an index isn't deleted in cluster state.

Contributor Author: Index delete/close are supposed to acquire all permits before proceeding. I think TransportVerifyShardBeforeCloseAction#acquirePrimaryOperationPermit is responsible for this?

Contributor: ok, thanks

final ClusterBlockException blockException = blockExceptions(clusterState, projectId, index.getName());
if (blockException != null) {
logger.trace("cluster is blocked, action failed on primary", blockException);
throw blockException;
}

int reshardSplitShardCountChecksum = primaryRequest.getRequest().reshardSplitShardCountChecksum();
assert (reshardSplitShardCountChecksum == 0
    || reshardSplitShardCountChecksum == indexMetadata.getReshardSplitShardCountChecksumForIndexing(
        primaryRequest.getRequest().shardId().getId()
    ));

Contributor: Is this being executed on the source shard? If so, could this assertion fire when the coordinator's checksum is stale vs the source?

if (primaryShardReference.isRelocated()) {
primaryShardReference.close(); // release shard operation lock as soon as possible
setPhase(replicationTask, "primary_delegation");
IndexMetadata.java
@@ -1181,6 +1181,71 @@ public int getNumberOfShards() {
return numberOfShards;
}

/**
* This method is used in the context of the resharding feature.
* Given a {@code shardId} and {@code minShardState} i.e. the minimum target shard state required for
* an operation to be routed to target shards,
* this method returns the "effective" shard count as seen by this IndexMetadata.
*
* The reshardSplitShardCountChecksum tells us whether the coordinator routed requests to the source shard or
* to both source and target shards. Requests are routed to both source and target shards
* once the target shards are ready for an operation.
*
* The coordinator routes requests to source and target shards, based on its cluster state view of the state of shards
* undergoing a resharding operation. This method is used to populate a field in the shard level requests sent to
* source and target shards, as a proxy for the cluster state version. The same calculation is then done at the source shard
* to verify if the coordinator and source node's view of the resharding state have a mismatch.
* See {@link org.elasticsearch.action.support.replication.ReplicationRequest#reshardSplitShardCountChecksum}
* for a detailed description of how this value is used.
*
* @param shardId Input shardId for which we want to calculate the effective shard count
* @param minShardState Minimum target shard state required for the target to be considered ready
* @return Effective shard count as seen by an operation using this IndexMetadata
*/
private int getReshardSplitShardCountChecksum(int shardId, IndexReshardingState.Split.TargetShardState minShardState) {
assert shardId >= 0 && shardId < getNumberOfShards() : "shardId is out of bounds";
int shardCount = getNumberOfShards();
if (reshardingMetadata != null) {
if (reshardingMetadata.getSplit().isTargetShard(shardId)) {
Contributor: if this is being called for a target shard, does that mean that the coordinator has already determined that the target shard is ready?

Contributor Author (ankikuma, Sep 17, 2025): Yes. The reason is that we already have target state checks for routing requests at the coordinator level. The fact that a request is routed to the target shard means that it must be ready.

Contributor Author: I have added more details to the method description.

int sourceShardId = reshardingMetadata.getSplit().sourceShard(shardId);
// Requests cannot be routed to target shards until they are ready
assert reshardingMetadata.getSplit().allTargetStatesAtLeast(sourceShardId, minShardState) : "unexpected target state";
shardCount = reshardingMetadata.getSplit().shardCountAfter();
} else if (reshardingMetadata.getSplit().isSourceShard(shardId)) {
if (reshardingMetadata.getSplit().allTargetStatesAtLeast(shardId, minShardState)) {
shardCount = reshardingMetadata.getSplit().shardCountAfter();
} else {
shardCount = reshardingMetadata.getSplit().shardCountBefore();
}
}
}
return shardCount;
}

/**
* This method is used in the context of the resharding feature.
* Given a {@code shardId}, this method returns the "effective" shard count
* as seen by this IndexMetadata, for indexing operations.
*
* See {@code getReshardSplitShardCountChecksum} for more details.
* @param shardId Input shardId for which we want to calculate the effective shard count
*/
public int getReshardSplitShardCountChecksumForIndexing(int shardId) {
return (getReshardSplitShardCountChecksum(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF));
Contributor: Small nit but seems like there's extra parentheses? Also for getReshardSplitShardCountChecksumForSearch

Contributor Author: Yup, will fix it.

}

/**
* This method is used in the context of the resharding feature.
* Given a {@code shardId}, this method returns the "effective" shard count
* as seen by this IndexMetadata, for search operations.
*
* See {@code getReshardSplitShardCountChecksum} for more details.
* @param shardId Input shardId for which we want to calculate the effective shard count
*/
public int getReshardSplitShardCountChecksumForSearch(int shardId) {
return (getReshardSplitShardCountChecksum(shardId, IndexReshardingState.Split.TargetShardState.SPLIT));
}

public int getNumberOfReplicas() {
return numberOfReplicas;
}
@@ -3022,7 +3087,7 @@ public IndexMetadata fromXContent(XContentParser parser) throws IOException {
* Returns the number of shards that should be used for routing. This basically defines the hash space we use in
* {@link IndexRouting#indexShard} to route documents
* to shards based on their ID or their specific routing value. The default value is {@link #getNumberOfShards()}. This value only
* changes if and index is shrunk.
* changes if an index is shrunk.
*/
public int getRoutingNumShards() {
return routingNumShards;
@@ -3042,7 +3107,7 @@ public int getRoutingFactor() {
* @param shardId the id of the target shard to split into
* @param sourceIndexMetadata the source index metadata
* @param numTargetShards the total number of shards in the target index
* @return a the source shard ID to split off from
* @return the source shard ID to split off from
*/
public static ShardId selectSplitShard(int shardId, IndexMetadata sourceIndexMetadata, int numTargetShards) {
int numSourceShards = sourceIndexMetadata.getNumberOfShards();
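For context, a sketch of the reconciliation that the new getReshardSplitShardCountChecksumFor* accessors enable on the source shard node. This is not code from the PR, which at this point only asserts equality in TransportReplicationAction; it is an illustration of the intended check, assuming the usual Elasticsearch imports, and the helper name is invented.

```java
// Hypothetical helper: does the coordinator's view (carried in the request) match the
// source node's current view of the resharding state for this shard?
static boolean coordinatorViewMatches(IndexMetadata indexMetadata, ReplicationRequest<?> request) {
    int fromCoordinator = request.reshardSplitShardCountChecksum();
    int local = indexMetadata.getReshardSplitShardCountChecksumForIndexing(request.shardId().getId());
    // 0 is the INVALID value, so it is always treated as a mismatch.
    return fromCoordinator != 0 && fromCoordinator == local;
}
```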
IndexReshardingState.java
@@ -378,6 +378,16 @@ public boolean targetStateAtLeast(int shardNum, TargetShardState targetShardStat
return getTargetShardState(shardNum).ordinal() >= targetShardState.ordinal();
}

public boolean allTargetStatesAtLeast(int sourceShardId, TargetShardState targetShardState) {
var targets = getTargetStatesFor(sourceShardId);
for (TargetShardState state : targets) {
if (state.ordinal() < targetShardState.ordinal()) {
return false;
}
}
return true;
}

public Stream<TargetShardState> targetStates() {
return Arrays.stream(targetShards);
}
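As a side note, the new allTargetStatesAtLeast loop above could equally be written with a stream, in the style of the existing targetStates() helper. This sketch assumes getTargetStatesFor returns an array of TargetShardState, which the diff does not confirm; it is an alternative formulation, not a requested change.

```java
public boolean allTargetStatesAtLeast(int sourceShardId, TargetShardState targetShardState) {
    // "At least" is defined by enum ordinal, exactly as in targetStateAtLeast above.
    return Arrays.stream(getTargetStatesFor(sourceShardId))
        .allMatch(state -> state.ordinal() >= targetShardState.ordinal());
}
```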
IndexRouting.java
@@ -74,6 +74,7 @@ public static IndexRouting fromIndexMetadata(IndexMetadata metadata) {
protected final String indexName;
private final int routingNumShards;
private final int routingFactor;
@Nullable
private final IndexReshardingMetadata indexReshardingMetadata;

private IndexRouting(IndexMetadata metadata) {
New transport version definition (index_reshard_shardcount_checksum)
@@ -0,0 +1 @@
9168000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
@@ -1 +1 @@
index_request_include_tsid,9167000
index_reshard_shardcount_checksum,9168000