Skip to content

Commit aea8255

Browse files
authored
Split request processing for routing to pre- and post-processing. (elastic#118420) (elastic#118444)
1 parent ceb53f7 commit aea8255

File tree

9 files changed

+50
-58
lines changed

9 files changed

+50
-58
lines changed

server/src/main/java/org/elasticsearch/action/DocWriteRequest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,14 @@ public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
160160
boolean isRequireDataStream();
161161

162162
/**
163-
* Finalize the request before executing or routing it.
163+
* Finalize the request before routing it.
164164
*/
165-
void process(IndexRouting indexRouting);
165+
default void preRoutingProcess(IndexRouting indexRouting) {}
166+
167+
/**
168+
* Finalize the request after routing it.
169+
*/
170+
default void postRoutingProcess(IndexRouting indexRouting) {}
166171

167172
/**
168173
* Pick the appropriate shard id to receive this request.

server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,8 +313,9 @@ private Map<ShardId, List<BulkItemRequest>> groupRequestsByShards(
313313
continue;
314314
}
315315
IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
316-
docWriteRequest.process(indexRouting);
316+
docWriteRequest.preRoutingProcess(indexRouting);
317317
int shardId = docWriteRequest.route(indexRouting);
318+
docWriteRequest.postRoutingProcess(indexRouting);
318319
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
319320
new ShardId(concreteIndex, shardId),
320321
shard -> new ArrayList<>()

server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -237,11 +237,6 @@ public boolean isRequireDataStream() {
237237
return false;
238238
}
239239

240-
@Override
241-
public void process(IndexRouting indexRouting) {
242-
// Nothing to do
243-
}
244-
245240
@Override
246241
public int route(IndexRouting indexRouting) {
247242
return indexRouting.deleteShard(id, routing);

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -685,8 +685,13 @@ public VersionType versionType() {
685685
}
686686

687687
@Override
688-
public void process(IndexRouting indexRouting) {
689-
indexRouting.process(this);
688+
public void preRoutingProcess(IndexRouting indexRouting) {
689+
indexRouting.preProcess(this);
690+
}
691+
692+
@Override
693+
public void postRoutingProcess(IndexRouting indexRouting) {
694+
indexRouting.postProcess(this);
690695
}
691696

692697
/**
@@ -885,7 +890,7 @@ public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
885890

886891
@Override
887892
public int route(IndexRouting indexRouting) {
888-
return indexRouting.indexShard(id, routing, contentType, source, this::routing);
893+
return indexRouting.indexShard(id, routing, contentType, source);
889894
}
890895

891896
public IndexRequest setRequireAlias(boolean requireAlias) {

server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -683,11 +683,6 @@ public boolean isRequireDataStream() {
683683
return false;
684684
}
685685

686-
@Override
687-
public void process(IndexRouting indexRouting) {
688-
// Nothing to do
689-
}
690-
691686
@Override
692687
public int route(IndexRouting indexRouting) {
693688
return indexRouting.updateShard(id, routing);

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import java.util.List;
4141
import java.util.Map;
4242
import java.util.Set;
43-
import java.util.function.Consumer;
4443
import java.util.function.IntConsumer;
4544
import java.util.function.IntSupplier;
4645
import java.util.function.Predicate;
@@ -79,19 +78,21 @@ private IndexRouting(IndexMetadata metadata) {
7978
this.routingFactor = metadata.getRoutingFactor();
8079
}
8180

82-
public abstract void process(IndexRequest indexRequest);
81+
/**
82+
* Finalize the request before routing, with data needed for routing decisions.
83+
*/
84+
public void preProcess(IndexRequest indexRequest) {}
85+
86+
/**
87+
* Finalize the request after routing, incorporating data produced by the routing logic.
88+
*/
89+
public void postProcess(IndexRequest indexRequest) {}
8390

8491
/**
8592
* Called when indexing a document to generate the shard id that should contain
8693
* a document with the provided parameters.
8794
*/
88-
public abstract int indexShard(
89-
String id,
90-
@Nullable String routing,
91-
XContentType sourceType,
92-
BytesReference source,
93-
Consumer<String> routingHashSetter
94-
);
95+
public abstract int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source);
9596

