diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 8e7333155d762..d10ec4755f019 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -341,6 +341,13 @@ void executeRequest( ActionListener listener, Function, SearchPhaseProvider> searchPhaseProvider ) { + final SearchSourceBuilder source = original.source(); + final boolean isExplain = source != null && source.explain() != null && source.explain(); + if (shouldOpenPIT(source)) { + executeOpenPit(task, original, listener, searchPhaseProvider, source); + return; + } + final long relativeStartNanos = System.nanoTime(); final SearchTimeProvider timeProvider = new SearchTimeProvider( original.getOrCreateAbsoluteStartMillis(), @@ -370,187 +377,180 @@ void executeRequest( ); frozenIndexCheck(resolvedIndices); } - - ActionListener rewriteListener = listener.delegateFailureAndWrap((delegate, rewritten) -> { - if (ccsCheckCompatibility) { - checkCCSVersionCompatibility(rewritten); - } - - if (resolvedIndices.getRemoteClusterIndices().isEmpty()) { - executeLocalSearch( - task, - timeProvider, - rewritten, - resolvedIndices, - projectState, - SearchResponse.Clusters.EMPTY, - searchPhaseProvider.apply(delegate) - ); - } else { - if (delegate instanceof TelemetryListener tl) { - tl.setRemotes(resolvedIndices.getRemoteClusterIndices().size()); - if (task.isAsync()) { - tl.setFeature(CCSUsageTelemetry.ASYNC_FEATURE); - } - if (original.pointInTimeBuilder() != null) { - tl.setFeature(CCSUsageTelemetry.PIT_FEATURE); - } - tl.setClient(task); - // Check if any of the index patterns are wildcard patterns - var localIndices = resolvedIndices.getLocalIndices(); - if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) { - tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); - } - if (resolvedIndices.getRemoteClusterIndices() - .values() - .stream() - .anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) { - tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); - } + Rewriteable.rewriteAndFetch( + original, + searchService.getRewriteContext(timeProvider::absoluteStartMillis, resolvedIndices, original.pointInTimeBuilder(), isExplain), + listener.delegateFailureAndWrap((delegate, rewritten) -> { + if (ccsCheckCompatibility) { + checkCCSVersionCompatibility(rewritten); } - final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId(); - if (shouldMinimizeRoundtrips(rewritten)) { - if (delegate instanceof TelemetryListener tl) { - tl.setFeature(CCSUsageTelemetry.MRT_FEATURE); - } - final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null - && rewritten.source().aggregations() != null - ? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations()) - : null; - SearchResponse.Clusters clusters = new SearchResponse.Clusters( - resolvedIndices.getLocalIndices(), - resolvedIndices.getRemoteClusterIndices(), - true, - remoteClusterService::isSkipUnavailable - ); - if (resolvedIndices.getLocalIndices() == null) { - // Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local shards) - task.getProgressListener() - .notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider); - } - ccsRemoteReduce( + + if (resolvedIndices.getRemoteClusterIndices().isEmpty()) { + executeLocalSearch( task, - parentTaskId, + timeProvider, rewritten, resolvedIndices, - clusters, - timeProvider, - aggregationReduceContextBuilder, - remoteClusterService, - threadPool, - delegate, - (r, l) -> executeLocalSearch( - task, - timeProvider, - r, - resolvedIndices, - projectState, - clusters, - searchPhaseProvider.apply(l) - ) + projectState, + SearchResponse.Clusters.EMPTY, + searchPhaseProvider.apply(delegate) ); } else { - final SearchContextId searchContext = resolvedIndices.getSearchContextId(); - SearchResponse.Clusters clusters = new SearchResponse.Clusters( - resolvedIndices.getLocalIndices(), - resolvedIndices.getRemoteClusterIndices(), - false, - remoteClusterService::isSkipUnavailable - ); + executeSearchWithRemotes(task, searchPhaseProvider, delegate, rewritten, resolvedIndices, timeProvider, projectState); + } + }) + ); + } - // TODO: pass parentTaskId - collectSearchShards( - rewritten.indicesOptions(), - rewritten.preference(), - rewritten.routing(), - rewritten.source() != null ? rewritten.source().query() : null, - Objects.requireNonNullElse(rewritten.allowPartialSearchResults(), searchService.defaultAllowPartialSearchResults()), - searchContext, - resolvedIndices.getRemoteClusterIndices(), - clusters, - timeProvider, - transportService, - delegate.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> { - final BiFunction clusterNodeLookup = getRemoteClusterNodeLookup( - searchShardsResponses - ); - final Map remoteAliasFilters; - final List remoteShardIterators; - if (searchContext != null) { - remoteAliasFilters = searchContext.aliasFilter(); - remoteShardIterators = getRemoteShardsIteratorFromPointInTime( - searchShardsResponses, - searchContext, - rewritten.pointInTimeBuilder().getKeepAlive(), - resolvedIndices.getRemoteClusterIndices() - ); - } else { - remoteAliasFilters = new HashMap<>(); - for (SearchShardsResponse searchShardsResponse : searchShardsResponses.values()) { - remoteAliasFilters.putAll(searchShardsResponse.getAliasFilters()); - } - remoteShardIterators = getRemoteShardsIterator( - searchShardsResponses, - resolvedIndices.getRemoteClusterIndices(), - remoteAliasFilters - ); - } - executeSearch( - task, - timeProvider, - rewritten, - resolvedIndices, - remoteShardIterators, - clusterNodeLookup, - projectState, - remoteAliasFilters, - clusters, - searchPhaseProvider.apply(finalDelegate) - ); - }) - ); + private void executeOpenPit( + SearchTask task, + SearchRequest original, + ActionListener listener, + Function, SearchPhaseProvider> searchPhaseProvider, + SearchSourceBuilder source + ) { + // disabling shard reordering for request + original.setPreFilterShardSize(Integer.MAX_VALUE); + openPIT(client, original, searchService.getDefaultKeepAliveInMillis(), listener.delegateFailureAndWrap((delegate, resp) -> { + // We set the keep alive to -1 to indicate that we don't need the pit id in the response. + // This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore. + source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE)); + var pitListener = new SearchResponseActionListener(delegate) { + @Override + public void onResponse(SearchResponse response) { + // we need to close the PIT first so we delay the release of the response to after the closing + response.incRef(); + closePIT(client, original.source().pointInTimeBuilder(), () -> ActionListener.respondAndRelease(delegate, response)); + } + + @Override + public void onFailure(Exception e) { + closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e)); } + }; + executeRequest(task, original, pitListener, searchPhaseProvider); + })); + } + + private void executeSearchWithRemotes( + SearchTask task, + Function, SearchPhaseProvider> searchPhaseProvider, + ActionListener delegate, + SearchRequest rewritten, + ResolvedIndices resolvedIndices, + SearchTimeProvider timeProvider, + ProjectState projectState + ) { + if (delegate instanceof TelemetryListener tl) { + tl.setRemotes(resolvedIndices.getRemoteClusterIndices().size()); + if (task.isAsync()) { + tl.setFeature(CCSUsageTelemetry.ASYNC_FEATURE); } - }); + if (rewritten.pointInTimeBuilder() != null) { + tl.setFeature(CCSUsageTelemetry.PIT_FEATURE); + } + tl.setClient(task); + // Check if any of the index patterns are wildcard patterns + var localIndices = resolvedIndices.getLocalIndices(); + if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) { + tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); + } + if (resolvedIndices.getRemoteClusterIndices() + .values() + .stream() + .anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) { + tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); + } + } + if (shouldMinimizeRoundtrips(rewritten)) { + if (delegate instanceof TelemetryListener tl) { + tl.setFeature(CCSUsageTelemetry.MRT_FEATURE); + } + final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null + && rewritten.source().aggregations() != null + ? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations()) + : null; + SearchResponse.Clusters clusters = new SearchResponse.Clusters( + resolvedIndices.getLocalIndices(), + resolvedIndices.getRemoteClusterIndices(), + true, + remoteClusterService::isSkipUnavailable + ); + if (resolvedIndices.getLocalIndices() == null) { + // Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local + // shards) + task.getProgressListener() + .notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider); + } + ccsRemoteReduce( + task, + task.taskInfo(clusterService.localNode().getId(), false).taskId(), + rewritten, + resolvedIndices, + clusters, + timeProvider, + aggregationReduceContextBuilder, + remoteClusterService, + threadPool, + delegate, + (r, l) -> executeLocalSearch(task, timeProvider, r, resolvedIndices, projectState, clusters, searchPhaseProvider.apply(l)) + ); + } else { + final SearchContextId searchContext = resolvedIndices.getSearchContextId(); + SearchResponse.Clusters clusters = new SearchResponse.Clusters( + resolvedIndices.getLocalIndices(), + resolvedIndices.getRemoteClusterIndices(), + false, + remoteClusterService::isSkipUnavailable + ); - final SearchSourceBuilder source = original.source(); - final boolean isExplain = source != null && source.explain() != null && source.explain(); - if (shouldOpenPIT(source)) { - // disabling shard reordering for request - original.setPreFilterShardSize(Integer.MAX_VALUE); - openPIT(client, original, searchService.getDefaultKeepAliveInMillis(), listener.delegateFailureAndWrap((delegate, resp) -> { - // We set the keep alive to -1 to indicate that we don't need the pit id in the response. - // This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore. - source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE)); - var pitListener = new SearchResponseActionListener(delegate) { - @Override - public void onResponse(SearchResponse response) { - // we need to close the PIT first so we delay the release of the response to after the closing - response.incRef(); - closePIT( - client, - original.source().pointInTimeBuilder(), - () -> ActionListener.respondAndRelease(delegate, response) + // TODO: pass parentTaskId + collectSearchShards( + rewritten.indicesOptions(), + rewritten.preference(), + rewritten.routing(), + rewritten.source() != null ? rewritten.source().query() : null, + Objects.requireNonNullElse(rewritten.allowPartialSearchResults(), searchService.defaultAllowPartialSearchResults()), + searchContext, + resolvedIndices.getRemoteClusterIndices(), + clusters, + timeProvider, + transportService, + delegate.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> { + final Map remoteAliasFilters; + final List remoteShardIterators; + if (searchContext != null) { + remoteAliasFilters = searchContext.aliasFilter(); + remoteShardIterators = getRemoteShardsIteratorFromPointInTime( + searchShardsResponses, + searchContext, + rewritten.pointInTimeBuilder().getKeepAlive(), + resolvedIndices.getRemoteClusterIndices() + ); + } else { + remoteAliasFilters = new HashMap<>(); + for (SearchShardsResponse searchShardsResponse : searchShardsResponses.values()) { + remoteAliasFilters.putAll(searchShardsResponse.getAliasFilters()); + } + remoteShardIterators = getRemoteShardsIterator( + searchShardsResponses, + resolvedIndices.getRemoteClusterIndices(), + remoteAliasFilters ); } - - @Override - public void onFailure(Exception e) { - closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e)); - } - }; - executeRequest(task, original, pitListener, searchPhaseProvider); - })); - } else { - Rewriteable.rewriteAndFetch( - original, - searchService.getRewriteContext( - timeProvider::absoluteStartMillis, - resolvedIndices, - original.pointInTimeBuilder(), - isExplain - ), - rewriteListener + executeSearch( + task, + timeProvider, + rewritten, + resolvedIndices, + remoteShardIterators, + getRemoteClusterNodeLookup(searchShardsResponses), + projectState, + remoteAliasFilters, + clusters, + searchPhaseProvider.apply(finalDelegate) + ); + }) ); } } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 1c60803c55242..282ac0178cdaa 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -305,7 +305,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final BigArrays bigArrays; private final FetchPhase fetchPhase; - private final CircuitBreaker circuitBreaker; private volatile Executor searchExecutor; private volatile boolean enableQueryPhaseParallelCollection; @@ -358,8 +357,11 @@ public SearchService( this.scriptService = scriptService; this.bigArrays = bigArrays; this.fetchPhase = fetchPhase; - circuitBreaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST); - this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreaker); + this.multiBucketConsumerService = new MultiBucketConsumerService( + clusterService, + settings, + circuitBreakerService.getBreaker(CircuitBreaker.REQUEST) + ); this.executorSelector = executorSelector; this.tracer = tracer; @@ -632,19 +634,28 @@ public void executeQueryPhase(ShardSearchRequest request, CancellableTask task, ); } - private void ensureAfterSeqNoRefreshed( + private void ensureAfterSeqNoRefreshed( IndexShard shard, ShardSearchRequest request, - CheckedSupplier executable, - ActionListener listener + CheckedSupplier executable, + ActionListener listener + ) { + if (request.waitForCheckpoint() <= UNASSIGNED_SEQ_NO) { + runAsync(shard, executable, listener); + } else { + doEnsureAfterSeqNoRefreshed(shard, request, executable, listener); + } + } + + private void doEnsureAfterSeqNoRefreshed( + IndexShard shard, + ShardSearchRequest request, + CheckedSupplier executable, + ActionListener listener ) { final long waitForCheckpoint = request.waitForCheckpoint(); - final Executor executor = getExecutor(shard); + assert request.waitForCheckpoint() > UNASSIGNED_SEQ_NO : "saw unassigned sequence number [" + request.waitForCheckpoint() + "]"; try { - if (waitForCheckpoint <= UNASSIGNED_SEQ_NO) { - runAsync(executor, executable, listener); - return; - } if (shard.indexSettings().getRefreshInterval().getMillis() <= 0) { listener.onFailure(new IllegalArgumentException("Cannot use wait_for_checkpoints with [index.refresh_interval=-1]")); return; @@ -718,7 +729,7 @@ private void searchReady() { if (timeoutTask != null) { timeoutTask.cancel(); } - runAsync(executor, executable, listener); + runAsync(shard, executable, listener); } } }); @@ -738,12 +749,8 @@ private IndexShard getShard(ShardSearchRequest request) { return indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); } - private static void runAsync( - Executor executor, - CheckedSupplier executable, - ActionListener listener - ) { - executor.execute(ActionRunnable.supplyAndDecRef(listener, executable)); + private void runAsync(IndexShard shard, CheckedSupplier executable, ActionListener listener) { + getExecutor(shard).execute(ActionRunnable.supplyAndDecRef(listener, executable)); } /** @@ -809,7 +816,7 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard final ReaderContext readerContext = findReaderContext(request.contextId(), request); final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); - runAsync(getExecutor(readerContext.indexShard()), () -> { + runAsync(readerContext.indexShard(), () -> { try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.RANK_FEATURE, false)) { int[] docIds = request.getDocIds(); if (docIds == null || docIds.length == 0) { @@ -866,7 +873,7 @@ public void executeQueryPhase( freeReaderContext(readerContext.id()); throw e; } - runAsync(getExecutor(readerContext.indexShard()), () -> { + runAsync(readerContext.indexShard(), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false);) { var opsListener = searchContext.indexShard().getSearchOperationListener(); @@ -911,7 +918,7 @@ public void executeQueryPhase( final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); rewriteAndFetchShardRequest(readerContext.indexShard(), shardSearchRequest, listener.delegateFailure((l, rewritten) -> { // fork the execution in the search thread pool - runAsync(getExecutor(readerContext.indexShard()), () -> { + runAsync(readerContext.indexShard(), () -> { readerContext.setAggregatedDfs(request.dfs()); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, true);) { final QuerySearchResult queryResult; @@ -979,7 +986,7 @@ public void executeFetchPhase( freeReaderContext(readerContext.id()); throw e; } - runAsync(getExecutor(readerContext.indexShard()), () -> { + runAsync(readerContext.indexShard(), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.FETCH, false);) { var opsListener = readerContext.indexShard().getSearchOperationListener(); @@ -1016,7 +1023,7 @@ public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, A final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); rewriteAndFetchShardRequest(readerContext.indexShard(), shardSearchRequest, listener.delegateFailure((l, rewritten) -> { - runAsync(getExecutor(readerContext.indexShard()), () -> { + runAsync(readerContext.indexShard(), () -> { try (SearchContext searchContext = createContext(readerContext, rewritten, task, ResultsType.FETCH, false)) { if (request.lastEmittedDoc() != null) { searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc();