Skip to content

Commit e22e908

Browse files
authored
Add support for delegating write to split target (#136241)
This commit adds the logic to delegate bulk shard requests to the split target when a primary receives a request from a stale coordinator.
1 parent ad70045 commit e22e908

File tree

13 files changed

+1204
-72
lines changed

13 files changed

+1204
-72
lines changed

server/src/main/java/org/elasticsearch/action/DocWriteRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,11 @@ default void postRoutingProcess(IndexRouting indexRouting) {}
176176
*/
177177
int route(IndexRouting indexRouting);
178178

179+
/**
180+
* Pick the appropriate target shard id this request should be routed to during resharding.
181+
*/
182+
int rerouteAtSourceDuringResharding(IndexRouting indexRouting);
183+
179184
/**
180185
* Resolves the write index that should receive this request
181186
* based on the provided index abstraction.
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.bulk;
11+
12+
import org.elasticsearch.action.DocWriteRequest;
13+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
14+
import org.elasticsearch.cluster.routing.IndexRouting;
15+
import org.elasticsearch.core.Tuple;
16+
import org.elasticsearch.index.Index;
17+
import org.elasticsearch.index.shard.ShardId;
18+
19+
import java.util.ArrayList;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
public final class ShardBulkSplitHelper {
25+
26+
private ShardBulkSplitHelper() {}
27+
28+
/**
29+
* Splits a bulk request into multiple requests for each shard. If the items in the request only route to the source shard it will
30+
* return the original request. If the items only route to the target shard it will return a map with one request. If the requests
31+
* route to both the map will have a request for each shard.
32+
*/
33+
public static Map<ShardId, BulkShardRequest> splitRequests(BulkShardRequest request, ProjectMetadata project) {
34+
final ShardId sourceShardId = request.shardId();
35+
final Index index = sourceShardId.getIndex();
36+
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(project.getIndexSafe(index));
37+
38+
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
39+
Map<ShardId, BulkShardRequest> bulkRequestsPerShard = new HashMap<>();
40+
41+
// Iterate through the items in the input request and split them based on the
42+
// current resharding-split state.
43+
BulkItemRequest[] items = request.items();
44+
if (items.length == 0) { // Nothing to split
45+
return Map.of(sourceShardId, request);
46+
}
47+
48+
for (BulkItemRequest bulkItemRequest : items) {
49+
DocWriteRequest<?> docWriteRequest = bulkItemRequest.request();
50+
int newShardId = docWriteRequest.rerouteAtSourceDuringResharding(indexRouting);
51+
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
52+
new ShardId(index, newShardId),
53+
shardNum -> new ArrayList<>()
54+
);
55+
shardRequests.add(new BulkItemRequest(bulkItemRequest.id(), bulkItemRequest.request()));
56+
}
57+
58+
// All items belong to either the source shard or target shard.
59+
if (requestsByShard.size() == 1) {
60+
// Return the original request if no items were split to target.
61+
if (requestsByShard.containsKey(sourceShardId)) {
62+
return Map.of(sourceShardId, request);
63+
}
64+
}
65+
66+
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
67+
final ShardId shardId = entry.getKey();
68+
final List<BulkItemRequest> requests = entry.getValue();
69+
BulkShardRequest bulkShardRequest = new BulkShardRequest(
70+
shardId,
71+
request.getRefreshPolicy(),
72+
requests.toArray(new BulkItemRequest[0]),
73+
request.isSimulated()
74+
);
75+
bulkRequestsPerShard.put(shardId, bulkShardRequest);
76+
}
77+
return bulkRequestsPerShard;
78+
}
79+
80+
public static Tuple<BulkShardResponse, Exception> combineResponses(
81+
BulkShardRequest originalRequest,
82+
Map<ShardId, BulkShardRequest> splitRequests,
83+
Map<ShardId, Tuple<BulkShardResponse, Exception>> responses
84+
) {
85+
BulkItemResponse[] bulkItemResponses = new BulkItemResponse[originalRequest.items().length];
86+
for (Map.Entry<ShardId, Tuple<BulkShardResponse, Exception>> entry : responses.entrySet()) {
87+
ShardId shardId = entry.getKey();
88+
Tuple<BulkShardResponse, Exception> value = entry.getValue();
89+
Exception exception = value.v2();
90+
if (exception != null) {
91+
BulkShardRequest bulkShardRequest = splitRequests.get(shardId);
92+
for (BulkItemRequest item : bulkShardRequest.items()) {
93+
DocWriteRequest<?> request = item.request();
94+
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(item.index(), request.id(), exception);
95+
bulkItemResponses[item.id()] = BulkItemResponse.failure(item.id(), request.opType(), failure);
96+
}
97+
} else {
98+
for (BulkItemResponse bulkItemResponse : value.v1().getResponses()) {
99+
bulkItemResponses[bulkItemResponse.getItemId()] = bulkItemResponse;
100+
}
101+
}
102+
}
103+
BulkShardResponse bulkShardResponse = new BulkShardResponse(originalRequest.shardId(), bulkItemResponses);
104+
// TODO: Decide how to handle
105+
bulkShardResponse.setShardInfo(responses.get(originalRequest.shardId()).v1().getShardInfo());
106+
return new Tuple<>(bulkShardResponse, null);
107+
}
108+
}

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.elasticsearch.index.mapper.SourceToParse;
5656
import org.elasticsearch.index.seqno.SequenceNumbers;
5757
import org.elasticsearch.index.shard.IndexShard;
58+
import org.elasticsearch.index.shard.ShardId;
5859
import org.elasticsearch.index.translog.Translog;
5960
import org.elasticsearch.indices.ExecutorSelector;
6061
import org.elasticsearch.indices.IndicesService;
@@ -163,6 +164,20 @@ protected void shardOperationOnPrimary(
163164
primary.ensureMutable(listener.delegateFailure((l, ignored) -> super.shardOperationOnPrimary(request, primary, l)), true);
164165
}
165166

