Skip to content

Commit 3590be7

Browse files
authored
Split request processing for routing to pre- and post-processing. (#118420)
1 parent 2d66d25 commit 3590be7

File tree

9 files changed

+50
-58
lines changed

9 files changed

+50
-58
lines changed

server/src/main/java/org/elasticsearch/action/DocWriteRequest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,14 @@ public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
160160
boolean isRequireDataStream();
161161

162162
/**
163-
* Finalize the request before executing or routing it.
163+
* Finalize the request before routing it.
164164
*/
165-
void process(IndexRouting indexRouting);
165+
default void preRoutingProcess(IndexRouting indexRouting) {}
166+
167+
/**
168+
* Finalize the request after routing it.
169+
*/
170+
default void postRoutingProcess(IndexRouting indexRouting) {}
166171

167172
/**
168173
* Pick the appropriate shard id to receive this request.

server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,9 @@ private Map<ShardId, List<BulkItemRequest>> groupRequestsByShards(
314314
continue;
315315
}
316316
IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
317-
docWriteRequest.process(indexRouting);
317+
docWriteRequest.preRoutingProcess(indexRouting);
318318
int shardId = docWriteRequest.route(indexRouting);
319+
docWriteRequest.postRoutingProcess(indexRouting);
319320
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
320321
new ShardId(concreteIndex, shardId),
321322
shard -> new ArrayList<>()

server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -237,11 +237,6 @@ public boolean isRequireDataStream() {
237237
return false;
238238
}
239239

240-
@Override
241-
public void process(IndexRouting indexRouting) {
242-
// Nothing to do
243-
}
244-
245240
@Override
246241
public int route(IndexRouting indexRouting) {
247242
return indexRouting.deleteShard(id, routing);

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -685,8 +685,13 @@ public VersionType versionType() {
685685
}
686686

687687
@Override
688-
public void process(IndexRouting indexRouting) {
689-
indexRouting.process(this);
688+
public void preRoutingProcess(IndexRouting indexRouting) {
689+
indexRouting.preProcess(this);
690+
}
691+
692+
@Override
693+
public void postRoutingProcess(IndexRouting indexRouting) {
694+
indexRouting.postProcess(this);
690695
}
691696

692697
/**
@@ -885,7 +890,7 @@ public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
885890

886891
@Override
887892
public int route(IndexRouting indexRouting) {
888-
return indexRouting.indexShard(id, routing, contentType, source, this::routing);
893+
return indexRouting.indexShard(id, routing, contentType, source);
889894
}
890895

891896
public IndexRequest setRequireAlias(boolean requireAlias) {

server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -683,11 +683,6 @@ public boolean isRequireDataStream() {
683683
return false;
684684
}
685685

686-
@Override
687-
public void process(IndexRouting indexRouting) {
688-
// Nothing to do
689-
}
690-
691686
@Override
692687
public int route(IndexRouting indexRouting) {
693688
return indexRouting.updateShard(id, routing);

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import java.util.List;
4242
import java.util.Map;
4343
import java.util.Set;
44-
import java.util.function.Consumer;
4544
import java.util.function.IntConsumer;
4645
import java.util.function.IntSupplier;
4746
import java.util.function.Predicate;
@@ -80,19 +79,21 @@ private IndexRouting(IndexMetadata metadata) {
8079
this.routingFactor = metadata.getRoutingFactor();
8180
}
8281

83-
public abstract void process(IndexRequest indexRequest);
82+
/**
83+
* Finalize the request before routing, with data needed for routing decisions.
84+
*/
85+
public void preProcess(IndexRequest indexRequest) {}
86+
87+
/**
88+
* Finalize the request after routing, incorporating data produced by the routing logic.
89+
*/
90+
public void postProcess(IndexRequest indexRequest) {}
8491

8592
/**
8693
* Called when indexing a document to generate the shard id that should contain
8794
* a document with the provided parameters.
8895
*/
89-
public abstract int indexShard(
90-
String id,
91-
@Nullable String routing,
92-
XContentType sourceType,
93-
BytesReference source,
94-
Consumer<String> routingHashSetter
95-
);
96+
public abstract int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source);
9697