9697
/**
9798
* Called when updating a document to generate the shard id that should contain
@@ -162,7 +163,7 @@ private abstract static class IdAndRoutingOnly extends IndexRouting {
162163
protected abstract int shardId(String id, @Nullable String routing);
163164

164165
@Override
165-
public void process(IndexRequest indexRequest) {
166+
public void preProcess(IndexRequest indexRequest) {
166167
// generate id if not already provided
167168
final String id = indexRequest.id();
168169
if (id == null) {
@@ -177,13 +178,7 @@ public void process(IndexRequest indexRequest) {
177178
}
178179

179180
@Override
180-
public int indexShard(
181-
String id,
182-
@Nullable String routing,
183-
XContentType sourceType,
184-
BytesReference source,
185-
Consumer<String> routingHashSetter
186-
) {
181+
public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
187182
if (id == null) {
188183
throw new IllegalStateException("id is required and should have been set by process");
189184
}
@@ -268,6 +263,7 @@ public static class ExtractFromSource extends IndexRouting {
268263
private final Predicate<String> isRoutingPath;
269264
private final XContentParserConfiguration parserConfig;
270265
private final boolean trackTimeSeriesRoutingHash;
266+
private int hash = Integer.MAX_VALUE;
271267

272268
ExtractFromSource(IndexMetadata metadata) {
273269
super(metadata);
@@ -285,22 +281,17 @@ public boolean matchesField(String fieldName) {
285281
}
286282

287283
@Override
288-
public void process(IndexRequest indexRequest) {}
284+
public void postProcess(IndexRequest indexRequest) {
285+
if (trackTimeSeriesRoutingHash) {
286+
indexRequest.routing(TimeSeriesRoutingHashFieldMapper.encode(hash));
287+
}
288+
}
289289

290290
@Override
291-
public int indexShard(
292-
String id,
293-
@Nullable String routing,
294-
XContentType sourceType,
295-
BytesReference source,
296-
Consumer<String> routingHashSetter
297-
) {
291+
public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
298292
assert Transports.assertNotTransportThread("parsing the _source can get slow");
299293
checkNoRouting(routing);
300-
int hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
301-
if (trackTimeSeriesRoutingHash) {
302-
routingHashSetter.accept(TimeSeriesRoutingHashFieldMapper.encode(hash));
303-
}
294+
hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
304295
return hashToShardId(hash);
305296
}
306297

server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public void testSimpleRoutingRejectsEmptyId() {
4848
IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(2).numberOfReplicas(1).build()
4949
);
5050
IndexRequest req = new IndexRequest().id("");
51-
Exception e = expectThrows(IllegalArgumentException.class, () -> indexRouting.process(req));
51+
Exception e = expectThrows(IllegalArgumentException.class, () -> indexRouting.preProcess(req));
5252
assertThat(e.getMessage(), equalTo("if _id is specified it must not be empty"));
5353
}
5454

@@ -58,7 +58,7 @@ public void testSimpleRoutingAcceptsId() {
5858
);
5959
String id = randomAlphaOfLength(10);
6060
IndexRequest req = new IndexRequest().id(id);
61-
indexRouting.process(req);
61+
indexRouting.preProcess(req);
6262
assertThat(req.id(), equalTo(id));
6363
assertThat(req.getAutoGeneratedTimestamp(), equalTo(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP));
6464
}
@@ -68,7 +68,7 @@ public void testSimpleRoutingAssignedRandomId() {
6868
IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(2).numberOfReplicas(1).build()
6969
);
7070
IndexRequest req = new IndexRequest();
71-
indexRouting.process(req);
71+
indexRouting.preProcess(req);
7272
assertThat(req.id(), not(nullValue()));
7373
assertThat(req.getAutoGeneratedTimestamp(), not(equalTo(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP)));
7474
}
@@ -458,7 +458,7 @@ public void testRequiredRouting() {
458458
*/
459459
private int shardIdFromSimple(IndexRouting indexRouting, String id, @Nullable String routing) {
460460
return switch (between(0, 3)) {
461-
case 0 -> indexRouting.indexShard(id, routing, null, null, null);
461+
case 0 -> indexRouting.indexShard(id, routing, null, null);
462462
case 1 -> indexRouting.updateShard(id, routing);
463463
case 2 -> indexRouting.deleteShard(id, routing);
464464
case 3 -> indexRouting.getShard(id, routing);
@@ -470,7 +470,7 @@ public void testRoutingAllowsId() {
470470
IndexRouting indexRouting = indexRoutingForPath(between(1, 5), randomAlphaOfLength(5));
471471
String id = randomAlphaOfLength(5);
472472
IndexRequest req = new IndexRequest().id(id);
473-
indexRouting.process(req);
473+
indexRouting.preProcess(req);
474474
assertThat(req.id(), equalTo(id));
475475
}
476476

@@ -483,15 +483,15 @@ public void testRoutingAllowsId() {
483483
public void testRoutingPathLeavesIdNull() {
484484
IndexRouting indexRouting = indexRoutingForPath(between(1, 5), randomAlphaOfLength(5));
485485
IndexRequest req = new IndexRequest();
486-
indexRouting.process(req);
486+
indexRouting.preProcess(req);
487487
assertThat(req.id(), nullValue());
488488
}
489489

490490
public void testRoutingPathEmptySource() throws IOException {
491491
IndexRouting routing = indexRoutingForPath(between(1, 5), randomAlphaOfLength(5));
492492
Exception e = expectThrows(
493493
IllegalArgumentException.class,
494-
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of()), null)
494+
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of()))
495495
);
496496
assertThat(e.getMessage(), equalTo("Error extracting routing: source didn't contain any routing fields"));
497497
}
@@ -500,7 +500,7 @@ public void testRoutingPathMismatchSource() throws IOException {
500500
IndexRouting routing = indexRoutingForPath(between(1, 5), "foo");
501501
Exception e = expectThrows(
502502
IllegalArgumentException.class,
503-
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of("bar", "dog")), null)
503+
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of("bar", "dog")))
504504
);
505505
assertThat(e.getMessage(), equalTo("Error extracting routing: source didn't contain any routing fields"));
506506
}
@@ -521,7 +521,7 @@ public void testRoutingIndexWithRouting() throws IOException {
521521
String docRouting = randomAlphaOfLength(5);
522522
Exception e = expectThrows(
523523
IllegalArgumentException.class,
524-
() -> indexRouting.indexShard(randomAlphaOfLength(5), docRouting, XContentType.JSON, source, null)
524+
() -> indexRouting.indexShard(randomAlphaOfLength(5), docRouting, XContentType.JSON, source)
525525
);
526526
assertThat(
527527
e.getMessage(),
@@ -615,7 +615,7 @@ public void testRoutingPathObjectArraysInSource() throws IOException {
615615
BytesReference source = source(Map.of("a", List.of("foo", Map.of("foo", "bar"))));
616616
Exception e = expectThrows(
617617
IllegalArgumentException.class,
618-
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source, s -> {})
618+
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source)
619619
);
620620
assertThat(
621621
e.getMessage(),
@@ -683,7 +683,7 @@ private IndexRouting indexRoutingForPath(IndexVersion createdVersion, int shards
683683
private void assertIndexShard(IndexRouting routing, Map<String, Object> source, int expectedShard) throws IOException {
684684
byte[] suffix = randomSuffix();
685685
BytesReference sourceBytes = source(source);
686-
assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, sourceBytes, s -> {}), equalTo(expectedShard));
686+
assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, sourceBytes), equalTo(expectedShard));
687687
IndexRouting.ExtractFromSource r = (IndexRouting.ExtractFromSource) routing;
688688
String idFromSource = r.createId(XContentType.JSON, sourceBytes, suffix);
689689
assertThat(shardIdForReadFromSourceExtracting(routing, idFromSource), equalTo(expectedShard));

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -870,7 +870,7 @@ private void executeShardBulkOnPrimary(
870870
) {
871871
for (BulkItemRequest itemRequest : request.items()) {
872872
if (itemRequest.request() instanceof IndexRequest) {
873-
((IndexRequest) itemRequest.request()).process(primary.indexSettings().getIndexRouting());
873+
itemRequest.request().preRoutingProcess(primary.indexSettings().getIndexRouting());
874874
}
875875
}
876876
final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2345,7 +2345,7 @@ synchronized String routingKeyForShard(Index index, int shard, Random random) {
23452345
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(clusterState.metadata().getIndexSafe(index));
23462346
while (true) {
23472347
String routing = RandomStrings.randomAsciiLettersOfLength(random, 10);
2348-
if (shard == indexRouting.indexShard("id", routing, null, null, null)) {
2348+
if (shard == indexRouting.indexShard("id", routing, null, null)) {
23492349
return routing;
23502350
}
23512351
}

0 commit comments

Comments
 (0)