Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator shar
}

public void start() {
if (shardsIts.size() == 0) {
if (shardsIts.isEmpty()) {
finishPhase();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.util.PlainIterator;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -113,15 +112,6 @@ SearchShardTarget nextOrNull() {
return null;
}

/**
* Return the number of shards remaining in this {@link ShardsIterator}
*
* @return number of shard remaining
*/
int remaining() {
return targetNodesIterator.remaining();
}

/**
* Returns a non-null value if this request should use a specific search context instead of the latest one.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -172,15 +170,7 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
private static List<SearchShardsGroup> toGroups(List<SearchShardIterator> shardIts) {
List<SearchShardsGroup> groups = new ArrayList<>(shardIts.size());
for (SearchShardIterator shardIt : shardIts) {
boolean skip = shardIt.skip();
shardIt.reset();
List<String> targetNodes = new ArrayList<>();
SearchShardTarget target;
while ((target = shardIt.nextOrNull()) != null) {
targetNodes.add(target.getNodeId());
}
ShardId shardId = shardIt.shardId();
groups.add(new SearchShardsGroup(shardId, targetNodes, skip));
groups.add(new SearchShardsGroup(shardIt.shardId(), shardIt.getTargetNodeIds(), shardIt.skip()));
}
return groups;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ public void testShardNotAvailableWithDisallowPartialFailures() {
// skip one to avoid the "all shards failed" failure.
SearchShardIterator skipIterator = new SearchShardIterator(null, null, Collections.emptyList(), null);
skipIterator.skip(true);
skipIterator.reset();
action.skipShard(skipIterator);
assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class));
SearchPhaseExecutionException searchPhaseExecutionException = (SearchPhaseExecutionException) exception.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ public void testAllowPartialResults() throws InterruptedException {

SearchTransportService transportService = new SearchTransportService(null, null, null);
Map<String, Transport.Connection> lookup = new HashMap<>();
Map<ShardId, Boolean> seenShard = new ConcurrentHashMap<>();
Map<ShardId, AtomicInteger> seenShard = new ConcurrentHashMap<>();
lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", AliasFilter.EMPTY);
Expand Down Expand Up @@ -581,17 +581,18 @@ protected void executePhaseOnShard(
Transport.Connection connection,
SearchActionListener<TestSearchPhaseResult> listener
) {
seenShard.computeIfAbsent(shardIt.shardId(), (i) -> {
AtomicInteger retries = seenShard.computeIfAbsent(shardIt.shardId(), (i) -> {
numRequests.incrementAndGet(); // only count this once per shard copy
return Boolean.TRUE;
return new AtomicInteger(0);
});
int numRetries = retries.incrementAndGet();
new Thread(() -> {
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(
new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()),
connection.getNode()
);
try {
if (shardIt.remaining() > 0) {
if (numRetries < shardIt.size()) {
numFailReplicas.incrementAndGet();
listener.onFailure(new RuntimeException());
} else {
Expand Down Expand Up @@ -643,10 +644,8 @@ public void testSkipUnavailableSearchShards() throws InterruptedException {
);
// Skip all the shards
searchShardIterator.skip(true);
searchShardIterator.reset();
searchShardIterators.add(searchShardIterator);
}
List<SearchShardIterator> shardsIter = searchShardIterators;
Map<String, Transport.Connection> lookup = Map.of(primaryNode.getId(), new MockConnection(primaryNode));

CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -665,11 +664,11 @@ public void testSkipUnavailableSearchShards() throws InterruptedException {
null,
request,
responseListener,
shardsIter,
searchShardIterators,
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
ClusterState.EMPTY_STATE,
null,
new ArraySearchPhaseResults<>(shardsIter.size()),
new ArraySearchPhaseResults<>(searchShardIterators.size()),
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY
) {
Expand Down Expand Up @@ -702,7 +701,7 @@ protected void run() {
assertNotNull(searchResponse.get());
assertThat(searchResponse.get().getSkippedShards(), equalTo(numUnavailableSkippedShards));
assertThat(searchResponse.get().getFailedShards(), equalTo(0));
assertThat(searchResponse.get().getSuccessfulShards(), equalTo(shardsIter.size()));
assertThat(searchResponse.get().getSuccessfulShards(), equalTo(searchShardIterators.size()));
}

static List<SearchShardIterator> getShardsIter(
Expand All @@ -728,7 +727,6 @@ static List<SearchShardIterator> getShardsIter(
for (int i = 0; i < numShards; i++) {
ArrayList<ShardRouting> started = new ArrayList<>();
ArrayList<ShardRouting> initializing = new ArrayList<>();
ArrayList<ShardRouting> unassigned = new ArrayList<>();

ShardRouting routing = ShardRouting.newUnassigned(
new ShardId(index, i),
Expand Down Expand Up @@ -758,8 +756,6 @@ static List<SearchShardIterator> getShardsIter(
} else {
initializing.add(routing);
}
} else {
unassigned.add(routing); // unused yet
}
}
Collections.shuffle(started, random());
Expand Down