Skip to content

Commit 697f6a1

Browse files
Remove GroupShardsIterator and replace it with plain List (#116891) (#122253)
There is no point in having `GroupShardsIterator`, it's mostly an unnecessary layer of indirection as it has no state and a single field only. It's only value could be seen in it hiding the ability to mutate the list it wraps, but that hardly justifies the overhead on the search path and extra code complexity. Moreover, the list it references is not copied/immutable in any way, so the value of hiding is limited also.
1 parent 07ce551 commit 697f6a1

File tree

40 files changed

+165
-449
lines changed

40 files changed

+165
-449
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.cluster.ClusterState;
3333
import org.elasticsearch.cluster.node.DiscoveryNode;
3434
import org.elasticsearch.cluster.node.DiscoveryNodes;
35-
import org.elasticsearch.cluster.routing.GroupShardsIterator;
3635
import org.elasticsearch.cluster.routing.ShardIterator;
3736
import org.elasticsearch.cluster.routing.ShardRouting;
3837
import org.elasticsearch.cluster.routing.ShardRoutingState;
@@ -531,8 +530,7 @@ public void testResolvePath() throws Exception {
531530
nodeNameToNodeId.put(cursor.getValue().getName(), cursor.getKey());
532531
}
533532

534-
final GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
535-
.activePrimaryShardsGrouped(new String[] { indexName }, false);
533+
final List<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[] { indexName }, false);
536534
final List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
537535
final ShardRouting shardRouting = iterators.iterator().next().nextOrNull();
538536
assertThat(shardRouting, notNullValue());
@@ -571,8 +569,7 @@ public void testResolvePath() throws Exception {
571569

572570
private Path getPathToShardData(String indexName, String dirSuffix) {
573571
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
574-
GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
575-
.activePrimaryShardsGrouped(new String[] { indexName }, false);
572+
List<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[] { indexName }, false);
576573
List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
577574
ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators);
578575
ShardRouting shardRouting = shardIterator.nextOrNull();

