
Conversation

Contributor @ankikuma commented on Sep 2, 2025

Add the reshard count (the effective shard count as seen by the coordinating node) to the replication request. Note that this new field is not yet serialized over the network.
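
As a rough illustration of the shape of the change (names simplified; this is not the merged code), the request gains an integer field that the coordinating node fills in and a getter the primary can consult:

// Illustrative sketch only, not the merged code: a replication request that carries
// the shard count the coordinating node observed when it routed the request.
public class ReplicationRequestSketch {

    // Effective shard count as seen by the coordinating node; 0 means "unset".
    // In this PR the field is not yet written to the wire.
    private final int reshardSplitShardCount;

    public ReplicationRequestSketch(int reshardSplitShardCount) {
        this.reshardSplitShardCount = reshardSplitShardCount;
    }

    public int reshardSplitShardCount() {
        return reshardSplitShardCount;
    }
}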

@elasticsearchmachine added the v9.2.0 and serverless-linked labels on Sep 2, 2025
@elasticsearchmachine removed the needs:triage label on Sep 16, 2025
@ankikuma added the needs:triage label on Sep 16, 2025
@elasticsearchmachine removed the needs:triage label on Sep 16, 2025
@ankikuma added the :Distributed Indexing/Distributed and needs:triage labels and removed the :Distributed Indexing/CRUD label on Sep 16, 2025
@elasticsearchmachine removed the needs:triage label on Sep 16, 2025
* The purpose of this metadata is to reconcile the cluster state visible at the coordinating
* node with that visible at the source shard node. (w.r.t resharding).
* Note that we are able to get away with a single number, instead of an array of target shard states,
* because we only allow splits in increments of 2x.
Contributor:

I'm tempted to want concrete examples here. I know this makes for a long comment, but I think the field is unintuitive enough to warrant it.

Contributor Author:

I agree that examples would be helpful. Its semantics really are confusing.

Also, I was thinking: if I rename the field in the ReplicationRequest to reshardSplitExpectedShardCount, would that be better?

Contributor:

I think we referred to this value as a "checksum of resharding state" in the design document. I wonder if calling it some kind of checksum will resolve the naming dispute.

Contributor Author:

Yeah, I like reshardSplitShardCountChecksum, because otherwise it sounds like a count that you can actually use for something. It's really a value that isn't meaningful as-is, only in the context of reconciling cluster state between two different nodes.

Contributor:

Bikeshedding: how about "summary" instead of "checksum"?

Contributor Author:

I kind of like checksum better, but I'm happy to change it...

Contributor:

Ok, I won't push hard on this. To me checksum implies an integrity check - you have some data, and alongside it you have a provided checksum, and if you recompute the checksum and it doesn't match the provided checksum you have an integrity problem. It sort of applies here - we have our own view of the routing table, and the coordinator provides a "checksum", and if we don't produce the same value over our view then we don't trust the coordinator's binning (which kind of looks like a response to an integrity error if you squint).

I felt that summary (i.e., a lossily compressed form that still has relevant info) was maybe a little more accurate, but I fully admit I'm bikeshedding. Carry on as you see fit.
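
For concreteness, here is one reading of the reconciliation described above, using a hypothetical 2-shard index splitting into 4 shards (this illustration is editorial, not taken from the PR):

// Hypothetical illustration: a 2-shard index splitting 2x into 4 shards
// (source shards 0 and 1, target shards 2 and 3).
//
// From its view of the cluster state, the coordinator derives one number per
// source shard: the post-split count (4) if that shard's target has reached the
// state required by the operation, otherwise the pre-split count (2). The source
// shard node recomputes the same number from its own cluster state; a mismatch
// means the two nodes saw different resharding states.
static boolean coordinatorViewMatches(int valueFromCoordinator, int valueComputedLocally) {
    // 0 is treated as "not provided", mirroring the assertion quoted later in this thread.
    return valueFromCoordinator == 0 || valueFromCoordinator == valueComputedLocally;
}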

final ProjectId projectId = clusterState.metadata().projectFor(index).id();
final ProjectMetadata project = clusterState.metadata().projectFor(index);
final ProjectId projectId = project.id();
final IndexMetadata indexMetadata = project.index(index);
Contributor:

can this be null? I know we have a shard reference here but I'm not sure whether that ensures that an index isn't deleted in cluster state.

Contributor Author:

Index delete/close are supposed to acquire all permits before proceeding. I think TransportVerifyShardBeforeCloseAction#acquirePrimaryOperationPermit is responsible for this?

Contributor:

ok, thanks

assert shardId >= 0 && shardId < getNumberOfShards() : "shardId is out of bounds";
int shardCount = getNumberOfShards();
if (reshardingMetadata != null) {
    if (reshardingMetadata.getSplit().isTargetShard(shardId)) {
Contributor:

if this is being called for a target shard, does that mean that the coordinator has already determined that the target shard is ready?

Contributor Author (@ankikuma, Sep 17, 2025):

Yes. The reason is that we already have target state checks for routing requests at the coordinator level. The fact that a request is routed to the target shard means that it must be ready.

Contributor Author:

I have added more details to the method description.

Comment on lines 136 to 139
assertThat(
    IndexMetadataAfterReshard.getReshardSplitShardCount(i, IndexReshardingState.Split.TargetShardState.CLONE),
    equalTo(numTargetShards)
);
Contributor:

this state feels like it should be illegal. The query doesn't really make sense, since a shard in CLONE state is by definition not ready to accept operations.

Contributor Author:

getReshardSplitShardCount is agnostic to the semantics of the operation. Given an input shard and the target shard state required for targets to be considered ready for that operation, it returns the "ReshardSplitShardCount" observed based on the IndexMetadata. For example, search will pass in SPLIT as the required state while indexing will pass in HANDOFF.
I will try to make this clearer in the API comments.

Contributor:

I mean, I think it would be nice if the public API were harder to misuse. My feeling is that supplying CLONE to this function is an indication of a logical mistake, so we shouldn't allow it. One way we could do that is to make getReshardSplitShardCount private and have two wrappers, e.g., getReshardSplitShardCountForIndexing and getReshardSplitShardCountForSearch that call getReshardSplitShardCount with HANDOFF and SPLIT respectively.
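
A sketch of that wrapper shape (bodies assumed here; a later hunk in this thread shows the checksum-named variant that was actually adopted):

// Sketch of the suggested wrappers; assumes a private getReshardSplitShardCount helper.
public int getReshardSplitShardCountForIndexing(int shardId) {
    // Indexing requires target shards to have at least reached HANDOFF.
    return getReshardSplitShardCount(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF);
}

public int getReshardSplitShardCountForSearch(int shardId) {
    // Search requires target shards to have at least reached SPLIT.
    return getReshardSplitShardCount(shardId, IndexReshardingState.Split.TargetShardState.SPLIT);
}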

Contributor Author:

Yes sure, that will make the code more readable too I think. We will need a wrapper for refresh and flush as well. I guess we can add wrappers as and when needed.

Contributor:

I was thinking more like "index" and "search" were friendly names for HANDOFF and SPLIT. I'm not sure whether we need wrappers per operation - it seems like they would just be synonyms, and having multiple functions that do the same thing seems noisy?

Contributor Author:

Yeah, we can just use these names, or rename them later to something more generic.

this(shardId, 0, in);
}

public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCount, StreamInput in) throws IOException {
Contributor:

Is this because some inheritors do not have a shardId? (I guess IndexRequest is an example.)

final var numSourceShards = 2;
indexMetadata = IndexMetadata.builder(indexMetadata).reshardAddShards(numSourceShards).build();

assertNull(reshardingMetadata);
Contributor:

You need to re-read reshardingMetadata here.

Contributor Author:

Thanks!

// starting state is as expected
assertEquals(numSourceShards, reshardingMetadata.shardCountBefore());
assertEquals(numSourceShards * multiple, reshardingMetadata.shardCountAfter());
final int numTargetShards = reshardingMetadata.shardCountAfter();
Contributor:

This is not the number of target shards, right? This is the total.

Contributor Author:

Yes. I renamed this to numShardsAfterReshard so it's not confusing.


// Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest
var indexMetadata = project.index(shardId.getIndexName());
int reshardSplitShardCount = 0;
Contributor:

double checking: my understanding is this is always safe - 0 means that the receiving shard will always inspect the bulk item request and decide whether it needs to resplit, which may not be optimal but won't ever be wrong. Right?

Contributor Author:

No, actually I was thinking of 0 as the case where we do not care what the reshardSplitShardCount is. That way we don't have to change all the tests that were written pre-resharding.
In this particular instance, where we are creating the BulkShardRequest, it is 0 if the indexMetadata is null. But thinking about this now, I wonder if that could lead to wrong results. Perhaps I should reserve 0 for the case where we always inspect the request, and modify the tests.

this(shardId, 0, in);
}

public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCount, StreamInput in) throws IOException {
Contributor:

Ok, I've skimmed #56209 that introduced thin serialization, apparently to save redundant shard ID serialization since shard IDs are large. It looks like the expected case is that shard ID is not null so we can do the thin read, but that the request may override the shard ID even if it is not null (lines 113-121 below, on the right side of the diff).

I suppose it's the case that if we can omit the shard ID in serialization because we're getting it from somewhere else then we can probably also save 4 bytes per request on reshardSplitShardCount, so we can provide it to the constructor instead of reading it over the wire. But as I read the code below, that's not what we're doing. We're just using a provided value if the transport version isn't recent enough. I don't think that's a thin serialization concern. I think if we want to be thin, then we use the provided value and don't serialize it at all? Unless there's a case where we need to override the provided value, as with index. Then we need to do a boolean to signal the presence of the shard ID and I think the savings isn't worth it.

Overall, my feeling is that given this is only a 4-byte field, it would be simpler right now to not provide the shard count to the constructor, and always deserialize it if the transport version is new enough, or supply 0 inline if it isn't. Thoughts?
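
A sketch of that simpler reading path (the transport-version constant name here is made up for illustration; it is not from the PR):

// Sketch only: always deserialize when the transport version supports the field,
// otherwise fall back to 0. The constant name is hypothetical.
public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
    super(in);
    // ... existing thin/full shardId handling ...
    if (in.getTransportVersion().onOrAfter(TransportVersions.RESHARD_SPLIT_SHARD_COUNT)) {
        this.reshardSplitShardCount = in.readVInt();
    } else {
        this.reshardSplitShardCount = 0;
    }
}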

* @param shardId Input shardId for which we want to calculate the effective shard count
*/
public int getReshardSplitShardCountChecksumForIndexing(int shardId) {
return (getReshardSplitShardCountChecksum(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF));
Contributor:

Small nit, but it seems like there are extra parentheses? Also in getReshardSplitShardCountChecksumForSearch.

Contributor Author:

Yup, will fix it.

Contributor (@bcully) left a comment:

just one question on this version

Comment on lines 473 to 475
assert (reshardSplitShardCountChecksum == 0
    || reshardSplitShardCountChecksum == indexMetadata.getReshardSplitShardCountChecksumForIndexing(
        primaryRequest.getRequest().shardId().getId()
Contributor:

Is this being executed on the source shard? If so, could this assertion fire when the coordinator's checksum is stale vs the source?

@ankikuma merged commit f38f4ad into elastic:main on Sep 25, 2025
34 checks passed

Labels

:Distributed Indexing/Distributed, >non-issue, serverless-linked, Team:Distributed Indexing, v9.2.0
