Skip to content

Commit a9c9885

Browse files
committed
fix reroute logic
1 parent 9dc5079 commit a9c9885

File tree

3 files changed

+19
-38
lines changed

3 files changed

+19
-38
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,8 @@ protected Map<ShardId, BulkShardRequest> splitRequestOnPrimary(BulkShardRequest
191191
for (int i = 0; i < items.length; i++) {
192192
BulkItemRequest bulkItemRequest = items[i];
193193
DocWriteRequest<?> docWriteRequest = bulkItemRequest.request();
194-
// int shardId = docWriteRequest.rerouteAtSourceDuringResharding(routing);
195-
int shardId = docWriteRequest.route(routing);
194+
int shardId = docWriteRequest.rerouteAtSourceDuringResharding(routing);
195+
// int shardId = docWriteRequest.route(routing);
196196
// System.out.println("shardId = " + shardId);
197197
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
198198
new ShardId(index, shardId),

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -928,7 +928,7 @@ public int route(IndexRouting indexRouting) {
928928

929929
@Override
930930
public int rerouteAtSourceDuringResharding(IndexRouting indexRouting) {
931-
return indexRouting.rerouteIndexingRequestIfResharding(id, routing, tsid, indexSource.contentType(), indexSource.bytes());
931+
return indexRouting.rerouteIndexingRequestIfResharding(this);
932932
}
933933

934934
public IndexRequest setRequireAlias(boolean requireAlias) {

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 16 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,7 @@ private static int effectiveRoutingToHash(String effectiveRouting) {
151151
*/
152152
public void checkIndexSplitAllowed() {}
153153

154-
public abstract int rerouteIndexingRequestIfResharding(
155-
String id,
156-
@Nullable String routing,
157-
BytesRef tsid,
158-
XContentType sourceType,
159-
BytesReference source
160-
);
154+
public abstract int rerouteIndexingRequestIfResharding(IndexRequest indexRequest);
161155

162156
public abstract int rerouteDeleteRequestIfResharding(String id, @Nullable String routing);
163157

@@ -241,13 +235,11 @@ public int indexShard(IndexRequest indexRequest) {
241235

242236
@Override
243237
public int rerouteIndexingRequestIfResharding(
244-
String id,
245-
@Nullable String routing,
246-
BytesRef tsid,
247-
XContentType sourceType,
248-
BytesReference source
238+
IndexRequest indexRequest
249239
) {
250240
// System.out.println("Route based on Id");
241+
String id = indexRequest.id();
242+
String routing = indexRequest.routing();
251243
if (id == null) {
252244
throw new IllegalStateException("id is required and should have been set by process");
253245
}
@@ -380,41 +372,28 @@ public void postProcess(IndexRequest indexRequest) {
380372

381373
@Override
382374
public int indexShard(IndexRequest indexRequest) {
375+
// System.out.println("Extract from source");
383376
assert Transports.assertNotTransportThread("parsing the _source can get slow");
384377
checkNoRouting(indexRequest.routing());
385378
hash = hashSource(indexRequest);
386379
int shardId = hashToShardId(hash);
380+
// System.out.println("shardId = " + shardId);
387381
return (rerouteWritesIfResharding(shardId));
388382
}
389383

390384
protected abstract int hashSource(IndexRequest indexRequest);
391385

392386
@Override
393-
public int rerouteIndexingRequestIfResharding(
394-
String id,
395-
@Nullable String routing,
396-
BytesRef tsid,
397-
XContentType sourceType,
398-
BytesReference source
399-
) {
387+
public int rerouteIndexingRequestIfResharding(IndexRequest indexRequest) {
400388
// System.out.println("Extract from source");
401-
/*
402-
if (createTsidDuringRouting) {
403-
assert tsid != null : "expecting a valid tsid";
404-
hash = hash(tsid);
405-
} else {
406-
// TODO: Is this always necessary ? This can be expensive. We should not do this on a transport thread I believe.
407-
hash = hashRoutingFields(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
408-
}
409-
410-
*/
411-
hash = hash(tsid);
389+
// assert Transports.assertNotTransportThread("parsing the _source can get slow");
390+
// TODO: Is this always necessary ? This can be expensive. postProcess adds some additional metadata
391+
// TODO: to the indexing request, can that be used to get the hash in a cheaper way ? Or maybe we
392+
// TODO: can add the hash to the IndexRequest ?
393+
hash = hashSource(indexRequest);
412394
int shardId = hashToShardId(hash);
413-
return rerouteWritesIfResharding(shardId);
414-
}
415-
416-
public String createId(XContentType sourceType, BytesReference source, byte[] suffix) {
417-
return hashRoutingFields(sourceType, source).createId(suffix, IndexRouting.ExtractFromSource::defaultOnEmpty);
395+
// System.out.println("shardId = " + shardId);
396+
return (rerouteWritesIfResharding(shardId));
418397
}
419398

420399
private static int defaultOnEmpty() {
@@ -510,6 +489,7 @@ public static class ForRoutingPath extends ExtractFromSource {
510489

511490
@Override
512491
protected int hashSource(IndexRequest indexRequest) {
492+
// System.out.println("hashSource for routing path");
513493
return hashRoutingFields(indexRequest.getContentType(), indexRequest.source()).buildHash(
514494
IndexRouting.ExtractFromSource::defaultOnEmpty
515495
);
@@ -565,6 +545,7 @@ public static class ForIndexDimensions extends ExtractFromSource {
565545

566546
@Override
567547
protected int hashSource(IndexRequest indexRequest) {
548+
// System.out.println("hashSource for tsid");
568549
BytesRef tsid = indexRequest.tsid();
569550
if (tsid == null) {
570551
tsid = buildTsid(indexRequest.getContentType(), indexRequest.indexSource().bytes());

0 commit comments

Comments
 (0)