Skip to content

Commit c0cb3f5

Browse files
original-brownbeargeorgewallace
authored andcommitted
Cheaper handling of skipped shard iterators in AbstractSearchAsyncAction (elastic#124223)
No reason to blow up the size of `AbstractSearchAsyncAction` (and the code size of some methods that really only need the size of that collection) needlessly. Just keep the count, that's all we need. We can build the skipped shard list inline if need be (and do so in a cheaper way because we can build the search targets.
1 parent 4f34202 commit c0cb3f5

File tree

4 files changed

+46
-38
lines changed

4 files changed

+46
-38
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.elasticsearch.search.SearchPhaseResult;
3434
import org.elasticsearch.search.SearchShardTarget;
3535
import org.elasticsearch.search.builder.PointInTimeBuilder;
36-
import org.elasticsearch.search.builder.SearchSourceBuilder;
3736
import org.elasticsearch.search.internal.AliasFilter;
3837
import org.elasticsearch.search.internal.SearchContext;
3938
import org.elasticsearch.search.internal.ShardSearchContextId;
@@ -88,18 +87,18 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
8887
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
8988
private final Object shardFailuresMutex = new Object();
9089
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
91-
private final AtomicInteger successfulOps = new AtomicInteger();
90+
private final AtomicInteger successfulOps;
9291
private final SearchTimeProvider timeProvider;
9392
private final SearchResponse.Clusters clusters;
9493

95-
protected final List<SearchShardIterator> toSkipShardsIts;
9694
protected final List<SearchShardIterator> shardsIts;
9795
private final SearchShardIterator[] shardIterators;
9896
private final AtomicInteger outstandingShards;
9997
private final int maxConcurrentRequestsPerNode;
10098
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
10199
private final boolean throttleConcurrentRequests;
102100
private final AtomicBoolean requestCancelled = new AtomicBoolean();
101+
private final int skippedCount;
103102

104103
// protected for tests
105104
protected final List<Releasable> releasables = new ArrayList<>();
@@ -125,18 +124,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
125124
) {
126125
super(name);
127126
this.namedWriteableRegistry = namedWriteableRegistry;
128-
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
129127
final List<SearchShardIterator> iterators = new ArrayList<>();
128+
int skipped = 0;
130129
for (final SearchShardIterator iterator : shardsIts) {
131130
if (iterator.skip()) {
132-
toSkipIterators.add(iterator);
131+
skipped++;
133132
} else {
134133
iterators.add(iterator);
135134
}
136135
}
137-
this.toSkipShardsIts = toSkipIterators;
136+
this.skippedCount = skipped;
138137
this.shardsIts = iterators;
139-
outstandingShards = new AtomicInteger(shardsIts.size());
138+
outstandingShards = new AtomicInteger(iterators.size());
139+
successfulOps = new AtomicInteger(skipped);
140140
this.shardIterators = iterators.toArray(new SearchShardIterator[0]);
141141
// we later compute the shard index based on the natural order of the shards
142142
// that participate in the search request. This means that this number is
@@ -167,11 +167,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
167167
protected void notifyListShards(
168168
SearchProgressListener progressListener,
169169
SearchResponse.Clusters clusters,
170-
SearchSourceBuilder sourceBuilder
170+
SearchRequest searchRequest,
171+
List<SearchShardIterator> allIterators
171172
) {
173+
final List<SearchShard> skipped = new ArrayList<>(allIterators.size() - shardsIts.size());
174+
for (SearchShardIterator iter : allIterators) {
175+
if (iter.skip()) {
176+
skipped.add(new SearchShard(iter.getClusterAlias(), iter.shardId()));
177+
}
178+
}
179+
var sourceBuilder = searchRequest.source();
172180
progressListener.notifyListShards(
173181
SearchProgressListener.buildSearchShardsFromIter(this.shardsIts),
174-
SearchProgressListener.buildSearchShardsFromIter(toSkipShardsIts),
182+
skipped,
175183
clusters,
176184
sourceBuilder == null || sourceBuilder.size() > 0,
177185
timeProvider
@@ -215,37 +223,29 @@ public final void start() {
215223

216224
@Override
217225
protected final void run() {
218-
for (final SearchShardIterator iterator : toSkipShardsIts) {
219-
assert iterator.skip();
220-
skipShard(iterator);
226+
if (outstandingShards.get() == 0) {
227+
onPhaseDone();
228+
return;
221229
}
222230
final Map<SearchShardIterator, Integer> shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length);
223231
for (int i = 0; i < shardIterators.length; i++) {
224232
shardIndexMap.put(shardIterators[i], i);
225233
}
226-
if (shardsIts.size() > 0) {
227-
doCheckNoMissingShards(getName(), request, shardsIts);
228-
for (int i = 0; i < shardsIts.size(); i++) {
229-
final SearchShardIterator shardRoutings = shardsIts.get(i);
230-
assert shardRoutings.skip() == false;
231-
assert shardIndexMap.containsKey(shardRoutings);
232-
int shardIndex = shardIndexMap.get(shardRoutings);
233-
final SearchShardTarget routing = shardRoutings.nextOrNull();
234-
if (routing == null) {
235-
failOnUnavailable(shardIndex, shardRoutings);
236-
} else {
237-
performPhaseOnShard(shardIndex, shardRoutings, routing);
238-
}
234+
doCheckNoMissingShards(getName(), request, shardsIts);
235+
for (int i = 0; i < shardsIts.size(); i++) {
236+
final SearchShardIterator shardRoutings = shardsIts.get(i);
237+
assert shardRoutings.skip() == false;
238+
assert shardIndexMap.containsKey(shardRoutings);
239+
int shardIndex = shardIndexMap.get(shardRoutings);
240+
final SearchShardTarget routing = shardRoutings.nextOrNull();
241+
if (routing == null) {
242+
failOnUnavailable(shardIndex, shardRoutings);
243+
} else {
244+
performPhaseOnShard(shardIndex, shardRoutings, routing);
239245
}
240246
}
241247
}
242248

243-
void skipShard(SearchShardIterator iterator) {
244-
successfulOps.incrementAndGet();
245-
assert iterator.skip();
246-
successfulShardExecution();
247-
}
248-
249249
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
250250
if (throttleConcurrentRequests) {
251251
var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent(
@@ -343,7 +343,7 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP
343343
"Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})",
344344
discrepancy,
345345
successfulOps.get(),
346-
toSkipShardsIts.size(),
346+
skippedCount,
347347
getNumShards(),
348348
currentPhase
349349
);
@@ -585,7 +585,7 @@ private SearchResponse buildSearchResponse(
585585
scrollId,
586586
getNumShards(),
587587
numSuccess,
588-
toSkipShardsIts.size(),
588+
skippedCount,
589589
buildTookInMillis(),
590590
failures,
591591
clusters,

server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
7373
this.progressListener = task.getProgressListener();
7474
// don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
7575
if (progressListener != SearchProgressListener.NOOP) {
76-
notifyListShards(progressListener, clusters, request.source());
76+
notifyListShards(progressListener, clusters, request, shardsIts);
7777
}
7878
this.client = client;
7979
}

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
8585

8686
// don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
8787
if (progressListener != SearchProgressListener.NOOP) {
88-
notifyListShards(progressListener, clusters, request.source());
88+
notifyListShards(progressListener, clusters, request, shardsIts);
8989
}
9090
}
9191

server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,17 @@ public void testShardNotAvailableWithDisallowPartialFailures() {
223223
ArraySearchPhaseResults<SearchPhaseResult> phaseResults = new ArraySearchPhaseResults<>(numShards);
224224
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong());
225225
// skip one to avoid the "all shards failed" failure.
226-
SearchShardIterator skipIterator = new SearchShardIterator(null, null, Collections.emptyList(), null);
227-
skipIterator.skip(true);
228-
action.skipShard(skipIterator);
226+
action.onShardResult(new SearchPhaseResult() {
227+
@Override
228+
public int getShardIndex() {
229+
return 0;
230+
}
231+
232+
@Override
233+
public SearchShardTarget getSearchShardTarget() {
234+
return new SearchShardTarget(null, null, null);
235+
}
236+
});
229237
assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class));
230238
SearchPhaseExecutionException searchPhaseExecutionException = (SearchPhaseExecutionException) exception.get();
231239
assertEquals("Partial shards failure (" + (numShards - 1) + " shards unavailable)", searchPhaseExecutionException.getMessage());

0 commit comments

Comments
 (0)