Skip to content

Commit 5fcc140

Browse files
committed
Changes
1 parent 0c16c45 commit 5fcc140

File tree

6 files changed

+46
-40
lines changed

6 files changed

+46
-40
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -172,52 +172,40 @@ protected void shardOperationOnPrimary(
172172

173173
@Override
174174
protected Map<ShardId, BulkShardRequest> splitRequestOnPrimary(BulkShardRequest request) {
175-
// System.out.println("I am splitting");
175+
final ShardId sourceShardId = request.shardId();
176+
final Index index = sourceShardId.getIndex();
177+
176178
ClusterState clusterState = clusterService.state();
177179
ProjectMetadata project = projectResolver.getProjectMetadata(clusterState);
178-
Index index = request.shardId().getIndex();
179-
// IndexMetadata indexMetadata = clusterState.getMetadata().indexMetadata(index);
180-
IndexRouting routing = IndexRouting.fromIndexMetadata(project.getIndexSafe(index));
180+
181+
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(project.getIndexSafe(index));
181182
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
182183
Map<ShardId, BulkShardRequest> bulkRequestsPerShard = new HashMap<>();
183184

184185
// Iterate through the items in the input request and split them based on the
185186
// current resharding-split state.
186187
BulkItemRequest[] items = request.items();
187188
if (items.length == 0) { // Nothing to split
188-
return Map.of(request.shardId(), request);
189+
return Map.of(sourceShardId, request);
189190
}
190191

191192
for (int i = 0; i < items.length; i++) {
192193
BulkItemRequest bulkItemRequest = items[i];
193194
DocWriteRequest<?> docWriteRequest = bulkItemRequest.request();
194-
int shardId = docWriteRequest.rerouteAtSourceDuringResharding(routing);
195-
// int shardId = docWriteRequest.route(routing);
196-
// System.out.println("shardId = " + shardId);
195+
int newShardId = docWriteRequest.rerouteAtSourceDuringResharding(indexRouting);
197196
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
198-
new ShardId(index, shardId),
197+
new ShardId(index, newShardId),
199198
shardNum -> new ArrayList<>()
200199
);
201-
shardRequests.add(bulkItemRequest);
200+
shardRequests.add(new BulkItemRequest(newShardId, bulkItemRequest.request()));
202201
}
203202

204-
// System.out.println("requestsByShard = " + requestsByShard.size());
205203
// All items belong to either the source shard or target shard.
206204
if (requestsByShard.size() == 1) {
207-
ShardId targetShard = requestsByShard.entrySet().iterator().next().getKey();
208-
// Return original request if no items were split to target.
209-
if (targetShard.equals(request.shardId())) {
210-
return Map.of(request.shardId(), request);
211-
} else {
212-
// Create new bulk request that is identical to the original request except the shardId.
213-
// TODO: Verify that this is alright because each BulkItemRequest also contains shardId
214-
BulkShardRequest bulkShardRequest = new BulkShardRequest(
215-
targetShard,
216-
request.getRefreshPolicy(),
217-
request.items(),
218-
request.isSimulated()
219-
);
220-
return Map.of(targetShard, bulkShardRequest);
205+
Map.Entry<ShardId, List<BulkItemRequest>> entry = requestsByShard.entrySet().iterator().next();
206+
// Return the original request if no items were split to target.
207+
if (entry.getKey().equals(sourceShardId)) {
208+
return Map.of(sourceShardId, request);
221209
}
222210
}
223211

@@ -232,7 +220,7 @@ protected Map<ShardId, BulkShardRequest> splitRequestOnPrimary(BulkShardRequest
232220
);
233221
bulkRequestsPerShard.put(shardId, bulkShardRequest);
234222
}
235-
return (bulkRequestsPerShard);
223+
return bulkRequestsPerShard;
236224
}
237225

238226
@Override