9798
/**
9899
* Called when updating a document to generate the shard id that should contain
@@ -163,7 +164,7 @@ private abstract static class IdAndRoutingOnly extends IndexRouting {
163164
protected abstract int shardId(String id, @Nullable String routing);
164165

165166
@Override
166-
public void process(IndexRequest indexRequest) {
167+
public void preProcess(IndexRequest indexRequest) {
167168
// generate id if not already provided
168169
final String id = indexRequest.id();
169170
if (id == null) {
@@ -187,13 +188,7 @@ private static boolean isNewIndexVersion(final IndexVersion creationVersion) {
187188
}
188189

189190
@Override
190-
public int indexShard(
191-
String id,
192-
@Nullable String routing,
193-
XContentType sourceType,
194-
BytesReference source,
195-
Consumer<String> routingHashSetter
196-
) {
191+
public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
197192
if (id == null) {
198193
throw new IllegalStateException("id is required and should have been set by process");
199194
}
@@ -278,6 +273,7 @@ public static class ExtractFromSource extends IndexRouting {
278273
private final Predicate<String> isRoutingPath;
279274
private final XContentParserConfiguration parserConfig;
280275
private final boolean trackTimeSeriesRoutingHash;
276+
private int hash = Integer.MAX_VALUE;
281277

282278
ExtractFromSource(IndexMetadata metadata) {
283279
super(metadata);
@@ -295,22 +291,17 @@ public boolean matchesField(String fieldName) {
295291
}
296292

297293
@Override
298-
public void process(IndexRequest indexRequest) {}
294+
public void postProcess(IndexRequest indexRequest) {
295+
if (trackTimeSeriesRoutingHash) {
296+
indexRequest.routing(TimeSeriesRoutingHashFieldMapper.encode(hash));
297+
}
298+
}
299299

300300
@Override
301-
public int indexShard(
302-
String id,
303-
@Nullable String routing,
304-
XContentType sourceType,
305-
BytesReference source,
306-
Consumer<String> routingHashSetter
307-
) {
301+
public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
308302
assert Transports.assertNotTransportThread("parsing the _source can get slow");
309303
checkNoRouting(routing);
310-
int hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
311-
if (trackTimeSeriesRoutingHash) {
312-
routingHashSetter.accept(TimeSeriesRoutingHashFieldMapper.encode(hash));
313-
}
304+
hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
314305
return hashToShardId(hash);
315306
}
316307

server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public void testSimpleRoutingRejectsEmptyId() {
4848
IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(2).numberOfReplicas(1).build()
4949
);
5050
IndexRequest req = new IndexRequest().id("");
51-
Exception e = expectThrows(IllegalArgumentException.class, () -> indexRouting.process(req));
51+
Exception e = expectThrows(IllegalArgumentException.class, () -> indexRouting.preProcess(req));
5252
assertThat(e.getMessage(), equalTo("if _id is specified it must not be empty"));
5353
}
5454

@@ -58,7 +58,7 @@ public void testSimpleRoutingAcceptsId() {
5858
);
5959
String id = randomAlphaOfLength(10);
6060
IndexRequest req = new IndexRequest().id(id);
61-
indexRouting.process(req);
61+
indexRouting.preProcess(req);
6262
assertThat(req.id(), equalTo(id));
6363
assertThat(req.getAutoGeneratedTimestamp(), equalTo(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP));
6464
}
@@ -68,7 +68,7 @@ public void testSimpleRoutingAssignedRandomId() {
6868
IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(2).numberOfReplicas(1).build()
6969
);
7070
IndexRequest req = new IndexRequest();
71-
indexRouting.process(req);
71+
indexRouting.preProcess(req);
7272
assertThat(req.id(), not(nullValue()));
7373
assertThat(req.getAutoGeneratedTimestamp(), not(equalTo(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP)));
7474
}
@@ -458,7 +458,7 @@ public void testRequiredRouting() {
458458
*/
459459
private int shardIdFromSimple(IndexRouting indexRouting, String id, @Nullable String routing) {
460460
return switch (between(0, 3)) {
461-
case 0 -> indexRouting.indexShard(id, routing, null, null, null);
461+
case 0 -> indexRouting.indexShard(id, routing, null, null);
462462
case 1 -> indexRouting.updateShard(id, routing);
463463
case 2 -> indexRouting.deleteShard(id, routing);
464464
case 3 -> indexRouting.getShard(id, routing);
@@ -470,7 +470,7 @@ public void testRoutingAllowsId() {
470470
IndexRouting indexRouting = indexRoutingForPath(between(1, 5), randomAlphaOfLength(5));
471471
String id = randomAlphaOfLength(5);
472472
IndexRequest req = new IndexRequest().id(id);
473-
indexRouting.process(req);
473+
indexRouting.preProcess(req);
474474
assertThat(req.id(), equalTo(id));
475475
}
476476

@@ -483,15 +483,15 @@ public void testRoutingAllowsId() {
483483
public void testRoutingPathLeavesIdNull() {
484484
IndexRouting indexRouting = indexRoutingForPath(between(1, 5), randomAlphaOfLength(5));
485485
IndexRequest req = new IndexRequest();
486-
indexRouting.process(req);
486+
indexRouting.preProcess(req);
487487
assertThat(req.id(), nullValue());
488488
}
489489

490490
public void testRoutingPathEmptySource() throws IOException {
491491
IndexRouting routing = indexRoutingForPath(between(1, 5), randomAlphaOfLength(5));
492492
Exception e = expectThrows(
493493
IllegalArgumentException.class,
494-
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of()), null)
494+
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of()))
495495
);
496496
assertThat(e.getMessage(), equalTo("Error extracting routing: source didn't contain any routing fields"));
497497
}
@@ -500,7 +500,7 @@ public void testRoutingPathMismatchSource() throws IOException {
500500
IndexRouting routing = indexRoutingForPath(between(1, 5), "foo");
501501
Exception e = expectThrows(
502502
IllegalArgumentException.class,
503-
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of("bar", "dog")), null)
503+
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of("bar", "dog")))
504504
);
505505
assertThat(e.getMessage(), equalTo("Error extracting routing: source didn't contain any routing fields"));
506506
}
@@ -521,7 +521,7 @@ public void testRoutingIndexWithRouting() throws IOException {
521521
String docRouting = randomAlphaOfLength(5);
522522
Exception e = expectThrows(
523523
IllegalArgumentException.class,
524-
() -> indexRouting.indexShard(randomAlphaOfLength(5), docRouting, XContentType.JSON, source, null)
524+
() -> indexRouting.indexShard(randomAlphaOfLength(5), docRouting, XContentType.JSON, source)
525525
);
526526
assertThat(
527527
e.getMessage(),
@@ -615,7 +615,7 @@ public void testRoutingPathObjectArraysInSource() throws IOException {
615615
BytesReference source = source(Map.of("a", List.of("foo", Map.of("foo", "bar"))));
616616
Exception e = expectThrows(
617617
IllegalArgumentException.class,
618-
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source, s -> {})
618+
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source)
619619
);
620620
assertThat(
621621
e.getMessage(),
@@ -683,7 +683,7 @@ private IndexRouting indexRoutingForPath(IndexVersion createdVersion, int shards
683683
private void assertIndexShard(IndexRouting routing, Map<String, Object> source, int expectedShard) throws IOException {
684684
byte[] suffix = randomSuffix();
685685
BytesReference sourceBytes = source(source);
686-
assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, sourceBytes, s -> {}), equalTo(expectedShard));
686+
assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, sourceBytes), equalTo(expectedShard));
687687
IndexRouting.ExtractFromSource r = (IndexRouting.ExtractFromSource) routing;
688688
String idFromSource = r.createId(XContentType.JSON, sourceBytes, suffix);
689689
assertThat(shardIdForReadFromSourceExtracting(routing, idFromSource), equalTo(expectedShard));

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -870,7 +870,7 @@ private void executeShardBulkOnPrimary(
870870
) {
871871
for (BulkItemRequest itemRequest : request.items()) {
872872
if (itemRequest.request() instanceof IndexRequest) {
873-
((IndexRequest) itemRequest.request()).process(primary.indexSettings().getIndexRouting());
873+
itemRequest.request().preRoutingProcess(primary.indexSettings().getIndexRouting());
874874
}
875875
}
876876
final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2350,7 +2350,7 @@ synchronized String routingKeyForShard(Index index, int shard, Random random) {
23502350
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(clusterState.metadata().getIndexSafe(index));
23512351
while (true) {
23522352
String routing = RandomStrings.randomAsciiLettersOfLength(random, 10);
2353-
if (shard == indexRouting.indexShard("id", routing, null, null, null)) {
2353+
if (shard == indexRouting.indexShard("id", routing, null, null)) {
23542354
return routing;
23552355
}
23562356
}

0 commit comments

Comments
 (0)