Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchContextId;
Expand Down Expand Up @@ -88,18 +87,18 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger successfulOps;
private final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

protected final List<SearchShardIterator> toSkipShardsIts;
protected final List<SearchShardIterator> shardsIts;
private final SearchShardIterator[] shardIterators;
private final AtomicInteger outstandingShards;
private final int maxConcurrentRequestsPerNode;
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;
private final AtomicBoolean requestCancelled = new AtomicBoolean();
private final int skippedCount;

// protected for tests
protected final List<Releasable> releasables = new ArrayList<>();
Expand All @@ -125,18 +124,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
) {
super(name);
this.namedWriteableRegistry = namedWriteableRegistry;
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
final List<SearchShardIterator> iterators = new ArrayList<>();
int skipped = 0;
for (final SearchShardIterator iterator : shardsIts) {
if (iterator.skip()) {
toSkipIterators.add(iterator);
skipped++;
} else {
iterators.add(iterator);
}
}
this.toSkipShardsIts = toSkipIterators;
this.skippedCount = skipped;
this.shardsIts = iterators;
outstandingShards = new AtomicInteger(shardsIts.size());
outstandingShards = new AtomicInteger(shardsIts.size() - skipped);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it's slightly confusing which shardsIts it is - the parameter shardsIts as I see contains the skipped ones, but this.shardsIts does not. Maybe use a different name for the parameter? Also, isn't it iterators.size()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To think about it, do we really need iterators at all - can't we just work on this.shardsIts directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a little faster to have a dedicated local variable instead of looking up the field repeatedly, that's all :) There's no other reason to have a separate local variable.

But you're right the naming is horrific :O lets fix that in a follow-up but at least use iterators.size() here :) My idea/hope was actually to get rid of this.shardIts, it's somewhat unnecessary in the grand scheme of things :)

successfulOps = new AtomicInteger(skipped);
this.shardIterators = iterators.toArray(new SearchShardIterator[0]);
// we later compute the shard index based on the natural order of the shards
// that participate in the search request. This means that this number is
Expand Down Expand Up @@ -167,11 +167,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
protected void notifyListShards(
SearchProgressListener progressListener,
SearchResponse.Clusters clusters,
SearchSourceBuilder sourceBuilder
SearchRequest searchRequest,
List<SearchShardIterator> allIterators
) {
final List<SearchShard> skipped = new ArrayList<>(allIterators.size() - shardsIts.size());
for (SearchShardIterator iter : allIterators) {
if (iter.skip()) {
skipped.add(new SearchShard(iter.getClusterAlias(), iter.shardId()));
}
}
var sourceBuilder = searchRequest.source();
progressListener.notifyListShards(
SearchProgressListener.buildSearchShardsFromIter(this.shardsIts),
SearchProgressListener.buildSearchShardsFromIter(toSkipShardsIts),
skipped,
clusters,
sourceBuilder == null || sourceBuilder.size() > 0,
timeProvider
Expand Down Expand Up @@ -215,37 +223,29 @@ public final void start() {

@Override
protected final void run() {
for (final SearchShardIterator iterator : toSkipShardsIts) {
assert iterator.skip();
skipShard(iterator);
if (outstandingShards.get() == 0) {
onPhaseDone();
return;
}
final Map<SearchShardIterator, Integer> shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length);
for (int i = 0; i < shardIterators.length; i++) {
shardIndexMap.put(shardIterators[i], i);
}
if (shardsIts.size() > 0) {
doCheckNoMissingShards(getName(), request, shardsIts);
for (int i = 0; i < shardsIts.size(); i++) {
final SearchShardIterator shardRoutings = shardsIts.get(i);
assert shardRoutings.skip() == false;
assert shardIndexMap.containsKey(shardRoutings);
int shardIndex = shardIndexMap.get(shardRoutings);
final SearchShardTarget routing = shardRoutings.nextOrNull();
if (routing == null) {
failOnUnavailable(shardIndex, shardRoutings);
} else {
performPhaseOnShard(shardIndex, shardRoutings, routing);
}
doCheckNoMissingShards(getName(), request, shardsIts);
for (int i = 0; i < shardsIts.size(); i++) {
final SearchShardIterator shardRoutings = shardsIts.get(i);
assert shardRoutings.skip() == false;
assert shardIndexMap.containsKey(shardRoutings);
int shardIndex = shardIndexMap.get(shardRoutings);
final SearchShardTarget routing = shardRoutings.nextOrNull();
if (routing == null) {
failOnUnavailable(shardIndex, shardRoutings);
} else {
performPhaseOnShard(shardIndex, shardRoutings, routing);
}
}
}

void skipShard(SearchShardIterator iterator) {
successfulOps.incrementAndGet();
assert iterator.skip();
successfulShardExecution();
}

private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
if (throttleConcurrentRequests) {
var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent(
Expand Down Expand Up @@ -343,7 +343,7 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP
"Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})",
discrepancy,
successfulOps.get(),
toSkipShardsIts.size(),
skippedCount,
getNumShards(),
currentPhase
);
Expand Down Expand Up @@ -585,7 +585,7 @@ private SearchResponse buildSearchResponse(
scrollId,
getNumShards(),
numSuccess,
toSkipShardsIts.size(),
skippedCount,
buildTookInMillis(),
failures,
clusters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
this.progressListener = task.getProgressListener();
// don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
if (progressListener != SearchProgressListener.NOOP) {
notifyListShards(progressListener, clusters, request.source());
notifyListShards(progressListener, clusters, request, shardsIts);
}
this.client = client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh

// don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
if (progressListener != SearchProgressListener.NOOP) {
notifyListShards(progressListener, clusters, request.source());
notifyListShards(progressListener, clusters, request, shardsIts);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,17 @@ public void testShardNotAvailableWithDisallowPartialFailures() {
ArraySearchPhaseResults<SearchPhaseResult> phaseResults = new ArraySearchPhaseResults<>(numShards);
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong());
// skip one to avoid the "all shards failed" failure.
SearchShardIterator skipIterator = new SearchShardIterator(null, null, Collections.emptyList(), null);
skipIterator.skip(true);
action.skipShard(skipIterator);
action.onShardResult(new SearchPhaseResult() {
@Override
public int getShardIndex() {
return 0;
}

@Override
public SearchShardTarget getSearchShardTarget() {
return new SearchShardTarget(null, null, null);
}
});
assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class));
SearchPhaseExecutionException searchPhaseExecutionException = (SearchPhaseExecutionException) exception.get();
assertEquals("Partial shards failure (" + (numShards - 1) + " shards unavailable)", searchPhaseExecutionException.getMessage());
Expand Down