Skip to content

Commit ee12925

Browse files
Some cleanups to OperationRouting (#119440)
Two small cleanups that remove duplication and dead-code: 1. The shardId method can be made `static`, it's only non-static use in mocking was unnecessary and could just be deleted 2. Filtering for searchable shards is the same logic now for stateful and stateless. We also only use it when building a filtered `PlainShardIterator` so it might as well live there to remove some duplication.
1 parent ee63638 commit ee12925

File tree

11 files changed

+48
-52
lines changed

11 files changed

+48
-52
lines changed

server/src/internalClusterTest/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
1717
import org.elasticsearch.cluster.ClusterState;
1818
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
19+
import org.elasticsearch.cluster.routing.OperationRouting;
1920
import org.elasticsearch.cluster.routing.ShardRouting;
20-
import org.elasticsearch.cluster.service.ClusterService;
2121
import org.elasticsearch.common.Priority;
2222
import org.elasticsearch.common.settings.Setting;
2323
import org.elasticsearch.common.util.CollectionUtils;
@@ -365,11 +365,10 @@ private void iterateAssertCount(final int numberOfShards, final int iterations,
365365
);
366366
}
367367

368-
ClusterService clusterService = clusterService();
369-
final ClusterState state = clusterService.state();
368+
final ClusterState state = clusterService().state();
370369
for (int shard = 0; shard < numberOfShards; shard++) {
371370
for (String id : ids) {
372-
ShardId docShard = clusterService.operationRouting().shardId(state, "test", id, null);
371+
ShardId docShard = OperationRouting.shardId(state, "test", id, null);
373372
if (docShard.id() == shard) {
374373
final IndexShardRoutingTable indexShardRoutingTable = state.routingTable().shardRoutingTable("test", shard);
375374
for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {

server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ final class RequestDispatcher {
9595
for (String index : indices) {
9696
final GroupShardsIterator<ShardIterator> shardIts;
9797
try {
98-
shardIts = clusterService.operationRouting().searchShards(clusterState, new String[] { index }, null, null, null, null);
98+
shardIts = clusterService.operationRouting().searchShards(clusterState, new String[] { index }, null, null);
9999
} catch (Exception e) {
100100
onIndexFailure.accept(index, e);
101101
continue;

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.elasticsearch.cluster.node.DiscoveryNode;
3131
import org.elasticsearch.cluster.routing.PlainShardIterator;
3232
import org.elasticsearch.cluster.routing.ShardIterator;
33-
import org.elasticsearch.cluster.routing.ShardRouting;
3433
import org.elasticsearch.cluster.service.ClusterService;
3534
import org.elasticsearch.common.io.stream.Writeable;
3635
import org.elasticsearch.core.TimeValue;
@@ -109,7 +108,7 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
109108
if (iterator == null) {
110109
return null;
111110
}
112-
return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList());
111+
return PlainShardIterator.allSearchableShards(iterator);
113112
}
114113

115114
@Override

server/src/main/java/org/elasticsearch/action/get/TransportMultiGetAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.cluster.ClusterState;
2020
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2121
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
22+
import org.elasticsearch.cluster.routing.OperationRouting;
2223
import org.elasticsearch.cluster.service.ClusterService;
2324
import org.elasticsearch.common.util.concurrent.AtomicArray;
2425
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -81,7 +82,7 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL
8182
lastResolvedIndex = Tuple.tuple(item.index(), concreteSingleIndex);
8283
}
8384
item.routing(clusterState.metadata().resolveIndexRouting(item.routing(), item.index()));
84-
shardId = clusterService.operationRouting().shardId(clusterState, concreteSingleIndex, item.id(), item.routing());
85+
shardId = OperationRouting.shardId(clusterState, concreteSingleIndex, item.id(), item.routing());
8586
} catch (RoutingMissingException e) {
8687
responses.set(i, newItemFailure(e.getIndex().getName(), e.getId(), e));
8788
continue;

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.elasticsearch.cluster.node.DiscoveryNode;
3131
import org.elasticsearch.cluster.routing.PlainShardIterator;
3232
import org.elasticsearch.cluster.routing.ShardIterator;
33-
import org.elasticsearch.cluster.routing.ShardRouting;
3433
import org.elasticsearch.cluster.service.ClusterService;
3534
import org.elasticsearch.common.io.stream.Writeable;
3635
import org.elasticsearch.core.TimeValue;
@@ -113,7 +112,7 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
113112
if (iterator == null) {
114113
return null;
115114
}
116-
return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList());
115+
return PlainShardIterator.allSearchableShards(iterator);
117116
}
118117

119118
@Override

server/src/main/java/org/elasticsearch/action/termvectors/TransportMultiTermVectorsAction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.cluster.ClusterState;
1818
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1919
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
20+
import org.elasticsearch.cluster.routing.OperationRouting;
2021
import org.elasticsearch.cluster.service.ClusterService;
2122
import org.elasticsearch.common.util.concurrent.AtomicArray;
2223
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -72,8 +73,12 @@ protected void doExecute(Task task, final MultiTermVectorsRequest request, final
7273
clusterState.metadata().resolveIndexRouting(termVectorsRequest.routing(), termVectorsRequest.index())
7374
);
7475
String concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, termVectorsRequest).getName();
75-
shardId = clusterService.operationRouting()
76-
.shardId(clusterState, concreteSingleIndex, termVectorsRequest.id(), termVectorsRequest.routing());
76+
shardId = OperationRouting.shardId(
77+
clusterState,
78+
concreteSingleIndex,
79+
termVectorsRequest.id(),
80+
termVectorsRequest.routing()
81+
);
7782
} catch (RoutingMissingException e) {
7883
responses.set(
7984
i,

server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,27 @@ public TransportTermVectorsAction(
6262

6363
@Override
6464
protected ShardIterator shards(ClusterState state, InternalRequest request) {
65+
final var operationRouting = clusterService.operationRouting();
6566
if (request.request().doc() != null && request.request().routing() == null) {
6667
// artificial document without routing specified, ignore its "id" and use either random shard or according to preference
67-
GroupShardsIterator<ShardIterator> groupShardsIter = clusterService.operationRouting()
68-
.searchShards(state, new String[] { request.concreteIndex() }, null, request.request().preference());
68+
GroupShardsIterator<ShardIterator> groupShardsIter = operationRouting.searchShards(
69+
state,
70+
new String[] { request.concreteIndex() },
71+
null,
72+
request.request().preference()
73+
);
6974
return groupShardsIter.iterator().next();
7075
}
7176

72-
ShardIterator shards = clusterService.operationRouting()
73-
.getShards(state, request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference());
74-
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(shards);
77+
return operationRouting.useOnlyPromotableShardsForStateless(
78+
operationRouting.getShards(
79+
state,
80+
request.concreteIndex(),
81+
request.request().id(),
82+
request.request().routing(),
83+
request.request().preference()
84+
)
85+
);
7586
}
7687

7788
@Override

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.Arrays;
2828
import java.util.Collections;
2929
import java.util.HashSet;
30-
import java.util.List;
3130
import java.util.Map;
3231
import java.util.Set;
3332
import java.util.stream.Collectors;
@@ -125,32 +124,12 @@ public GroupShardsIterator<ShardIterator> searchShards(
125124
nodeCounts
126125
);
127126
if (iterator != null) {
128-
final List<ShardRouting> shardsThatCanHandleSearches;
129-
if (isStateless) {
130-
shardsThatCanHandleSearches = statelessShardsThatHandleSearches(clusterState, iterator);
131-
} else {
132-
shardsThatCanHandleSearches = statefulShardsThatHandleSearches(iterator);
133-
}
134-
set.add(new PlainShardIterator(iterator.shardId(), shardsThatCanHandleSearches));
127+
set.add(PlainShardIterator.allSearchableShards(iterator));
135128
}
136129
}
137130
return GroupShardsIterator.sortAndCreate(new ArrayList<>(set));
138131
}
139132

140-
private static List<ShardRouting> statefulShardsThatHandleSearches(ShardIterator iterator) {
141-
final List<ShardRouting> shardsThatCanHandleSearches = new ArrayList<>(iterator.size());
142-
for (ShardRouting shardRouting : iterator) {
143-
if (shardRouting.isSearchable()) {
144-
shardsThatCanHandleSearches.add(shardRouting);
145-
}
146-
}
147-
return shardsThatCanHandleSearches;
148-
}
149-
150-
private static List<ShardRouting> statelessShardsThatHandleSearches(ClusterState clusterState, ShardIterator iterator) {
151-
return iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList();
152-
}
153-
154133
public static ShardIterator getShards(ClusterState clusterState, ShardId shardId) {
155134
final IndexShardRoutingTable shard = clusterState.routingTable().shardRoutingTable(shardId);
156135
return shard.activeInitializingShardsRandomIt();
@@ -297,7 +276,7 @@ private static IndexMetadata indexMetadata(ClusterState clusterState, String ind
297276
return indexMetadata;
298277
}
299278

300-
public ShardId shardId(ClusterState clusterState, String index, String id, @Nullable String routing) {
279+
public static ShardId shardId(ClusterState clusterState, String index, String id, @Nullable String routing) {
301280
IndexMetadata indexMetadata = indexMetadata(clusterState, index);
302281
return new ShardId(indexMetadata.getIndex(), IndexRouting.fromIndexMetadata(indexMetadata).getShard(id, routing));
303282
}

server/src/main/java/org/elasticsearch/cluster/routing/PlainShardIterator.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.index.shard.ShardId;
1313

14+
import java.util.ArrayList;
1415
import java.util.List;
1516

1617
/**
@@ -21,6 +22,20 @@ public class PlainShardIterator extends PlainShardsIterator implements ShardIter
2122

2223
private final ShardId shardId;
2324

25+
public static PlainShardIterator allSearchableShards(ShardIterator shardIterator) {
26+
return new PlainShardIterator(shardIterator.shardId(), shardsThatCanHandleSearches(shardIterator));
27+
}
28+
29+
private static List<ShardRouting> shardsThatCanHandleSearches(ShardIterator iterator) {
30+
final List<ShardRouting> shardsThatCanHandleSearches = new ArrayList<>(iterator.size());
31+
for (ShardRouting shardRouting : iterator) {
32+
if (shardRouting.isSearchable()) {
33+
shardsThatCanHandleSearches.add(shardRouting);
34+
}
35+
}
36+
return shardsThatCanHandleSearches;
37+
}
38+
2439
/**
2540
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
2641
* this the a given <code>shardId</code>.

server/src/test/java/org/elasticsearch/action/get/TransportMultiGetActionTests.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,15 +145,9 @@ public static void beforeClass() throws Exception {
145145
when(
146146
operationRouting.getShards(eq(clusterState), eq(index1.getName()), anyString(), nullable(String.class), nullable(String.class))
147147
).thenReturn(index1ShardIterator);
148-
when(operationRouting.shardId(eq(clusterState), eq(index1.getName()), nullable(String.class), nullable(String.class))).thenReturn(
149-
new ShardId(index1, randomInt())
150-
);
151148
when(
152149
operationRouting.getShards(eq(clusterState), eq(index2.getName()), anyString(), nullable(String.class), nullable(String.class))
153150
).thenReturn(index2ShardIterator);
154-
when(operationRouting.shardId(eq(clusterState), eq(index2.getName()), nullable(String.class), nullable(String.class))).thenReturn(
155-
new ShardId(index2, randomInt())
156-
);
157151

158152
clusterService = mock(ClusterService.class);
159153
when(clusterService.localNode()).thenReturn(transportService.getLocalNode());

0 commit comments

Comments
 (0)