Tim-Brooks
Contributor

This commit adds the logic to delegate bulk shard requests to the split
target when a primary receives a request from a stale coordinator.

ankikuma and others added 28 commits September 17, 2025 17:13
…com:ankikuma/elasticsearch into 09162025/ReshardSplitRequestOnSourceTwoPass

merged
…com:ankikuma/elasticsearch into 09162025/ReshardSplitRequestOnSourceTwoPass

pull
…com:ankikuma/elasticsearch into 09162025/ReshardSplitRequestOnSourceTwoPass

Pull
…com:ankikuma/elasticsearch into ankikuma-09162025/ReshardSplitRequestOnSourceTwoPass
@Tim-Brooks added the >non-issue and :Distributed Indexing/CRUD labels Oct 9, 2025
@elasticsearchmachine added the Team:Distributed Indexing label Oct 9, 2025
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)

@elasticsearchmachine added the serverless-linked label Oct 9, 2025
public static Tuple<BulkShardResponse, Exception> combineResponses(
BulkShardRequest originalRequest,
Map<ShardId, BulkShardRequest> splitRequests,
Map<ShardId, Tuple<BulkShardResponse, Exception>> responses
Contributor

We need an Either type :)
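For context on the comment above, here is a minimal sketch of what such a type could look like in place of `Tuple<BulkShardResponse, Exception>`. The `Either` class and its method names are hypothetical and are not part of this PR or of the Elasticsearch codebase as far as this thread shows. The idea is only that a value holds exactly one of a failure or a result, so callers cannot observe a tuple where both sides are null or both are set.

```java
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical Either type: holds exactly one of a left (failure) or a right (result) value. */
final class Either<L, R> {
    private final L left;
    private final R right;
    private final boolean isRight;

    private Either(L left, R right, boolean isRight) {
        this.left = left;
        this.right = right;
        this.isRight = isRight;
    }

    /** Wraps a failure; null is rejected so a side is never ambiguous. */
    static <L, R> Either<L, R> left(L value) {
        return new Either<>(Objects.requireNonNull(value), null, false);
    }

    /** Wraps a successful result; null is rejected for the same reason. */
    static <L, R> Either<L, R> right(R value) {
        return new Either<>(null, Objects.requireNonNull(value), true);
    }

    boolean isRight() {
        return isRight;
    }

    /** Applies exactly one of the two functions, depending on which side is present. */
    <T> T fold(Function<? super L, ? extends T> onLeft, Function<? super R, ? extends T> onRight) {
        return isRight ? onRight.apply(right) : onLeft.apply(left);
    }
}
```

With something like this, a per-shard result could be typed as `Either<Exception, BulkShardResponse>` and consumed with `fold`, instead of checking which tuple element is null.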

@ankikuma
Contributor

LGTM

Contributor

@bcully bcully left a comment

Looks great. I had a few questions/suggestions but nothing major. We probably want some more end-to-end ITs when this lands.


private ShardBulkSplitHelper() {}

public static Map<ShardId, BulkShardRequest> splitRequests(BulkShardRequest request, ProjectMetadata project) {
Contributor

Can we document this a bit? I'm assuming that the caller has blocked handoff here by taking permits. There also seems to be an expectation that the caller won't call this function unless it has determined that the coordinator's shard summary doesn't match the shard's, so it doesn't need a fast path for that case.
There also appears to be an expectation on the caller that if the request has no items, an entry for the source shard with no items will be returned, but that otherwise this function should not generate empty sub-requests (e.g., eagerly creating a map entry for both source and target would be a bug).
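To make the contract described in this comment concrete, here is a rough, self-contained sketch. It is not the PR's `splitRequests` implementation: `SplitSketch`, `split`, and the `router` parameter are hypothetical stand-ins, with plain integers in place of `ShardId` and a generic item type in place of `BulkItemRequest`. It only illustrates the two expectations: an empty request yields a single empty entry for the source shard, and otherwise entries are created lazily so that no empty sub-request is produced.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

/** Hypothetical illustration of the splitting contract; not the code under review. */
final class SplitSketch {
    private SplitSketch() {}

    /**
     * Groups items by the shard they currently route to.
     * An empty input returns one entry for the source shard with no items.
     * A non-empty input only contains shards that received at least one item.
     */
    static <T> Map<Integer, List<T>> split(int sourceShard, List<T> items, ToIntFunction<T> router) {
        Map<Integer, List<T>> byShard = new LinkedHashMap<>();
        if (items.isEmpty()) {
            // Assumed special case from the review comment: keep an empty request on the source.
            byShard.put(sourceShard, new ArrayList<>());
            return byShard;
        }
        for (T item : items) {
            // Entries are created on first use, so no shard ends up with an empty list.
            byShard.computeIfAbsent(router.applyAsInt(item), shard -> new ArrayList<>()).add(item);
        }
        return byShard;
    }
}
```

If every item in a non-empty request reroutes to the target, the result under this sketch has no entry for the source shard at all, which is the case an eager two-entry map would get wrong.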

}
}
BulkShardResponse bulkShardResponse = new BulkShardResponse(originalRequest.shardId(), bulkItemResponses);
// TODO: Decide how to handle
Contributor

I think you looked at consumers of this? If so could you leave a breadcrumb to your investigation? If not, we should ticket that and link the ticket here.

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class ReplicationSplitHelper<
Contributor

it would be nice to have a little javadoc explaining what this class is for if you get a chance

new ShardId(index, newShardId),
shardNum -> new ArrayList<>()
);
shardRequests.add(new BulkItemRequest(bulkItemRequest.id(), bulkItemRequest.request()));
Contributor

should we assert anything about the shard id in bulkItemRequest.request()? From skimming code I think it's probably null (thin serialization) but I don't know if it always is, or how it would be used if it's not null.

*/
int route(IndexRouting indexRouting);

int rerouteAtSourceDuringResharding(IndexRouting indexRouting);
Contributor

doc comment? I think they're nice for interface/abstract methods.

*/
public abstract int indexShard(IndexRequest indexRequest);

public abstract int rerouteToTarget(IndexRequest indexRequest);
Contributor

doc comment? :)

}
return indexShard(indexRequest);
} else if (addIdWithRoutingHash) {
// TODO: is this correct?
Contributor

we're probably going to have to generate test cases for tsdb/logsdb

IndexMetadata indexMetadata = IndexMetadata.builder(indexName).settings(settings).build();
indexMetadata = IndexMetadata.builder(indexMetadata).reshardAddShards(2).build();

SplitShardCountSummary staleSummary = SplitShardCountSummary.fromInt(1);
Contributor

personally I'd prefer to generate this from a 1 shard metadata instead of assuming the serialization meaning, but I suppose if we changed serialization we'd notice.

Labels

:Distributed Indexing/CRUD A catch all label for issues around indexing, updating and getting a doc by id. Not search. >non-issue serverless-linked Added by automation, don't add manually Team:Distributed Indexing Meta label for Distributed Indexing team v9.3.0

5 participants