167+
@Override
168+
protected Map<ShardId, BulkShardRequest> splitRequestOnPrimary(BulkShardRequest request) {
169+
return ShardBulkSplitHelper.splitRequests(request, projectResolver.getProjectMetadata(clusterService.state()));
170+
}
171+
172+
@Override
173+
protected Tuple<BulkShardResponse, Exception> combineSplitResponses(
174+
BulkShardRequest originalRequest,
175+
Map<ShardId, BulkShardRequest> splitRequests,
176+
Map<ShardId, Tuple<BulkShardResponse, Exception>> responses
177+
) {
178+
return ShardBulkSplitHelper.combineResponses(originalRequest, splitRequests, responses);
179+
}
180+
166181
@Override
167182
protected void dispatchedShardOperationOnPrimary(
168183
BulkShardRequest request,

server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,11 @@ public int route(IndexRouting indexRouting) {
236236
return indexRouting.deleteShard(id, routing);
237237
}
238238

239+
@Override
240+
public int rerouteAtSourceDuringResharding(IndexRouting indexRouting) {
241+
return indexRouting.deleteShard(id, routing);
242+
}
243+
239244
@Override
240245
public void writeTo(StreamOutput out) throws IOException {
241246
super.writeTo(out);

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,11 @@ public int route(IndexRouting indexRouting) {
928928
return indexRouting.indexShard(this);
929929
}
930930

931+
@Override
932+
public int rerouteAtSourceDuringResharding(IndexRouting indexRouting) {
933+
return indexRouting.rerouteToTarget(this);
934+
}
935+
931936
public IndexRequest setRequireAlias(boolean requireAlias) {
932937
this.requireAlias = requireAlias;
933938
return this;

server/src/main/java/org/elasticsearch/action/support/master/TermOverridingMasterNodeRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* which it may be necessary to use the test utility {@code MasterNodeRequestHelper#unwrapTermOverride} to remove the wrapper and access the
3434
* inner request.
3535
*/
36-
class TermOverridingMasterNodeRequest extends AbstractTransportRequest {
36+
public class TermOverridingMasterNodeRequest extends AbstractTransportRequest {
3737

3838
private static final Logger logger = LogManager.getLogger(TermOverridingMasterNodeRequest.class);
3939

0 commit comments

Comments
 (0)