Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
53a338d
Split requestwq
ankikuma Sep 17, 2025
064f3ec
commit
ankikuma Sep 23, 2025
984063e
Merge remote-tracking branch 'upstream/main' into 09162025/ReshardSpl…
ankikuma Sep 23, 2025
b93104a
es
ankikuma Sep 23, 2025
9b79168
fix reroute at source
ankikuma Sep 24, 2025
12e46af
[CI] Auto commit changes from spotless
Sep 24, 2025
770e0f1
fix reroute at source bugs
ankikuma Sep 24, 2025
3e72e7a
Merge branch '09162025/ReshardSplitRequestOnSourceTwoPass' of github.…
ankikuma Sep 24, 2025
0cf593c
[CI] Update transport version definitions
Sep 24, 2025
826d11a
Merge remote-tracking branch 'upstream/main' into 09162025/ReshardSpl…
ankikuma Sep 24, 2025
a793d5c
Merge branch '09162025/ReshardSplitRequestOnSourceTwoPass' of github.…
ankikuma Sep 24, 2025
b153030
refresh
ankikuma Sep 25, 2025
0dc91f1
commit
ankikuma Sep 25, 2025
9dc5079
Merge remote-tracking branch 'upstream/main' into 09162025/ReshardSpl…
ankikuma Sep 25, 2025
a9c9885
fix reroute logic
ankikuma Sep 25, 2025
c2c5490
spotless
ankikuma Sep 25, 2025
df26a61
Merge remote-tracking branch 'upstream/main' into 09162025/ReshardSpl…
ankikuma Sep 25, 2025
636225d
Merge remote-tracking branch 'upstream/main' into 09162025/ReshardSpl…
ankikuma Sep 29, 2025
4584a23
commit
ankikuma Sep 29, 2025
c9c32d8
[CI] Auto commit changes from spotless
Sep 30, 2025
fe1d3c3
commit
ankikuma Sep 30, 2025
fd43a75
Merge remote-tracking branch 'upstream/main' into 09162025/ReshardSpl…
ankikuma Sep 30, 2025
7aca078
Merge branch '09162025/ReshardSplitRequestOnSourceTwoPass' of github.…
ankikuma Sep 30, 2025
f1c4b2d
Merge remote-tracking branch 'upstream/main' into 09162025/ReshardSpl…
ankikuma Sep 30, 2025
0c16c45
Merge branch '09162025/ReshardSplitRequestOnSourceTwoPass' of github.…
Tim-Brooks Oct 7, 2025
5fcc140
Changes
Tim-Brooks Oct 7, 2025
c14623a
Change
Tim-Brooks Oct 8, 2025
9946884
Change
Tim-Brooks Oct 9, 2025
646bdbe
Changes
Tim-Brooks Oct 9, 2025
7c5235a
Merge remote-tracking branch 'origin/main' into ankikuma-09162025/Res…
Tim-Brooks Oct 9, 2025
701d19c
Change
Tim-Brooks Oct 9, 2025
ed015ee
Merge remote-tracking branch 'origin/main' into ankikuma-09162025/Res…
Tim-Brooks Oct 10, 2025
d3648e9
[CI] Auto commit changes from spotless
Oct 10, 2025
5ba4e73
Change
Tim-Brooks Oct 13, 2025
aa8505e
Merge remote-tracking branch 'origin/main' into ankikuma-09162025/Res…
Tim-Brooks Oct 13, 2025
dc0fb02
Change
Tim-Brooks Oct 13, 2025
6cc838c
Merge remote-tracking branch 'origin/main' into ankikuma-09162025/Res…
Tim-Brooks Oct 14, 2025
292f8cc
Fix
Tim-Brooks Oct 14, 2025
83dbaf6
Fix
Tim-Brooks Oct 16, 2025
2ba6abd
Merge remote-tracking branch 'origin/main' into ankikuma-09162025/Res…
Tim-Brooks Oct 16, 2025
4ebc4d0
Change
Tim-Brooks Oct 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ default void postRoutingProcess(IndexRouting indexRouting) {}
*/
int route(IndexRouting indexRouting);

/**
 * Pick the appropriate target shard id this request should be routed to during resharding.
 *
 * <p>Evaluated on the source shard after the coordinator-level routing decision; presumably the
 * returned id is either the source shard itself or the resharding split target — confirm against
 * the {@code IndexRequest}/{@code DeleteRequest} implementations.
 *
 * @param indexRouting the routing of the index being resharded
 * @return the shard id that should execute this request
 */
int rerouteAtSourceDuringResharding(IndexRouting indexRouting);

/**
* Resolves the write index that should receive this request
* based on the provided index abstraction.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class ShardBulkSplitHelper {

private ShardBulkSplitHelper() {}

/**
* Splits a bulk request into multiple requests for each shard. If the items in the request only route to the source shard it will
* return the original request. If the items only route to the target shard it will return a map with one request. If the requests
* route to both the map will have a request for each shard.
*/
public static Map<ShardId, BulkShardRequest> splitRequests(BulkShardRequest request, ProjectMetadata project) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we document this a bit? Like I'm assuming that the caller has blocked handoff here by taking permits. It looks like there's also an expectation that the caller won't call this function unless it has determined that the coordinator's shard summary doesn't match the shard's, so it doesn't need to fast-path for that.
And there looks like there's an expectation on the caller that if the request has no items, an entry for the source shard with no items will be returned, but that otherwise this function should not generate empty sub-requests (e.g., if it created a map eagerly for both source and target that would be a bug).