server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.elasticsearch.cluster.health.ClusterHealthStatus;
3535
import org.elasticsearch.cluster.metadata.IndexMetadata;
3636
import org.elasticsearch.cluster.node.DiscoveryNode;
37-
import org.elasticsearch.cluster.routing.GroupShardsIterator;
3837
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
3938
import org.elasticsearch.cluster.routing.ShardIterator;
4039
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -311,8 +310,7 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted
311310
}
312311
assertThat(response.getStatus(), is(ClusterHealthStatus.RED));
313312
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
314-
GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
315-
.activePrimaryShardsGrouped(new String[] { "test" }, false);
313+
List<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[] { "test" }, false);
316314
for (ShardIterator iterator : shardIterators) {
317315
ShardRouting routing;
318316
while ((routing = iterator.nextOrNull()) != null) {
@@ -667,7 +665,7 @@ public void testReplicaCorruption() throws Exception {
667665

668666
private int numShards(String... index) {
669667
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
670-
GroupShardsIterator<?> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(index, false);
668+
List<?> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(index, false);
671669
return shardIterators.size();
672670
}
673671

@@ -695,8 +693,7 @@ private ShardRouting corruptRandomPrimaryFile() throws IOException {
695693
private ShardRouting corruptRandomPrimaryFile(final boolean includePerCommitFiles) throws IOException {
696694
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
697695
Index test = state.metadata().index("test").getIndex();
698-
GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
699-
.activePrimaryShardsGrouped(new String[] { "test" }, false);
696+
List<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[] { "test" }, false);
700697
List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
701698
ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators);
702699
ShardRouting shardRouting = shardIterator.nextOrNull();

server/src/internalClusterTest/java/org/elasticsearch/index/suggest/stats/SuggestStatsIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
1515
import org.elasticsearch.action.search.SearchRequestBuilder;
1616
import org.elasticsearch.cluster.ClusterState;
17-
import org.elasticsearch.cluster.routing.GroupShardsIterator;
1817
import org.elasticsearch.cluster.routing.ShardIterator;
1918
import org.elasticsearch.cluster.routing.ShardRouting;
2019
import org.elasticsearch.index.search.stats.SearchStats;
@@ -24,6 +23,7 @@
2423
import org.elasticsearch.test.ESIntegTestCase;
2524

2625
import java.util.HashSet;
26+
import java.util.List;
2727
import java.util.Set;
2828

2929
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -146,7 +146,7 @@ private SearchRequestBuilder addSuggestions(SearchRequestBuilder request, int i)
146146

147147
private Set<String> nodeIdsWithIndex(String... indices) {
148148
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
149-
GroupShardsIterator<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
149+
List<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
150150
Set<String> nodes = new HashSet<>();
151151
for (ShardIterator shardIterator : allAssignedShardsGrouped) {
152152
for (ShardRouting routing : shardIterator) {
@@ -161,7 +161,7 @@ private Set<String> nodeIdsWithIndex(String... indices) {
161161

162162
protected int numAssignedShards(String... indices) {
163163
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
164-
GroupShardsIterator<?> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
164+
List<?> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
165165
return allAssignedShardsGrouped.size();
166166
}
167167
}

server/src/internalClusterTest/java/org/elasticsearch/search/stats/SearchStatsIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
1515
import org.elasticsearch.cluster.ClusterState;
1616
import org.elasticsearch.cluster.metadata.IndexMetadata;
17-
import org.elasticsearch.cluster.routing.GroupShardsIterator;
1817
import org.elasticsearch.cluster.routing.ShardIterator;
1918
import org.elasticsearch.cluster.routing.ShardRouting;
2019
import org.elasticsearch.common.settings.Settings;
@@ -165,7 +164,7 @@ public void testSimpleStats() throws Exception {
165164

166165
private Set<String> nodeIdsWithIndex(String... indices) {
167166
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
168-
GroupShardsIterator<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
167+
List<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
169168
Set<String> nodes = new HashSet<>();
170169
for (ShardIterator shardIterator : allAssignedShardsGrouped) {
171170
for (ShardRouting routing : shardIterator) {
@@ -248,7 +247,7 @@ public void testOpenContexts() {
248247

249248
protected int numAssignedShards(String... indices) {
250249
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
251-
GroupShardsIterator<?> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
250+
List<?> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
252251
return allAssignedShardsGrouped.size();
253252
}
254253

server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2020
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
2121
import org.elasticsearch.cluster.node.DiscoveryNode;
22-
import org.elasticsearch.cluster.routing.GroupShardsIterator;
2322
import org.elasticsearch.cluster.routing.ShardIterator;
2423
import org.elasticsearch.cluster.routing.ShardRouting;
2524
import org.elasticsearch.cluster.service.ClusterService;
@@ -34,6 +33,7 @@
3433

3534
import java.util.HashMap;
3635
import java.util.HashSet;
36+
import java.util.List;
3737
import java.util.Map;
3838
import java.util.Set;
3939

@@ -101,7 +101,7 @@ protected void masterOperation(
101101
}
102102

103103
Set<String> nodeIds = new HashSet<>();
104-
GroupShardsIterator<ShardIterator> groupShardsIterator = clusterService.operationRouting()
104+
List<ShardIterator> groupShardsIterator = clusterService.operationRouting()
105105
.searchShards(clusterState, concreteIndices, routingMap, request.preference());
106106
ShardRouting shard;
107107
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];

server/src/main/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageAction.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2222
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2323
import org.elasticsearch.cluster.node.DiscoveryNode;
24-
import org.elasticsearch.cluster.routing.GroupShardsIterator;
2524
import org.elasticsearch.cluster.routing.ShardIterator;
2625
import org.elasticsearch.cluster.routing.ShardRouting;
2726
import org.elasticsearch.cluster.service.ClusterService;
@@ -214,13 +213,8 @@ protected AnalyzeIndexDiskUsageResponse newResponse(
214213
}
215214

216215
@Override
217-
protected GroupShardsIterator<ShardIterator> shards(
218-
ClusterState clusterState,
219-
AnalyzeIndexDiskUsageRequest request,
220-
String[] concreteIndices
221-
) {
222-
final GroupShardsIterator<ShardIterator> groups = clusterService.operationRouting()
223-
.searchShards(clusterState, concreteIndices, null, null);
216+
protected List<ShardIterator> shards(ClusterState clusterState, AnalyzeIndexDiskUsageRequest request, String[] concreteIndices) {
217+
final List<ShardIterator> groups = clusterService.operationRouting().searchShards(clusterState, concreteIndices, null, null);
224218
for (ShardIterator group : groups) {
225219
// fails fast if any non-active groups
226220
if (group.size() == 0) {

server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2323
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2424
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
25-
import org.elasticsearch.cluster.routing.GroupShardsIterator;
2625
import org.elasticsearch.cluster.routing.ShardIterator;
2726
import org.elasticsearch.cluster.routing.ShardRouting;
2827
import org.elasticsearch.cluster.service.ClusterService;
@@ -145,7 +144,7 @@ protected ShardValidateQueryResponse readShardResponse(StreamInput in) throws IO
145144
}
146145

147146
@Override
148-
protected GroupShardsIterator<ShardIterator> shards(ClusterState clusterState, ValidateQueryRequest request, String[] concreteIndices) {
147+
protected List<ShardIterator> shards(ClusterState clusterState, ValidateQueryRequest request, String[] concreteIndices) {
149148
final String routing;
150149
if (request.allShards()) {
151150
routing = null;

server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.action.OriginalIndices;
1919
import org.elasticsearch.cluster.ClusterState;
2020
import org.elasticsearch.cluster.node.DiscoveryNode;
21-
import org.elasticsearch.cluster.routing.GroupShardsIterator;
2221
import org.elasticsearch.cluster.routing.ShardIterator;
2322
import org.elasticsearch.cluster.routing.ShardRouting;
2423
import org.elasticsearch.cluster.service.ClusterService;
@@ -93,7 +92,7 @@ final class RequestDispatcher {
9392
this.onComplete = new RunOnce(onComplete);
9493
this.indexSelectors = ConcurrentCollections.newConcurrentMap();
9594
for (String index : indices) {
96-
final GroupShardsIterator<ShardIterator> shardIts;
95+
final List<ShardIterator> shardIts;
9796
try {
9897
shardIts = clusterService.operationRouting().searchShards(clusterState, new String[] { index }, null, null);
9998
} catch (Exception e) {
@@ -250,7 +249,7 @@ private static class IndexSelector {
250249
private final Set<ShardId> unmatchedShardIds = new HashSet<>();
251250
private final Map<ShardId, Exception> failures = new HashMap<>();
252251

253-
IndexSelector(GroupShardsIterator<ShardIterator> shardIts) {
252+
IndexSelector(List<ShardIterator> shardIts) {
254253
for (ShardIterator shardIt : shardIts) {
255254
for (ShardRouting shard : shardIt) {
256255
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.action.support.SubscribableListener;
2424
import org.elasticsearch.action.support.TransportActions;
2525
import org.elasticsearch.cluster.ClusterState;
26-
import org.elasticsearch.cluster.routing.GroupShardsIterator;
2726
import org.elasticsearch.common.bytes.BytesReference;
2827
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2928
import org.elasticsearch.common.util.Maps;
@@ -61,9 +60,9 @@
6160
import static org.elasticsearch.core.Strings.format;
6261

6362
/**
64-
* This is an abstract base class that encapsulates the logic to fan out to all shards in provided {@link GroupShardsIterator}
63+
* This is an abstract base class that encapsulates the logic to fan out to all shards in provided {@link List<SearchShardIterator>}
6564
* and collect the results. If a shard request returns a failure this class handles the advance to the next replica of the shard until
66-
* the shards replica iterator is exhausted. Each shard is referenced by position in the {@link GroupShardsIterator} which is later
65+
* the shards replica iterator is exhausted. Each shard is referenced by position in the {@link List<SearchShardIterator>} which is later
6766
* referred to as the {@code shardIndex}.
6867
* The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection of
6968
* distributed frequencies
@@ -94,8 +93,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9493
private final SearchTimeProvider timeProvider;
9594
private final SearchResponse.Clusters clusters;
9695

97-
protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
98-
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
96+
protected final List<SearchShardIterator> toSkipShardsIts;
97+
protected final List<SearchShardIterator> shardsIts;
9998
private final SearchShardIterator[] shardIterators;
10099
private final AtomicInteger outstandingShards;
101100
private final int maxConcurrentRequestsPerNode;
@@ -117,7 +116,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
117116
Executor executor,
118117
SearchRequest request,
119118
ActionListener<SearchResponse> listener,
120-
GroupShardsIterator<SearchShardIterator> shardsIts,
119+
List<SearchShardIterator> shardsIts,
121120
SearchTimeProvider timeProvider,
122121
ClusterState clusterState,
123122
SearchTask task,
@@ -136,8 +135,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
136135
iterators.add(iterator);
137136
}
138137
}
139-
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
140-
this.shardsIts = new GroupShardsIterator<>(iterators);
138+
this.toSkipShardsIts = toSkipIterators;
139+
this.shardsIts = iterators;
141140
outstandingShards = new AtomicInteger(shardsIts.size());
142141
this.shardIterators = iterators.toArray(new SearchShardIterator[0]);
143142
// we later compute the shard index based on the natural order of the shards
@@ -172,8 +171,8 @@ protected void notifyListShards(
172171
SearchSourceBuilder sourceBuilder
173172
) {
174173
progressListener.notifyListShards(
175-
SearchProgressListener.buildSearchShards(this.shardsIts),
176-
SearchProgressListener.buildSearchShards(toSkipShardsIts),
174+
SearchProgressListener.buildSearchShardsFromIter(this.shardsIts),
175+
SearchProgressListener.buildSearchShardsFromIter(toSkipShardsIts),
177176
clusters,
178177
sourceBuilder == null || sourceBuilder.size() > 0,
179178
timeProvider
@@ -257,7 +256,7 @@ void skipShard(SearchShardIterator iterator) {
257256
successfulShardExecution();
258257
}
259258

260-
private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> shardsIts) {
259+
private boolean checkMinimumVersion(List<SearchShardIterator> shardsIts) {
261260
for (SearchShardIterator it : shardsIts) {
262261
if (it.getTargetNodeIds().isEmpty() == false) {
263262
boolean isCompatible = it.getTargetNodeIds().stream().anyMatch(nodeId -> {

0 commit comments

Comments
 (0)