server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ public int route(IndexRouting indexRouting) {
238238

239239
@Override
240240
public int rerouteAtSourceDuringResharding(IndexRouting indexRouting) {
241-
return indexRouting.rerouteDeleteRequestIfResharding(id, routing);
241+
return indexRouting.deleteShard(id, routing);
242242
}
243243

244244
@Override

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -930,7 +930,7 @@ public int route(IndexRouting indexRouting) {
930930

931931
@Override
932932
public int rerouteAtSourceDuringResharding(IndexRouting indexRouting) {
933-
return indexRouting.rerouteIndexingRequestIfResharding(this);
933+
return indexRouting.rerouteToTarget(this);
934934
}
935935

936936
public IndexRequest setRequireAlias(boolean requireAlias) {

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -527,8 +527,6 @@ public void handleException(TransportException exp) {
527527
assert numSplitRequests > 0 : "expected atleast 1 split request";
528528
assert numSplitRequests <= 2 : "number of split requests too many";
529529

530-
// System.out.println("numSplitRequests = " + numSplitRequests);
531-
// System.out.println("source shardId = " + primaryRequest.getRequest().shardId().toString());
532530
if (numSplitRequests == 1) {
533531
// System.out.println("shardId = " + splitRequests.entrySet().iterator().next().getKey().toString());
534532
// If the request is for source, same behaviour as before

server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ public int route(IndexRouting indexRouting) {
690690

691691
@Override
692692
public int rerouteAtSourceDuringResharding(IndexRouting indexRouting) {
693-
return indexRouting.rerouteUpdateRequestIfResharding(id, routing);
693+
return indexRouting.updateShard(id, routing);
694694
}
695695

696696
public UpdateRequest setRequireAlias(boolean requireAlias) {

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ public void postProcess(IndexRequest indexRequest) {}
9898
*/
9999
public abstract int indexShard(IndexRequest indexRequest);
100100

101+
public abstract int rerouteToTarget(IndexRequest indexRequest);
102+
101103
/**
102104
* Called when updating a document to generate the shard id that should contain
103105
* a document with the provided {@code _id} and (optional) {@code _routing}.
@@ -233,9 +235,13 @@ public int indexShard(IndexRequest indexRequest) {
233235
return rerouteWritesIfResharding(shardId);
234236
}
235237

238+
@Override
239+
public int rerouteToTarget(IndexRequest indexRequest) {
240+
return indexShard(indexRequest);
241+
}
242+
236243
@Override
237244
public int rerouteIndexingRequestIfResharding(IndexRequest indexRequest) {
238-
// System.out.println("Route based on Id");
239245
String id = indexRequest.id();
240246
String routing = indexRequest.routing();
241247
if (id == null) {
@@ -370,13 +376,29 @@ public void postProcess(IndexRequest indexRequest) {
370376

371377
@Override
372378
public int indexShard(IndexRequest indexRequest) {
373-
// System.out.println("Extract from source");
374379
assert Transports.assertNotTransportThread("parsing the _source can get slow");
375380
checkNoRouting(indexRequest.routing());
376381
hash = hashSource(indexRequest);
377382
int shardId = hashToShardId(hash);
378-
// System.out.println("shardId = " + shardId);
379-
return (rerouteWritesIfResharding(shardId));
383+
return rerouteWritesIfResharding(shardId);
384+
}
385+
386+
@Override
387+
public int rerouteToTarget(IndexRequest indexRequest) {
388+
if (trackTimeSeriesRoutingHash) {
389+
String routing = indexRequest.routing();
390+
if (routing == null) {
391+
throw new IllegalStateException("Routing should be set by the coordinator");
392+
}
393+
TimeSeriesRoutingHashFieldMapper.decode(routing);
394+
return indexShard(indexRequest);
395+
} else if (addIdWithRoutingHash) {
396+
// TODO: is this correct?
397+
return hashToShardId(effectiveRoutingToHash(indexRequest.id()));
398+
} else {
399+
checkNoRouting(indexRequest.routing());
400+
return indexShard(indexRequest);
401+
}
380402
}
381403

382404
protected abstract int hashSource(IndexRequest indexRequest);
@@ -386,15 +408,13 @@ public int indexShard(IndexRequest indexRequest) {
386408
// make this call cheaper.
387409
@Override
388410
public int rerouteIndexingRequestIfResharding(IndexRequest indexRequest) {
389-
// System.out.println("Extract from source");
390411
// assert Transports.assertNotTransportThread("parsing the _source can get slow");
391412
// TODO: Is this always necessary ? This can be expensive. postProcess adds some additional metadata
392413
// TODO: to the indexing request, can that be used to get the hash in a cheaper way ? Or maybe we
393414
// TODO: can add the hash to the IndexRequest ?
394415
hash = hashSource(indexRequest);
395416
int shardId = hashToShardId(hash);
396-
// System.out.println("shardId = " + shardId);
397-
return (rerouteWritesIfResharding(shardId));
417+
return rerouteWritesIfResharding(shardId);
398418
}
399419

400420
private static int defaultOnEmpty() {
@@ -419,14 +439,14 @@ public int rerouteUpdateRequestIfResharding(String id, @Nullable String routing)
419439
public int deleteShard(String id, @Nullable String routing) {
420440
checkNoRouting(routing);
421441
int shardId = idToHash(id);
422-
return (rerouteWritesIfResharding(shardId));
442+
return rerouteWritesIfResharding(shardId);
423443
}
424444

425445
@Override
426446
public int rerouteDeleteRequestIfResharding(String id, @Nullable String routing) {
427447
checkNoRouting(routing);
428448
int shardId = idToHash(id);
429-
return (rerouteWritesIfResharding(shardId));
449+
return rerouteWritesIfResharding(shardId);
430450
}
431451

432452
@Override

0 commit comments

Comments
 (0)