Merged

Commits (55 total; the diff below shows changes from 38 commits):
02ec564 - Add shard count (ankikuma, Aug 28, 2025)
87528bc - Add reshard count (ankikuma, Sep 2, 2025)
865aff5 - Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd… (ankikuma, Sep 2, 2025)
1b0e1b4 - Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd… (ankikuma, Sep 2, 2025)
e3ddf06 - Read reshardShardCount (ankikuma, Sep 2, 2025)
3ae25bf - Add test (ankikuma, Sep 2, 2025)
cab853d - [CI] Auto commit changes from spotless (Sep 2, 2025)
418b4ff - Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd… (ankikuma, Sep 3, 2025)
5c496d0 - Merge branch '08272025/ReshardAddShardCount' of github.com:ankikuma/e… (ankikuma, Sep 3, 2025)
84a96b8 - change variable names (ankikuma, Sep 3, 2025)
c49ccb1 - Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd… (ankikuma, Sep 3, 2025)
fcfdae2 - [CI] Auto commit changes from spotless (Sep 3, 2025)
4caafb6 - Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd… (ankikuma, Sep 3, 2025)
3dc489d - Merge branch '08272025/ReshardAddShardCount' of github.com:ankikuma/e… (ankikuma, Sep 3, 2025)
eff86a7 - serialize reshardAddShardCount on the wire (ankikuma, Sep 12, 2025)
112a8b2 - refresh branch (ankikuma, Sep 12, 2025)
1a32024 - Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd… (ankikuma, Sep 12, 2025)
cd83303 - commit (ankikuma, Sep 12, 2025)
9ae8428 - commit (ankikuma, Sep 12, 2025)
f07291d - Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd… (ankikuma, Sep 12, 2025)
b569c40 - commit (ankikuma, Sep 12, 2025)
ade5904 - Merge remote-tracking branch 'transport-version-resources-upstream/ma… (ankikuma, Sep 12, 2025)
ed33595 - fix writeTo (ankikuma, Sep 12, 2025)
3b68cf5 - [CI] Update transport version definitions (Sep 12, 2025)
f9ca11d - serialization changewq (ankikuma, Sep 15, 2025)
bcb3c85 - Merge branch '08272025/ReshardAddShardCount' of github.com:ankikuma/e… (ankikuma, Sep 15, 2025)
849dd5d - [CI] Update transport version definitions (Sep 15, 2025)
8b0c5f0 - serialize ReplicationRequest (ankikuma, Sep 15, 2025)
f689986 - refresh (ankikuma, Sep 15, 2025)
18a2489 - Merge branch '08272025/ReshardAddShardCount' of github.com:ankikuma/e… (ankikuma, Sep 15, 2025)
d1aa4ce - calls to super (ankikuma, Sep 15, 2025)
13d6538 - calls to super (ankikuma, Sep 15, 2025)
4ea8aeb - fix bug (ankikuma, Sep 15, 2025)
88fe793 - Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd… (ankikuma, Sep 15, 2025)
d165abb - Modify test (ankikuma, Sep 16, 2025)
15a7ae5 - Replication Request change (ankikuma, Sep 16, 2025)
6034275 - commit (ankikuma, Sep 16, 2025)
75998fa - [CI] Auto commit changes from spotless (Sep 16, 2025)
761ac7d - address review comments (ankikuma, Sep 17, 2025)
f35138a - Merge branch '08272025/ReshardAddShardCount' of github.com:ankikuma/e… (ankikuma, Sep 17, 2025)
fddc0bf - transportversions (ankikuma, Sep 18, 2025)
ff82490 - minor review changes (ankikuma, Sep 19, 2025)
2be982b - es (ankikuma, Sep 19, 2025)
59378c1 - refresh (ankikuma, Sep 19, 2025)
6c8f204 - commit (ankikuma, Sep 22, 2025)
7c20e53 - tv changes (ankikuma, Sep 22, 2025)
9c8a8e7 - commit (ankikuma, Sep 22, 2025)
3c2d7af - commit (ankikuma, Sep 22, 2025)
10dd203 - Merge remote-tracking branch 'upstream/main' into 08272025/ReshardAdd… (ankikuma, Sep 22, 2025)
028eb0d - commit (ankikuma, Sep 22, 2025)
cebb052 - Rename reshardSplitShardCountChecksum to reshardSplitShardCountSummary (ankikuma, Sep 25, 2025)
eba523f - commit (ankikuma, Sep 25, 2025)
f8b8bf5 - refresh (ankikuma, Sep 25, 2025)
f392eb5 - refresh (ankikuma, Sep 25, 2025)
585cf2b - csv (ankikuma, Sep 25, 2025)
File: TransportVersions.java
@@ -324,6 +324,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_SOURCE = def(9_158_0_00);
public static final TransportVersion MAX_HEAP_SIZE_PER_NODE_IN_CLUSTER_INFO = def(9_159_0_00);
public static final TransportVersion TIMESERIES_DEFAULT_LIMIT = def(9_160_0_00);
public static final TransportVersion INDEX_RESHARD_SHARDCOUNT_REPLICATION_REQUEST = def(9_161_0_00);

/*
* STOP! READ THIS FIRST! No, really,

File: BulkOperation.java
@@ -37,6 +37,7 @@
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexReshardingState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
@@ -401,13 +402,20 @@ private void executeBulkRequestsByShard(
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();

// Get the effective shard count for this shardId (requiring target shards to be at least in HANDOFF) and pass it to the new BulkShardRequest
var indexMetadata = project.index(shardId.getIndexName());
int reshardSplitShardCount = indexMetadata.getReshardSplitShardCount(
shardId.getId(),
IndexReshardingState.Split.TargetShardState.HANDOFF
);
BulkShardRequest bulkShardRequest = new BulkShardRequest(
shardId,
reshardSplitShardCount,
bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[0]),
bulkRequest.isSimulated()
);
var indexMetadata = project.index(shardId.getIndexName());

if (indexMetadata != null && indexMetadata.getInferenceFields().isEmpty() == false) {
bulkShardRequest.setInferenceFieldMap(indexMetadata.getInferenceFields());
}

File: BulkShardRequest.java
@@ -50,12 +50,26 @@ public BulkShardRequest(StreamInput in) throws IOException {
}
}
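
// The constructors below funnel into the variant that carries reshardSplitShardCount; callers that
// have not resolved it pass 0, meaning the coordinator did not stamp a value on this request.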

public BulkShardRequest(ShardId shardId, int reshardSplitShardCount, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
this(shardId, reshardSplitShardCount, refreshPolicy, items, false);
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
this(shardId, refreshPolicy, items, false);
this(shardId, 0, refreshPolicy, items, false);
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated) {
super(shardId);
this(shardId, 0, refreshPolicy, items, isSimulated);
}

public BulkShardRequest(
ShardId shardId,
int reshardSplitShardCount,
RefreshPolicy refreshPolicy,
BulkItemRequest[] items,
boolean isSimulated
) {
super(shardId, reshardSplitShardCount);
this.items = items;
setRefreshPolicy(refreshPolicy);
this.isSimulated = isSimulated;

File: ReplicatedWriteRequest.java
@@ -47,6 +47,10 @@ public ReplicatedWriteRequest(@Nullable ShardId shardId) {
super(shardId);
}

public ReplicatedWriteRequest(@Nullable ShardId shardId, int reshardSplitShardCount) {
super(shardId, reshardSplitShardCount);
}

@Override
@SuppressWarnings("unchecked")
public R setRefreshPolicy(RefreshPolicy refreshPolicy) {

File: ReplicationRequest.java
@@ -9,6 +9,7 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.LegacyActionRequest;
@@ -49,6 +50,19 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
protected TimeValue timeout;
protected String index;

/**
* The reshardSplitShardCount field was added for the resharding project.
* It is populated when the coordinator decides which shards a request applies to.
* For example, {@link org.elasticsearch.action.bulk.BulkOperation} splits
* an incoming bulk request into shard-level {@link org.elasticsearch.action.bulk.BulkShardRequest}s
* based on its cluster state view of the number of shards that are ready for indexing.
* The purpose of this metadata is to reconcile the cluster state visible at the coordinating
* node with the cluster state visible at the source shard node (with respect to resharding).
* Note that a single number suffices, instead of an array of target shard states,
* because we only allow splits in increments of 2x.
*/
protected final int reshardSplitShardCount;

Contributor: I'm tempted to want concrete examples here. I know this makes for a long comment, but I think the field is unintuitive enough to warrant it.

Contributor (author): I agree that examples would be helpful; its semantics really are confusing. Also, I was thinking: if I rename the field in ReplicationRequest to reshardSplitExpectedShardCount, would that be better?

Contributor: I think we referred to this value as a "checksum of resharding state" in the design document. I wonder if calling it some kind of checksum will resolve the naming dispute.

Contributor (author): Yeah, I like reshardSplitShardCountChecksum, because otherwise it sounds like a count that you can actually use for something. It's really just a value to be used not as-is, but in the context of reconciling cluster state between two different nodes.

Contributor: Bikeshedding, but how about "summary" instead of "checksum"?

Contributor (author): I kind of like checksum better, but happy to change it.

Contributor: Ok, I won't push hard on this. To me checksum implies an integrity check: you have some data, and alongside it you have a provided checksum, and if you recompute the checksum and it doesn't match the provided checksum you have an integrity problem. It sort of applies here: we have our own view of the routing table, the coordinator provides a "checksum", and if we don't produce the same value over our view then we don't trust the coordinator's binning (which kind of looks like a response to an integrity error if you squint). I felt that summary (i.e., a lossily compressed form that still has relevant info) was maybe a little more accurate, but I fully admit I'm bikeshedding. Carry on as you see fit.
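To make the field's semantics concrete, here is a minimal sketch (not code from this PR; indexMetadata, primaryIndexMetadata, shardId, refreshPolicy, and items are assumed to be in scope, and the single-shard index splitting into two is an illustrative scenario):

// Coordinator side: compute the effective shard count used to bin documents into shard-level bulks.
// For a 1-shard index splitting into 2, this is 1 until the target shard reaches HANDOFF, then 2.
int coordinatorShardCount = indexMetadata.getReshardSplitShardCount(
    shardId.getId(),
    IndexReshardingState.Split.TargetShardState.HANDOFF
);
BulkShardRequest request = new BulkShardRequest(shardId, coordinatorShardCount, refreshPolicy, items, false);

// Primary side: recompute the same value from the primary node's cluster state and compare it
// with the value stamped on the request; a mismatch means the coordinator binned documents
// against a different view of the split than the primary currently sees.
int primaryShardCount = primaryIndexMetadata.getReshardSplitShardCount(
    request.shardId().getId(),
    IndexReshardingState.Split.TargetShardState.HANDOFF
);
boolean consistent = request.reshardSplitShardCount() == 0 || request.reshardSplitShardCount() == primaryShardCount;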

/**
* The number of shard copies that must be active before proceeding with the replication action.
*/
@@ -61,6 +75,10 @@ public ReplicationRequest(StreamInput in) throws IOException {
}

public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
this(shardId, 0, in);
}

public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCount, StreamInput in) throws IOException {

Contributor: This constructor looks a little odd - why does it take a passed-in reshardSplitShardCount separately from in, and why does in override the provided one? What uses this constructor and doesn't supply 0 as the passed-in reshardSplitShardCount?

Contributor (author): This isn't really used with a non-zero reshardSplitShardCount right now. But I think we need it because of the writeThin serialization, which allows us to serialize a request without a shardId (with just the index name). If we serialize a request without including a shardId, it also makes sense to leave out reshardSplitShardCount. So this constructor can be called with a resolved shardId, and we might want to provide a reshardSplitShardCount with it.

Contributor: Is this because some inheritors do not have a shardId (I guess IndexRequest is an example)?

Contributor: Ok, I've skimmed #56209, which introduced thin serialization, apparently to save redundant shard ID serialization since shard IDs are large. It looks like the expected case is that the shard ID is not null so we can do the thin read, but the request may override the shard ID even if it is not null (lines 113-121 below, on the right side of the diff). I suppose that if we can omit the shard ID in serialization because we're getting it from somewhere else, then we can probably also save 4 bytes per request on reshardSplitShardCount, so we can provide it to the constructor instead of reading it over the wire. But as I read the code below, that's not what we're doing. We're just using a provided value if the transport version isn't recent enough. I don't think that's a thin-serialization concern. I think if we want to be thin, then we use the provided value and don't serialize it at all? Unless there's a case where we need to override the provided value, as with index. Then we need a boolean to signal the presence of the shard ID, and I think the savings isn't worth it. Overall, my feeling is that given this is only a 4-byte field, it would be simpler right now to not provide the shard count to the constructor and always deserialize it if the transport version is new enough, or supply 0 inline if it isn't. Thoughts?

Contributor (author): Thanks for digging into this, Brendan. What I realized recently is that each BulkItemRequest contains a DocWriteRequest, which in turn is a ReplicationRequest. So each BulkItemRequest also contains the reshardSplitShardCount. But you are right that I should probably use an "INVALID" value if the transport version isn't recent enough. Like -1, maybe.

Contributor (author): I think INVALID and 0 are equivalent; in either case we have to inspect each request. I fixed the serialization. I think we should keep the writeThin semantics that if we don't serialize the shardId, we don't serialize the shard count either, to avoid serializing it in each BulkItemRequest. When we get the shardId from the BulkShardRequest, we can get the shard count as well.

super(in);
final boolean thinRead = shardId != null;
if (thinRead) {
@@ -80,15 +98,28 @@ public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOEx
index = in.readString();
}
routedBasedOnClusterVersion = in.readVLong();
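// Only a full (non-thin) read deserializes reshardSplitShardCount, and only on new enough transport
// versions; otherwise the caller-provided value is kept (0 unless the caller resolved it out of band).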
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_RESHARD_SHARDCOUNT_REPLICATION_REQUEST) && (thinRead == false)) {
this.reshardSplitShardCount = in.readInt();
} else {
this.reshardSplitShardCount = reshardSplitShardCount;
}
}

/**
* Creates a new request with resolved shard id
*/
public ReplicationRequest(@Nullable ShardId shardId) {
this(shardId, 0);
}

/**
* Creates a new request with resolved shard id and reshardSplitShardCount
*/
public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCount) {
this.index = shardId == null ? null : shardId.getIndexName();
this.shardId = shardId;
this.timeout = DEFAULT_TIMEOUT;
this.reshardSplitShardCount = reshardSplitShardCount;
}

/**
@@ -137,6 +168,14 @@ public ShardId shardId() {
return shardId;
}

/**
* @return the effective shard count as seen by the coordinator when creating this request;
* can be 0 if it has not yet been resolved
*/
public int reshardSplitShardCount() {
return reshardSplitShardCount;
}

/**
* Sets the number of shard copies that must be active before proceeding with the replication
* operation. Defaults to {@link ActiveShardCount#DEFAULT}, which requires one shard copy
@@ -191,11 +230,14 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeTimeValue(timeout);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
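// Nodes before INDEX_RESHARD_SHARDCOUNT_REPLICATION_REQUEST do not know about this field,
// so it is only written (and read) on new enough transport versions.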
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_RESHARD_SHARDCOUNT_REPLICATION_REQUEST)) {
out.writeInt(reshardSplitShardCount);
}
}

/**
* Thin serialization that does not write {@link #shardId} and will only write {@link #index} if it is different from the index name in
* {@link #shardId}.
* {@link #shardId}. Since we do not write {@link #shardId}, we also do not write {@link #reshardSplitShardCount}.
*/
public void writeThin(StreamOutput out) throws IOException {
super.writeTo(out);

File: TransportReplicationAction.java
@@ -28,6 +28,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -459,14 +460,22 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
try {
final ClusterState clusterState = clusterService.state();
final Index index = primaryShardReference.routingEntry().index();
final ProjectId projectId = clusterState.metadata().projectFor(index).id();
final ProjectMetadata project = clusterState.metadata().projectFor(index);
final ProjectId projectId = project.id();
final IndexMetadata indexMetadata = project.index(index);

Contributor: Can this be null? I know we have a shard reference here, but I'm not sure whether that ensures that an index isn't deleted in cluster state.

Contributor (author): Index delete/close are supposed to acquire all permits before proceeding. I think TransportVerifyShardBeforeCloseAction#acquirePrimaryOperationPermit is responsible for this?

Contributor: Ok, thanks.


final ClusterBlockException blockException = blockExceptions(clusterState, projectId, index.getName());
if (blockException != null) {
logger.trace("cluster is blocked, action failed on primary", blockException);
throw blockException;
}

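// Sanity check: the effective shard count stamped by the coordinator must either be unset (0)
// or agree with this primary's own view of the index's resharding state at HANDOFF.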
int reshardSplitShardCount = primaryRequest.getRequest().reshardSplitShardCount();
assert (reshardSplitShardCount == 0
|| reshardSplitShardCount == indexMetadata.getReshardSplitShardCount(
primaryRequest.getRequest().shardId().getId(),
IndexReshardingState.Split.TargetShardState.HANDOFF
));
if (primaryShardReference.isRelocated()) {
primaryShardReference.close(); // release shard operation lock as soon as possible
setPhase(replicationTask, "primary_delegation");

File: IndexMetadata.java
@@ -1150,6 +1150,34 @@ public int getNumberOfShards() {
return numberOfShards;
}

/**
* The reshardSplitShardCount tells us whether requests are being routed only to the source shard or
* to both source and target shards. Requests are routed to both source and target shards
* once the target shards are ready for an operation.
* @param shardId Input shardId for which we want to calculate the effective shard count
* @param minShardState Minimum target shard state required for requests to be routed to the target shards
* @return Effective shard count as seen by an operation using this IndexMetadata
*/
public int getReshardSplitShardCount(int shardId, IndexReshardingState.Split.TargetShardState minShardState) {
assert shardId >= 0 && shardId < getNumberOfShards() : "shardId is out of bounds";
int shardCount = getNumberOfShards();
if (reshardingMetadata != null) {
if (reshardingMetadata.getSplit().isTargetShard(shardId)) {

Contributor: If this is being called for a target shard, does that mean that the coordinator has already determined that the target shard is ready?

Contributor (author): Yes. The reason is that we already have target state checks for routing requests at the coordinator level. The fact that a request is routed to the target shard means that it must be ready.

Contributor (author): I have added more details to the method description.

// TODO: Assert that target state is at least minShardState
int sourceShardId = reshardingMetadata.getSplit().sourceShard(shardId);
assert reshardingMetadata.getSplit().allTargetStatesAtLeast(sourceShardId, minShardState) : "unexpected target state";
shardCount = reshardingMetadata.getSplit().shardCountAfter();
} else if (reshardingMetadata.getSplit().isSourceShard(shardId)) {
if (reshardingMetadata.getSplit().allTargetStatesAtLeast(shardId, minShardState)) {
shardCount = reshardingMetadata.getSplit().shardCountAfter();
} else {
shardCount = reshardingMetadata.getSplit().shardCountBefore();
}
}
}
return shardCount;
}
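As an illustration of the return values (a hedged example, not part of the diff), consider a one-shard index splitting into two:

// shard 0 is the source, shard 1 the newly added target
// while shard 1 has not yet reached HANDOFF:
//   getReshardSplitShardCount(0, TargetShardState.HANDOFF) == 1   // shardCountBefore()
// once every target of shard 0 is at least HANDOFF:
//   getReshardSplitShardCount(0, TargetShardState.HANDOFF) == 2   // shardCountAfter()
// for the target shard itself, the method always returns shardCountAfter(), since requests are
// only routed to a target after the coordinator has seen it become ready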

public int getNumberOfReplicas() {
return numberOfReplicas;
}
@@ -2985,7 +3013,7 @@ public IndexMetadata fromXContent(XContentParser parser) throws IOException {
* Returns the number of shards that should be used for routing. This basically defines the hash space we use in
* {@link IndexRouting#indexShard} to route documents
* to shards based on their ID or their specific routing value. The default value is {@link #getNumberOfShards()}. This value only
* changes if and index is shrunk.
* changes if an index is shrunk.
*/
public int getRoutingNumShards() {
return routingNumShards;
@@ -3005,7 +3033,7 @@ public int getRoutingFactor() {
* @param shardId the id of the target shard to split into
* @param sourceIndexMetadata the source index metadata
* @param numTargetShards the total number of shards in the target index
* @return a the source shard ID to split off from
* @return the source shard ID to split off from
*/
public static ShardId selectSplitShard(int shardId, IndexMetadata sourceIndexMetadata, int numTargetShards) {
int numSourceShards = sourceIndexMetadata.getNumberOfShards();

File: IndexReshardingState.java
@@ -378,6 +378,16 @@ public boolean targetStateAtLeast(int shardNum, TargetShardState targetShardStat
return getTargetShardState(shardNum).ordinal() >= targetShardState.ordinal();
}

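/**
 * Returns true if every target shard of the given source shard has reached at least
 * {@code targetShardState}.
 */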
public boolean allTargetStatesAtLeast(int sourceShardId, TargetShardState targetShardState) {
var targets = getTargetStatesFor(sourceShardId);
for (TargetShardState state : targets) {
if (state.ordinal() < targetShardState.ordinal()) {
return false;
}
}
return true;
}

public Stream<TargetShardState> targetStates() {
return Arrays.stream(targetShards);
}

File: IndexRouting.java
@@ -75,6 +75,7 @@ public static IndexRouting fromIndexMetadata(IndexMetadata metadata) {
protected final String indexName;
private final int routingNumShards;
private final int routingFactor;
@Nullable
private final IndexReshardingMetadata indexReshardingMetadata;

private IndexRouting(IndexMetadata metadata) {