Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchContextId;
Expand Down Expand Up @@ -88,18 +87,18 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger successfulOps;
private final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

protected final List<SearchShardIterator> toSkipShardsIts;
protected final List<SearchShardIterator> shardsIts;
private final SearchShardIterator[] shardIterators;
private final AtomicInteger outstandingShards;
private final int maxConcurrentRequestsPerNode;
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;
private final AtomicBoolean requestCancelled = new AtomicBoolean();
private final int skippedCount;

// protected for tests
protected final List<Releasable> releasables = new ArrayList<>();
Expand All @@ -125,18 +124,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
) {
super(name);
this.namedWriteableRegistry = namedWriteableRegistry;
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
final List<SearchShardIterator> iterators = new ArrayList<>();
int skipped = 0;
for (final SearchShardIterator iterator : shardsIts) {
if (iterator.skip()) {
toSkipIterators.add(iterator);
skipped++;
} else {
iterators.add(iterator);
}
}
this.toSkipShardsIts = toSkipIterators;
this.skippedCount = skipped;
this.shardsIts = iterators;
outstandingShards = new AtomicInteger(shardsIts.size());
outstandingShards = new AtomicInteger(iterators.size());
successfulOps = new AtomicInteger(skipped);
this.shardIterators = iterators.toArray(new SearchShardIterator[0]);
// we later compute the shard index based on the natural order of the shards
// that participate in the search request. This means that this number is
Expand Down Expand Up @@ -167,11 +167,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
protected void notifyListShards(
SearchProgressListener progressListener,
SearchResponse.Clusters clusters,
SearchSourceBuilder sourceBuilder
SearchRequest searchRequest,
List<SearchShardIterator> allIterators
) {
final List<SearchShard> skipped = new ArrayList<>(allIterators.size() - shardsIts.size());
for (SearchShardIterator iter : allIterators) {
if (iter.skip()) {
skipped.add(new SearchShard(iter.getClusterAlias(), iter.shardId()));
}
}
var sourceBuilder = searchRequest.source();
progressListener.notifyListShards(
SearchProgressListener.buildSearchShardsFromIter(this.shardsIts),
SearchProgressListener.buildSearchShardsFromIter(toSkipShardsIts),
skipped,
clusters,
sourceBuilder == null || sourceBuilder.size() > 0,
timeProvider
Expand Down Expand Up @@ -215,37 +223,29 @@ public final void start() {

@Override
protected final void run() {
for (final SearchShardIterator iterator : toSkipShardsIts) {
assert iterator.skip();
skipShard(iterator);
if (outstandingShards.get() == 0) {
onPhaseDone();
return;
}
final Map<SearchShardIterator, Integer> shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length);
for (int i = 0; i < shardIterators.length; i++) {
shardIndexMap.put(shardIterators[i], i);
}
if (shardsIts.size() > 0) {
doCheckNoMissingShards(getName(), request, shardsIts);
for (int i = 0; i < shardsIts.size(); i++) {
final SearchShardIterator shardRoutings = shardsIts.get(i);
assert shardRoutings.skip() == false;
assert shardIndexMap.containsKey(shardRoutings);
int shardIndex = shardIndexMap.get(shardRoutings);
final SearchShardTarget routing = shardRoutings.nextOrNull();
if (routing == null) {
failOnUnavailable(shardIndex, shardRoutings);
} else {
performPhaseOnShard(shardIndex, shardRoutings, routing);
}
doCheckNoMissingShards(getName(), request, shardsIts);
for (int i = 0; i < shardsIts.size(); i++) {
final SearchShardIterator shardRoutings = shardsIts.get(i);
assert shardRoutings.skip() == false;
assert shardIndexMap.containsKey(shardRoutings);
int shardIndex = shardIndexMap.get(shardRoutings);
final SearchShardTarget routing = shardRoutings.nextOrNull();
if (routing == null) {
failOnUnavailable(shardIndex, shardRoutings);
} else {
performPhaseOnShard(shardIndex, shardRoutings, routing);
}
}
}

void skipShard(SearchShardIterator iterator) {
successfulOps.incrementAndGet();
assert iterator.skip();
successfulShardExecution();
}

private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
if (throttleConcurrentRequests) {
var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent(
Expand Down Expand Up @@ -343,7 +343,7 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP
"Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})",
discrepancy,
successfulOps.get(),
toSkipShardsIts.size(),
skippedCount,
getNumShards(),
currentPhase
);
Expand Down Expand Up @@ -585,7 +585,7 @@ private SearchResponse buildSearchResponse(
scrollId,
getNumShards(),
numSuccess,
toSkipShardsIts.size(),
skippedCount,
buildTookInMillis(),
failures,
clusters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
this.progressListener = task.getProgressListener();
// don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
if (progressListener != SearchProgressListener.NOOP) {
notifyListShards(progressListener, clusters, request.source());
notifyListShards(progressListener, clusters, request, shardsIts);
}
this.client = client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh

// don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
if (progressListener != SearchProgressListener.NOOP) {
notifyListShards(progressListener, clusters, request.source());
notifyListShards(progressListener, clusters, request, shardsIts);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,17 @@ public void testShardNotAvailableWithDisallowPartialFailures() {
ArraySearchPhaseResults<SearchPhaseResult> phaseResults = new ArraySearchPhaseResults<>(numShards);
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong());
// skip one to avoid the "all shards failed" failure.
SearchShardIterator skipIterator = new SearchShardIterator(null, null, Collections.emptyList(), null);
skipIterator.skip(true);
action.skipShard(skipIterator);
action.onShardResult(new SearchPhaseResult() {
@Override
public int getShardIndex() {
return 0;
}

@Override
public SearchShardTarget getSearchShardTarget() {
return new SearchShardTarget(null, null, null);
}
});
assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class));
SearchPhaseExecutionException searchPhaseExecutionException = (SearchPhaseExecutionException) exception.get();
assertEquals("Partial shards failure (" + (numShards - 1) + " shards unavailable)", searchPhaseExecutionException.getMessage());
Expand Down