final ShardId sourceShardId = request.shardId();
final Index index = sourceShardId.getIndex();
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(project.getIndexSafe(index));

Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
Map<ShardId, BulkShardRequest> bulkRequestsPerShard = new HashMap<>();

// Iterate through the items in the input request and split them based on the
// current resharding-split state.
BulkItemRequest[] items = request.items();
if (items.length == 0) { // Nothing to split
return Map.of(sourceShardId, request);
}

for (BulkItemRequest bulkItemRequest : items) {
DocWriteRequest<?> docWriteRequest = bulkItemRequest.request();
int newShardId = docWriteRequest.rerouteAtSourceDuringResharding(indexRouting);
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
new ShardId(index, newShardId),
shardNum -> new ArrayList<>()
);
shardRequests.add(new BulkItemRequest(bulkItemRequest.id(), bulkItemRequest.request()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we assert anything about the shard id in bulkItemRequest.request()? From skimming code I think it's probably null (thin serialization) but I don't know if it always is, or how it would be used if it's not null.

}

// All items belong to either the source shard or target shard.
if (requestsByShard.size() == 1) {
// Return the original request if no items were split to target.
if (requestsByShard.containsKey(sourceShardId)) {
return Map.of(sourceShardId, request);
}
}

for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(
shardId,
request.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[0]),
request.isSimulated()
);
bulkRequestsPerShard.put(shardId, bulkShardRequest);
}
return bulkRequestsPerShard;
}

public static Tuple<BulkShardResponse, Exception> combineResponses(
BulkShardRequest originalRequest,
Map<ShardId, BulkShardRequest> splitRequests,
Map<ShardId, Tuple<BulkShardResponse, Exception>> responses
) {
BulkItemResponse[] bulkItemResponses = new BulkItemResponse[originalRequest.items().length];
for (Map.Entry<ShardId, Tuple<BulkShardResponse, Exception>> entry : responses.entrySet()) {
ShardId shardId = entry.getKey();
Tuple<BulkShardResponse, Exception> value = entry.getValue();
Exception exception = value.v2();
if (exception != null) {
BulkShardRequest bulkShardRequest = splitRequests.get(shardId);
for (BulkItemRequest item : bulkShardRequest.items()) {
DocWriteRequest<?> request = item.request();
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(item.index(), request.id(), exception);
bulkItemResponses[item.id()] = BulkItemResponse.failure(item.id(), request.opType(), failure);
}
} else {
for (BulkItemResponse bulkItemResponse : value.v1().getResponses()) {
bulkItemResponses[bulkItemResponse.getItemId()] = bulkItemResponse;
}
}
}
BulkShardResponse bulkShardResponse = new BulkShardResponse(originalRequest.shardId(), bulkItemResponses);
// TODO: Decide how to handle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you looked at consumers of this? If so could you leave a breadcrumb to your investigation? If not, we should ticket that and link the ticket here.

bulkShardResponse.setShardInfo(responses.get(originalRequest.shardId()).v1().getShardInfo());
return new Tuple<>(bulkShardResponse, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -163,6 +164,20 @@ protected void shardOperationOnPrimary(
primary.ensureMutable(listener.delegateFailure((l, ignored) -> super.shardOperationOnPrimary(request, primary, l)), true);
}

@Override
protected Map<ShardId, BulkShardRequest> splitRequestOnPrimary(BulkShardRequest request) {
    // Resolve the project metadata from the latest cluster state, then delegate the actual
    // item-by-item split to the stateless helper.
    var projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
    return ShardBulkSplitHelper.splitRequests(request, projectMetadata);
}

@Override
protected Tuple<BulkShardResponse, Exception> combineSplitResponses(
    BulkShardRequest originalRequest,
    Map<ShardId, BulkShardRequest> splitRequests,
    Map<ShardId, Tuple<BulkShardResponse, Exception>> responses
) {
    // Pure delegation: merging the per-shard results back into one response for the original
    // request lives in the stateless helper.
    return ShardBulkSplitHelper.combineResponses(originalRequest, splitRequests, responses);
}

@Override
protected void dispatchedShardOperationOnPrimary(
BulkShardRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ public int route(IndexRouting indexRouting) {
return indexRouting.deleteShard(id, routing);
}

@Override
public int rerouteAtSourceDuringResharding(IndexRouting indexRouting) {
    // A delete resolves its shard the same way during a resharding split as in normal routing:
    // by document id and custom routing value.
    final int shard = indexRouting.deleteShard(id, routing);
    return shard;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,11 @@ public int route(IndexRouting indexRouting) {
return indexRouting.indexShard(this);
}

@Override
public int rerouteAtSourceDuringResharding(IndexRouting indexRouting) {
    // Unlike route(), this asks the routing to re-evaluate against the resharding split target.
    final int shard = indexRouting.rerouteToTarget(this);
    return shard;
}

public IndexRequest setRequireAlias(boolean requireAlias) {
this.requireAlias = requireAlias;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* which it may be necessary to use the test utility {@code MasterNodeRequestHelper#unwrapTermOverride} to remove the wrapper and access the
* inner request.
*/
class TermOverridingMasterNodeRequest extends AbstractTransportRequest {
public class TermOverridingMasterNodeRequest extends AbstractTransportRequest {

private static final Logger logger = LogManager.getLogger(TermOverridingMasterNodeRequest.class);

Expand Down
Loading