From 83ec88524a4a098aa82d746aa49a0881868f5f5e Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 6 Mar 2025 11:33:33 +0100 Subject: [PATCH 1/3] Cheaper skip handling --- .../search/AbstractSearchAsyncAction.java | 66 +++++++++---------- .../SearchDfsQueryThenFetchAsyncAction.java | 2 +- .../SearchQueryThenFetchAsyncAction.java | 2 +- .../AbstractSearchAsyncActionTests.java | 14 +++- 4 files changed, 46 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index e8693352270c2..e597350085963 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -33,7 +33,6 @@ import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.PointInTimeBuilder; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchContextId; @@ -88,11 +87,10 @@ abstract class AbstractSearchAsyncAction exten private final SetOnce> shardFailures = new SetOnce<>(); private final Object shardFailuresMutex = new Object(); private final AtomicBoolean hasShardResponse = new AtomicBoolean(false); - private final AtomicInteger successfulOps = new AtomicInteger(); + private final AtomicInteger successfulOps; private final SearchTimeProvider timeProvider; private final SearchResponse.Clusters clusters; - protected final List toSkipShardsIts; protected final List shardsIts; private final SearchShardIterator[] shardIterators; private final AtomicInteger outstandingShards; @@ -100,6 +98,7 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode = new ConcurrentHashMap<>(); private final boolean throttleConcurrentRequests; private final AtomicBoolean requestCancelled = new AtomicBoolean(); + private final int skippedCount; // protected for tests protected final List releasables = new ArrayList<>(); @@ -125,18 +124,19 @@ abstract class AbstractSearchAsyncAction exten ) { super(name); this.namedWriteableRegistry = namedWriteableRegistry; - final List toSkipIterators = new ArrayList<>(); final List iterators = new ArrayList<>(); + int skipped = 0; for (final SearchShardIterator iterator : shardsIts) { if (iterator.skip()) { - toSkipIterators.add(iterator); + skipped++; } else { iterators.add(iterator); } } - this.toSkipShardsIts = toSkipIterators; + this.skippedCount = skipped; this.shardsIts = iterators; - outstandingShards = new AtomicInteger(shardsIts.size()); + outstandingShards = new AtomicInteger(shardsIts.size() - skipped); + successfulOps = new AtomicInteger(skipped); this.shardIterators = iterators.toArray(new SearchShardIterator[0]); // we later compute the shard index based on the natural order of the shards // that participate in the search request. This means that this number is @@ -167,11 +167,19 @@ abstract class AbstractSearchAsyncAction exten protected void notifyListShards( SearchProgressListener progressListener, SearchResponse.Clusters clusters, - SearchSourceBuilder sourceBuilder + SearchRequest searchRequest, + List allIterators ) { + final List skipped = new ArrayList<>(); + for (SearchShardIterator iter : allIterators) { + if (iter.skip()) { + skipped.add(iter); + } + } + var sourceBuilder = searchRequest.source(); progressListener.notifyListShards( SearchProgressListener.buildSearchShardsFromIter(this.shardsIts), - SearchProgressListener.buildSearchShardsFromIter(toSkipShardsIts), + SearchProgressListener.buildSearchShardsFromIter(skipped), clusters, sourceBuilder == null || sourceBuilder.size() > 0, timeProvider @@ -215,37 +223,29 @@ public final void start() { @Override protected final void run() { - for (final SearchShardIterator iterator : toSkipShardsIts) { - assert iterator.skip(); - skipShard(iterator); + if (outstandingShards.get() == 0) { + onPhaseDone(); + return; } final Map shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length); for (int i = 0; i < shardIterators.length; i++) { shardIndexMap.put(shardIterators[i], i); } - if (shardsIts.size() > 0) { - doCheckNoMissingShards(getName(), request, shardsIts); - for (int i = 0; i < shardsIts.size(); i++) { - final SearchShardIterator shardRoutings = shardsIts.get(i); - assert shardRoutings.skip() == false; - assert shardIndexMap.containsKey(shardRoutings); - int shardIndex = shardIndexMap.get(shardRoutings); - final SearchShardTarget routing = shardRoutings.nextOrNull(); - if (routing == null) { - failOnUnavailable(shardIndex, shardRoutings); - } else { - performPhaseOnShard(shardIndex, shardRoutings, routing); - } + doCheckNoMissingShards(getName(), request, shardsIts); + for (int i = 0; i < shardsIts.size(); i++) { + final SearchShardIterator shardRoutings = shardsIts.get(i); + assert shardRoutings.skip() == false; + assert shardIndexMap.containsKey(shardRoutings); + int shardIndex = shardIndexMap.get(shardRoutings); + final SearchShardTarget routing = shardRoutings.nextOrNull(); + if (routing == null) { + failOnUnavailable(shardIndex, shardRoutings); + } else { + performPhaseOnShard(shardIndex, shardRoutings, routing); } } } - void skipShard(SearchShardIterator iterator) { - successfulOps.incrementAndGet(); - assert iterator.skip(); - successfulShardExecution(); - } - private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) { if (throttleConcurrentRequests) { var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent( @@ -343,7 +343,7 @@ protected void executeNextPhase(String currentPhase, Supplier nextP "Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})", discrepancy, successfulOps.get(), - toSkipShardsIts.size(), + skippedCount, getNumShards(), currentPhase ); @@ -585,7 +585,7 @@ private SearchResponse buildSearchResponse( scrollId, getNumShards(), numSuccess, - toSkipShardsIts.size(), + skippedCount, buildTookInMillis(), failures, clusters, diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index dd97f02dd8f40..d49542313a712 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -73,7 +73,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction this.progressListener = task.getProgressListener(); // don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it if (progressListener != SearchProgressListener.NOOP) { - notifyListShards(progressListener, clusters, request.source()); + notifyListShards(progressListener, clusters, request, shardsIts); } this.client = client; } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 1f595f47dc489..5149dd9246335 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -85,7 +85,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction phaseResults = new ArraySearchPhaseResults<>(numShards); AbstractSearchAsyncAction action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong()); // skip one to avoid the "all shards failed" failure. - SearchShardIterator skipIterator = new SearchShardIterator(null, null, Collections.emptyList(), null); - skipIterator.skip(true); - action.skipShard(skipIterator); + action.onShardResult(new SearchPhaseResult() { + @Override + public int getShardIndex() { + return 0; + } + + @Override + public SearchShardTarget getSearchShardTarget() { + return new SearchShardTarget(null, null, null); + } + }); assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class)); SearchPhaseExecutionException searchPhaseExecutionException = (SearchPhaseExecutionException) exception.get(); assertEquals("Partial shards failure (" + (numShards - 1) + " shards unavailable)", searchPhaseExecutionException.getMessage()); From 2660b776b2258243927b93a91ff5d3dcfd585729 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 6 Mar 2025 15:05:50 +0100 Subject: [PATCH 2/3] save some cycles --- .../action/search/AbstractSearchAsyncAction.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index e597350085963..0d0733275c452 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -170,16 +170,16 @@ protected void notifyListShards( SearchRequest searchRequest, List allIterators ) { - final List skipped = new ArrayList<>(); + final List skipped = new ArrayList<>(allIterators.size() - shardsIts.size()); for (SearchShardIterator iter : allIterators) { if (iter.skip()) { - skipped.add(iter); + skipped.add(new SearchShard(iter.getClusterAlias(), iter.shardId())); } } var sourceBuilder = searchRequest.source(); progressListener.notifyListShards( SearchProgressListener.buildSearchShardsFromIter(this.shardsIts), - SearchProgressListener.buildSearchShardsFromIter(skipped), + skipped, clusters, sourceBuilder == null || sourceBuilder.size() > 0, timeProvider From 88233b380ebc875d912949b21b529acb90907ea9 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 6 Mar 2025 21:35:16 +0100 Subject: [PATCH 3/3] CR: comment --- .../elasticsearch/action/search/AbstractSearchAsyncAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 0d0733275c452..995982e2f366e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -135,7 +135,7 @@ abstract class AbstractSearchAsyncAction exten } this.skippedCount = skipped; this.shardsIts = iterators; - outstandingShards = new AtomicInteger(shardsIts.size() - skipped); + outstandingShards = new AtomicInteger(iterators.size()); successfulOps = new AtomicInteger(skipped); this.shardIterators = iterators.toArray(new SearchShardIterator[0]); // we later compute the shard index based on the natural order of the shards