Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
Expand Down Expand Up @@ -531,8 +530,7 @@ public void testResolvePath() throws Exception {
nodeNameToNodeId.put(cursor.getValue().getName(), cursor.getKey());
}

final GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
.activePrimaryShardsGrouped(new String[] { indexName }, false);
final List<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[] { indexName }, false);
final List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
final ShardRouting shardRouting = iterators.iterator().next().nextOrNull();
assertThat(shardRouting, notNullValue());
Expand Down Expand Up @@ -571,8 +569,7 @@ public void testResolvePath() throws Exception {

private Path getPathToShardData(String indexName, String dirSuffix) {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
.activePrimaryShardsGrouped(new String[] { indexName }, false);
List<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[] { indexName }, false);
List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators);
ShardRouting shardRouting = shardIterator.nextOrNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -311,8 +310,7 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted
}
assertThat(response.getStatus(), is(ClusterHealthStatus.RED));
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
.activePrimaryShardsGrouped(new String[] { "test" }, false);
List<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[] { "test" }, false);
for (ShardIterator iterator : shardIterators) {
ShardRouting routing;
while ((routing = iterator.nextOrNull()) != null) {
Expand Down Expand Up @@ -667,7 +665,7 @@ public void testReplicaCorruption() throws Exception {

private int numShards(String... index) {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<?> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(index, false);
List<?> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(index, false);
return shardIterators.size();
}

Expand Down Expand Up @@ -695,8 +693,7 @@ private ShardRouting corruptRandomPrimaryFile() throws IOException {
private ShardRouting corruptRandomPrimaryFile(final boolean includePerCommitFiles) throws IOException {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
Index test = state.metadata().index("test").getIndex();
GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
.activePrimaryShardsGrouped(new String[] { "test" }, false);
List<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[] { "test" }, false);
List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators);
ShardRouting shardRouting = shardIterator.nextOrNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.search.stats.SearchStats;
Expand All @@ -24,6 +23,7 @@
import org.elasticsearch.test.ESIntegTestCase;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand Down Expand Up @@ -146,7 +146,7 @@ private SearchRequestBuilder addSuggestions(SearchRequestBuilder request, int i)

private Set<String> nodeIdsWithIndex(String... indices) {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
List<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
Set<String> nodes = new HashSet<>();
for (ShardIterator shardIterator : allAssignedShardsGrouped) {
for (ShardRouting routing : shardIterator) {
Expand All @@ -161,7 +161,7 @@ private Set<String> nodeIdsWithIndex(String... indices) {

protected int numAssignedShards(String... indices) {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<?> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
List<?> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
return allAssignedShardsGrouped.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -165,7 +164,7 @@ public void testSimpleStats() throws Exception {

private Set<String> nodeIdsWithIndex(String... indices) {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
List<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
Set<String> nodes = new HashSet<>();
for (ShardIterator shardIterator : allAssignedShardsGrouped) {
for (ShardRouting routing : shardIterator) {
Expand Down Expand Up @@ -248,7 +247,7 @@ public void testOpenContexts() {

protected int numAssignedShards(String... indices) {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<?> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
List<?> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
return allAssignedShardsGrouped.size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -34,6 +33,7 @@

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -101,7 +101,7 @@ protected void masterOperation(
}

Set<String> nodeIds = new HashSet<>();
GroupShardsIterator<ShardIterator> groupShardsIterator = clusterService.operationRouting()
List<ShardIterator> groupShardsIterator = clusterService.operationRouting()
.searchShards(clusterState, concreteIndices, routingMap, request.preference());
ShardRouting shard;
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -214,13 +213,8 @@ protected AnalyzeIndexDiskUsageResponse newResponse(
}

@Override
protected GroupShardsIterator<ShardIterator> shards(
ClusterState clusterState,
AnalyzeIndexDiskUsageRequest request,
String[] concreteIndices
) {
final GroupShardsIterator<ShardIterator> groups = clusterService.operationRouting()
.searchShards(clusterState, concreteIndices, null, null);
protected List<ShardIterator> shards(ClusterState clusterState, AnalyzeIndexDiskUsageRequest request, String[] concreteIndices) {
final List<ShardIterator> groups = clusterService.operationRouting().searchShards(clusterState, concreteIndices, null, null);
for (ShardIterator group : groups) {
// fails fast if any non-active groups
if (group.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -145,7 +144,7 @@ protected ShardValidateQueryResponse readShardResponse(StreamInput in) throws IO
}

@Override
protected GroupShardsIterator<ShardIterator> shards(ClusterState clusterState, ValidateQueryRequest request, String[] concreteIndices) {
protected List<ShardIterator> shards(ClusterState clusterState, ValidateQueryRequest request, String[] concreteIndices) {
final String routing;
if (request.allShards()) {
routing = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -93,7 +92,7 @@ final class RequestDispatcher {
this.onComplete = new RunOnce(onComplete);
this.indexSelectors = ConcurrentCollections.newConcurrentMap();
for (String index : indices) {
final GroupShardsIterator<ShardIterator> shardIts;
final List<ShardIterator> shardIts;
try {
shardIts = clusterService.operationRouting().searchShards(clusterState, new String[] { index }, null, null);
} catch (Exception e) {
Expand Down Expand Up @@ -250,7 +249,7 @@ private static class IndexSelector {
private final Set<ShardId> unmatchedShardIds = new HashSet<>();
private final Map<ShardId, Exception> failures = new HashMap<>();

IndexSelector(GroupShardsIterator<ShardIterator> shardIts) {
IndexSelector(List<ShardIterator> shardIts) {
for (ShardIterator shardIt : shardIts) {
for (ShardRouting shard : shardIt) {
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.Maps;
Expand Down Expand Up @@ -60,9 +59,9 @@
import static org.elasticsearch.core.Strings.format;

/**
* This is an abstract base class that encapsulates the logic to fan out to all shards in provided {@link GroupShardsIterator}
* This is an abstract base class that encapsulates the logic to fan out to all shards in provided {@link List<SearchShardIterator>}
* and collect the results. If a shard request returns a failure this class handles the advance to the next replica of the shard until
* the shards replica iterator is exhausted. Each shard is referenced by position in the {@link GroupShardsIterator} which is later
* the shards replica iterator is exhausted. Each shard is referenced by position in the {@link List<SearchShardIterator>} which is later
* referred to as the {@code shardIndex}.
* The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection of
* distributed frequencies
Expand Down Expand Up @@ -93,8 +92,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
protected final List<SearchShardIterator> toSkipShardsIts;
protected final List<SearchShardIterator> shardsIts;
private final SearchShardIterator[] shardIterators;
private final AtomicInteger outstandingShards;
private final int maxConcurrentRequestsPerNode;
Expand All @@ -116,7 +115,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
Executor executor,
SearchRequest request,
ActionListener<SearchResponse> listener,
GroupShardsIterator<SearchShardIterator> shardsIts,
List<SearchShardIterator> shardsIts,
SearchTimeProvider timeProvider,
ClusterState clusterState,
SearchTask task,
Expand All @@ -135,8 +134,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
iterators.add(iterator);
}
}
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
this.shardsIts = new GroupShardsIterator<>(iterators);
this.toSkipShardsIts = toSkipIterators;
this.shardsIts = iterators;
outstandingShards = new AtomicInteger(shardsIts.size());
this.shardIterators = iterators.toArray(new SearchShardIterator[0]);
// we later compute the shard index based on the natural order of the shards
Expand Down Expand Up @@ -171,8 +170,8 @@ protected void notifyListShards(
SearchSourceBuilder sourceBuilder
) {
progressListener.notifyListShards(
SearchProgressListener.buildSearchShards(this.shardsIts),
SearchProgressListener.buildSearchShards(toSkipShardsIts),
SearchProgressListener.buildSearchShardsFromIter(this.shardsIts),
SearchProgressListener.buildSearchShardsFromIter(toSkipShardsIts),
clusters,
sourceBuilder == null || sourceBuilder.size() > 0,
timeProvider
Expand Down
Loading