From ec12300aaeecd4c59c9f3087e54928bfce51610b Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Tue, 4 Nov 2025 20:55:47 -0500 Subject: [PATCH 1/8] resolve indices when MRT is true --- .../elasticsearch/action/ResolvedIndices.java | 44 ++++++ .../action/search/TransportSearchAction.java | 126 +++++++++++++----- 2 files changed, 134 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java b/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java index 5bab04188a7a7..6e242440475cc 100644 --- a/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java +++ b/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java @@ -9,6 +9,7 @@ package org.elasticsearch.action; +import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction; import org.elasticsearch.action.search.SearchContextId; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -23,11 +24,14 @@ import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Container for information about results of the resolution of index expression. @@ -253,6 +257,46 @@ public static ResolvedIndices resolveWithPIT( ); } + public static ResolvedIndices resolveFromResponse( + ResolveIndexAction.Response resolveIndexAction, + IndicesOptions indicesOptions, + ProjectMetadata projectMetadata, + IndexNameExpressionResolver indexNameExpressionResolver, + long startTimeInMillis + ) { + var remoteClusterIndices = Stream.of( + resolveIndexAction.getIndices(), + resolveIndexAction.getDataStreams(), + resolveIndexAction.getAliases() + ) + .flatMap(Collection::stream) + .map(ResolveIndexAction.ResolvedIndexAbstraction::getName) + .map(RemoteClusterAware::splitIndexName) + .collect( + Collectors.groupingBy( + clusterAndIndex -> clusterAndIndex[0] != null ? clusterAndIndex[0] : RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, + Collectors.mapping(clusterAndIndex -> clusterAndIndex[1], Collectors.toList()) + ) + ) + .entrySet() + .stream() + .collect( + Collectors.toMap(Map.Entry::getKey, entry -> new OriginalIndices(entry.getValue().toArray(new String[0]), indicesOptions)) + ); + + var localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + + var concreteLocalIndices = localIndices == null + ? Index.EMPTY_ARRAY + : indexNameExpressionResolver.concreteIndices(projectMetadata, localIndices, startTimeInMillis); + + return new ResolvedIndices( + remoteClusterIndices, + localIndices, + resolveLocalIndexMetadata(concreteLocalIndices, projectMetadata, true) + ); + } + private static Map resolveLocalIndexMetadata( Index[] concreteLocalIndices, ProjectMetadata projectMetadata, diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 963ba974394ad..dc956159298e3 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction; import org.elasticsearch.action.admin.cluster.stats.CCSUsage; import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry; +import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; @@ -512,43 +513,58 @@ public void onFailure(Exception e) { } else { final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId(); if (shouldMinimizeRoundtrips(rewritten)) { - final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null - && rewritten.source().aggregations() != null - ? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations()) - : null; - SearchResponse.Clusters clusters = new SearchResponse.Clusters( - resolvedIndices.getLocalIndices(), - resolvedIndices.getRemoteClusterIndices(), - true, - (clusterAlias) -> remoteClusterService.shouldSkipOnFailure(clusterAlias, rewritten.allowPartialSearchResults()) - ); - if (resolvedIndices.getLocalIndices() == null) { - // Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local shards) - task.getProgressListener() - .notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider); - } - ccsRemoteReduce( - task, - parentTaskId, - rewritten, + collectResolvedIndices( + resolvesCrossProject, + original, resolvedIndices, - clusters, - timeProvider, - aggregationReduceContextBuilder, - remoteClusterService, - threadPool, - searchResponseActionListener, - (r, l) -> executeLocalSearch( - task, - timeProvider, - r, - resolvedIndices, - projectState, - clusters, - searchPhaseProvider.apply(l) - ), - transportService, - forceConnectTimeoutSecs + resolutionIdxOpts, + projectState.metadata(), + indexNameExpressionResolver, + timeProvider.absoluteStartMillis(), + searchResponseActionListener.delegateFailureAndWrap((searchListener, replacedIndices) -> { + final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null + && rewritten.source().aggregations() != null + ? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations()) + : null; + SearchResponse.Clusters clusters = new SearchResponse.Clusters( + replacedIndices.getLocalIndices(), + replacedIndices.getRemoteClusterIndices(), + true, + (clusterAlias) -> remoteClusterService.shouldSkipOnFailure( + clusterAlias, + rewritten.allowPartialSearchResults() + ) + ); + if (replacedIndices.getLocalIndices() == null) { + // Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local + // shards) + task.getProgressListener() + .notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider); + } + ccsRemoteReduce( + task, + parentTaskId, + rewritten, + replacedIndices, + clusters, + timeProvider, + aggregationReduceContextBuilder, + remoteClusterService, + threadPool, + searchListener, + (r, l) -> executeLocalSearch( + task, + timeProvider, + r, + replacedIndices, + projectState, + clusters, + searchPhaseProvider.apply(l) + ), + transportService, + forceConnectTimeoutSecs + ); + }) ); } else { final SearchContextId searchContext = resolvedIndices.getSearchContextId(); @@ -1047,6 +1063,44 @@ static SearchResponse.Clusters reconcileProjects( return new SearchResponse.Clusters(reconciledMap, false); } + void collectResolvedIndices( + boolean resolvesCrossProject, + SearchRequest original, + ResolvedIndices originalResolvedIndices, + IndicesOptions resolutionIdxOpts, + ProjectMetadata projectMetadata, + IndexNameExpressionResolver indexNameExpressionResolver, + long startTimeInMillis, + ActionListener listener + ) { + if (resolvesCrossProject) { + final ResolveIndexAction.Request resolveIndexRequest = new ResolveIndexAction.Request( + original.indices(), + original.indicesOptions(), + null, + original.getProjectRouting() + ); + + client.execute( + ResolveIndexAction.INSTANCE, + resolveIndexRequest, + listener.delegateFailureAndWrap( + (l, r) -> l.onResponse( + ResolvedIndices.resolveFromResponse( + r, + resolutionIdxOpts, + projectMetadata, + indexNameExpressionResolver, + startTimeInMillis + ) + ) + ) + ); + } else { + listener.onResponse(originalResolvedIndices); + } + } + /** * Collect remote search shards that we need to search for potential matches. * Used for ccs_minimize_roundtrips=false From d732b404ce4aa5db081a0d6c0df67f2beecba2d2 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Thu, 13 Nov 2025 08:39:12 -0500 Subject: [PATCH 2/8] fan out within search action --- .../action/search/TransportSearchIT.java | 16 +- .../elasticsearch/action/ResolvedIndices.java | 48 ++-- .../action/search/SearchRequest.java | 5 +- .../action/search/TransportSearchAction.java | 259 +++++++++++++----- .../search/SearchPhaseControllerTests.java | 10 +- .../action/search/SearchRequestTests.java | 32 ++- 6 files changed, 274 insertions(+), 96 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java index 89f46bee4b709..2127a19a9af99 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java @@ -139,6 +139,7 @@ public void testLocalClusterAlias() throws ExecutionException, InterruptedExcept parentTaskId, new SearchRequest(), Strings.EMPTY_ARRAY, + SearchRequest.DEFAULT_INDICES_OPTIONS, "local", nowInMillis, randomBoolean() @@ -158,6 +159,7 @@ public void testLocalClusterAlias() throws ExecutionException, InterruptedExcept parentTaskId, new SearchRequest(), Strings.EMPTY_ARRAY, + SearchRequest.DEFAULT_INDICES_OPTIONS, "", nowInMillis, randomBoolean() @@ -205,6 +207,7 @@ public void testAbsoluteStartMillis() throws ExecutionException, InterruptedExce parentTaskId, new SearchRequest(), Strings.EMPTY_ARRAY, + SearchRequest.DEFAULT_INDICES_OPTIONS, "", 0, randomBoolean() @@ -216,6 +219,7 @@ public void testAbsoluteStartMillis() throws ExecutionException, InterruptedExce parentTaskId, new SearchRequest(), Strings.EMPTY_ARRAY, + SearchRequest.DEFAULT_INDICES_OPTIONS, "", 0, randomBoolean() @@ -231,6 +235,7 @@ public void testAbsoluteStartMillis() throws ExecutionException, InterruptedExce parentTaskId, new SearchRequest(), Strings.EMPTY_ARRAY, + SearchRequest.DEFAULT_INDICES_OPTIONS, "", 0, randomBoolean() @@ -279,7 +284,15 @@ public void testFinalReduce() throws ExecutionException, InterruptedException { { SearchRequest searchRequest = randomBoolean() ? originalRequest - : SearchRequest.subSearchRequest(taskId, originalRequest, Strings.EMPTY_ARRAY, "remote", nowInMillis, true); + : SearchRequest.subSearchRequest( + taskId, + originalRequest, + Strings.EMPTY_ARRAY, + originalRequest.indicesOptions(), + "remote", + nowInMillis, + true + ); assertResponse(client().search(searchRequest), searchResponse -> { assertEquals(2, searchResponse.getHits().getTotalHits().value()); InternalAggregations aggregations = searchResponse.getAggregations(); @@ -292,6 +305,7 @@ public void testFinalReduce() throws ExecutionException, InterruptedException { taskId, originalRequest, Strings.EMPTY_ARRAY, + originalRequest.indicesOptions(), "remote", nowInMillis, false diff --git a/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java b/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java index 6e242440475cc..b180bd3100d6b 100644 --- a/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java +++ b/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java @@ -9,7 +9,6 @@ package org.elasticsearch.action; -import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction; import org.elasticsearch.action.search.SearchContextId; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -24,14 +23,12 @@ import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.function.Function; /** * Container for information about results of the resolution of index expression. @@ -257,34 +254,33 @@ public static ResolvedIndices resolveWithPIT( ); } - public static ResolvedIndices resolveFromResponse( - ResolveIndexAction.Response resolveIndexAction, + public static ResolvedIndices resolveWithIndexExpressions( + ResolvedIndexExpressions localExpressions, + Map remoteExpressions, IndicesOptions indicesOptions, ProjectMetadata projectMetadata, IndexNameExpressionResolver indexNameExpressionResolver, long startTimeInMillis ) { - var remoteClusterIndices = Stream.of( - resolveIndexAction.getIndices(), - resolveIndexAction.getDataStreams(), - resolveIndexAction.getAliases() - ) - .flatMap(Collection::stream) - .map(ResolveIndexAction.ResolvedIndexAbstraction::getName) - .map(RemoteClusterAware::splitIndexName) - .collect( - Collectors.groupingBy( - clusterAndIndex -> clusterAndIndex[0] != null ? clusterAndIndex[0] : RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, - Collectors.mapping(clusterAndIndex -> clusterAndIndex[1], Collectors.toList()) - ) - ) - .entrySet() - .stream() - .collect( - Collectors.toMap(Map.Entry::getKey, entry -> new OriginalIndices(entry.getValue().toArray(new String[0]), indicesOptions)) - ); + Function toIndices = resolvedIndexExpressions -> { + var indices = resolvedIndexExpressions.expressions().stream().filter(expression -> { + var resolvedExpressions = expression.localExpressions(); + var successfulResolution = resolvedExpressions + .localIndexResolutionResult() == ResolvedIndexExpression.LocalIndexResolutionResult.SUCCESS; + // if the expression is a wildcard, it will be successful even if there are no indices, so filter for no indices + var hasResolvedIndices = resolvedExpressions.indices().isEmpty() == false; + return successfulResolution && hasResolvedIndices; + }).map(ResolvedIndexExpression::original).toArray(String[]::new); + return indices.length > 0 ? new OriginalIndices(indices, indicesOptions) : null; + }; + Map remoteClusterIndices = remoteExpressions.entrySet().stream().collect(HashMap::new, (map, entry) -> { + var indices = toIndices.apply(entry.getValue()); + if (indices != null) { + map.put(entry.getKey(), indices); + } + }, Map::putAll); - var localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + var localIndices = localExpressions == null ? null : toIndices.apply(localExpressions); var concreteLocalIndices = localIndices == null ? Index.EMPTY_ARRAY diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 748dc67caae3e..694de800e54a3 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -178,6 +178,7 @@ public boolean allowsCrossProject() { * @param parentTaskId the parent taskId of the original search request * @param originalSearchRequest the original search request * @param indices the indices to search against + * @param indicesOptions the indicesOptions to search with * @param clusterAlias the alias to prefix index names with in the returned search results * @param absoluteStartMillis the absolute start time to be used on the remote clusters to ensure that the same value is used * @param finalReduce whether the reduction should be final or not @@ -186,6 +187,7 @@ static SearchRequest subSearchRequest( TaskId parentTaskId, SearchRequest originalSearchRequest, String[] indices, + IndicesOptions indicesOptions, String clusterAlias, long absoluteStartMillis, boolean finalReduce @@ -199,6 +201,7 @@ static SearchRequest subSearchRequest( } final SearchRequest request = new SearchRequest(originalSearchRequest, indices, clusterAlias, absoluteStartMillis, finalReduce); request.setParentTask(parentTaskId); + request.indicesOptions(indicesOptions); return request; } @@ -387,7 +390,7 @@ boolean isFinalReduce() { /** * Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to * ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search - * request. When created through {@link #subSearchRequest(TaskId, SearchRequest, String[], String, long, boolean)}, this method returns + * request. When created through {@link #subSearchRequest(TaskId, SearchRequest, String[], IndicesOptions, String, long, boolean)}, this method returns * the provided current time, otherwise it will return {@link System#currentTimeMillis()}. */ long getOrCreateAbsoluteStartMillis() { diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index dc956159298e3..8194415633dd9 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry; import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.SubscribableListener; @@ -108,6 +109,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -515,55 +517,68 @@ public void onFailure(Exception e) { if (shouldMinimizeRoundtrips(rewritten)) { collectResolvedIndices( resolvesCrossProject, - original, - resolvedIndices, + rewritten, resolutionIdxOpts, + resolvedIndices, projectState.metadata(), indexNameExpressionResolver, timeProvider.absoluteStartMillis(), searchResponseActionListener.delegateFailureAndWrap((searchListener, replacedIndices) -> { - final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null - && rewritten.source().aggregations() != null - ? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations()) - : null; - SearchResponse.Clusters clusters = new SearchResponse.Clusters( - replacedIndices.getLocalIndices(), - replacedIndices.getRemoteClusterIndices(), - true, - (clusterAlias) -> remoteClusterService.shouldSkipOnFailure( - clusterAlias, - rewritten.allowPartialSearchResults() - ) - ); - if (replacedIndices.getLocalIndices() == null) { - // Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local - // shards) - task.getProgressListener() - .notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider); - } - ccsRemoteReduce( - task, - parentTaskId, - rewritten, - replacedIndices, - clusters, - timeProvider, - aggregationReduceContextBuilder, - remoteClusterService, - threadPool, - searchListener, - (r, l) -> executeLocalSearch( + // TODO figure out if we need to short circuit if indices are resolved and now they are both empty + if (replacedIndices.getRemoteClusterIndices().isEmpty()) { + executeLocalSearch( task, timeProvider, - r, + rewritten, replacedIndices, projectState, + SearchResponse.Clusters.EMPTY, + searchPhaseProvider.apply(searchResponseActionListener) + ); + } else { + final var aggregationReduceContextBuilder = rewritten.source() != null + && rewritten.source().aggregations() != null + ? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations()) + : null; + var clusters = new SearchResponse.Clusters( + replacedIndices.getLocalIndices(), + replacedIndices.getRemoteClusterIndices(), + true, + (clusterAlias) -> remoteClusterService.shouldSkipOnFailure( + clusterAlias, + rewritten.allowPartialSearchResults() + ) + ); + if (replacedIndices.getLocalIndices() == null) { + // Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local + // shards) + task.getProgressListener() + .notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider); + } + ccsRemoteReduce( + task, + parentTaskId, + rewritten, + replacedIndices, clusters, - searchPhaseProvider.apply(l) - ), - transportService, - forceConnectTimeoutSecs - ); + timeProvider, + aggregationReduceContextBuilder, + remoteClusterService, + threadPool, + searchListener, + (r, l) -> executeLocalSearch( + task, + timeProvider, + r, + replacedIndices, + projectState, + clusters, + searchPhaseProvider.apply(l) + ), + transportService, + forceConnectTimeoutSecs + ); + } }) ); } else { @@ -804,6 +819,7 @@ static void ccsRemoteReduce( parentTaskId, searchRequest, indices.indices(), + indices.indicesOptions(), clusterAlias, timeProvider.absoluteStartMillis(), true @@ -895,6 +911,7 @@ public void onFailure(Exception e) { parentTaskId, searchRequest, indices.indices(), + indices.indicesOptions(), clusterAlias, timeProvider.absoluteStartMillis(), false @@ -949,6 +966,7 @@ public void onFailure(Exception e) { parentTaskId, searchRequest, resolvedIndices.getLocalIndices().indices(), + resolvedIndices.getLocalIndices().indicesOptions(), RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.absoluteStartMillis(), false @@ -1065,42 +1083,161 @@ static SearchResponse.Clusters reconcileProjects( void collectResolvedIndices( boolean resolvesCrossProject, - SearchRequest original, - ResolvedIndices originalResolvedIndices, + SearchRequest rewritten, IndicesOptions resolutionIdxOpts, + ResolvedIndices originalResolvedIndices, ProjectMetadata projectMetadata, IndexNameExpressionResolver indexNameExpressionResolver, long startTimeInMillis, ActionListener listener ) { if (resolvesCrossProject) { - final ResolveIndexAction.Request resolveIndexRequest = new ResolveIndexAction.Request( - original.indices(), - original.indicesOptions(), - null, - original.getProjectRouting() - ); + var numProjectsToResolve = (originalResolvedIndices.getLocalIndices() == null ? 0 : 1) + originalResolvedIndices + .getRemoteClusterIndices() + .size(); + assert numProjectsToResolve > 0 : "At least one index is required to resolve cross project indices"; + assert rewritten.getResolvedIndexExpressions() != null : "ResolvedIndexExpressions must be set when cross project is enabled"; + + ActionListener>> responsesByClusterListener = listener + .delegateFailureAndWrap( + (l, responsesByCluster) -> mergeResolvedIndices( + responsesByCluster, + rewritten, + resolutionIdxOpts, + projectMetadata, + indexNameExpressionResolver, + startTimeInMillis, + l + ) + ); - client.execute( - ResolveIndexAction.INSTANCE, - resolveIndexRequest, - listener.delegateFailureAndWrap( - (l, r) -> l.onResponse( - ResolvedIndices.resolveFromResponse( - r, - resolutionIdxOpts, - projectMetadata, - indexNameExpressionResolver, - startTimeInMillis - ) + ActionListener> resolveIndexFanOutListener; + if (numProjectsToResolve > 1) { + resolveIndexFanOutListener = new GroupedActionListener<>(numProjectsToResolve, responsesByClusterListener); + } else { + resolveIndexFanOutListener = responsesByClusterListener.map(Collections::singleton); + } + + if (originalResolvedIndices.getLocalIndices() != null) { + resolveLocalIndex(rewritten, resolutionIdxOpts, originalResolvedIndices.getLocalIndices(), resolveIndexFanOutListener); + } + + originalResolvedIndices.getRemoteClusterIndices() + .forEach( + (remoteClusterName, projectIndices) -> resolveRemoteCrossProjectIndex( + rewritten, + resolutionIdxOpts, + remoteClusterName, + projectIndices, + resolveIndexFanOutListener ) - ) - ); + ); } else { listener.onResponse(originalResolvedIndices); } } + private void mergeResolvedIndices( + Collection> responsesByCluster, + SearchRequest rewritten, + IndicesOptions resolutionIdxOpts, + ProjectMetadata projectMetadata, + IndexNameExpressionResolver indexNameExpressionResolver, + long startTimeInMillis, + ActionListener listener + ) { + + Map resolvedExpressions = responsesByCluster.stream() + .collect(Collectors.toMap(Map.Entry::getKey, response -> { + var resolvedIndexExpressions = response.getValue().getResolvedIndexExpressions(); + assert resolvedIndexExpressions != null + : "remote response from cluster [" + response.getKey() + "] is missing resolved index expressions"; + return resolvedIndexExpressions; + })); + + // TODO this looks wrong, we never verify localIndexExpression because we're comparing with rewritten.getResolvedIndexExpressions() + // i guess we can leave LOCAL_CLUSTER_GROUP_KEY in there and verify it alongside the other resolved expressions? + var ex = CrossProjectIndexResolutionValidator.validate( + rewritten.indicesOptions(), + rewritten.getProjectRouting(), + rewritten.getResolvedIndexExpressions(), + resolvedExpressions + ); + if (ex != null) { + listener.onFailure(ex); + } else { + var localIndexExpression = resolvedExpressions.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + listener.onResponse( + ResolvedIndices.resolveWithIndexExpressions( + localIndexExpression, + resolvedExpressions, + resolutionIdxOpts, + projectMetadata, + indexNameExpressionResolver, + startTimeInMillis + ) + ); + } + } + + private void resolveLocalIndex( + SearchRequest rewritten, + IndicesOptions resolutionIdxOpts, + OriginalIndices localIndices, + ActionListener> listener + ) { + var resolveIndexRequest = new ResolveIndexAction.Request( + localIndices.indices(), + resolutionIdxOpts, + null, + rewritten.getProjectRouting() + ); + + client.execute( + ResolveIndexAction.INSTANCE, + resolveIndexRequest, + listener.map(response -> Map.entry(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, response)) + ); + } + + private void resolveRemoteCrossProjectIndex( + SearchRequest rewritten, + IndicesOptions resolutionIdxOpts, + String remoteClusterName, + OriginalIndices projectIndices, + ActionListener> listener + ) { + SubscribableListener connectionListener = getListenerWithOptionalTimeout( + forceConnectTimeoutSecs, + threadPool, + threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION) + ); + + connectionListener.addListener( + listener.delegateFailure( + (responseListener, connection) -> transportService.sendRequest( + connection, + ResolveIndexAction.REMOTE_TYPE.name(), + new ResolveIndexAction.Request(projectIndices.indices(), resolutionIdxOpts), + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + listener.map(response -> Map.entry(remoteClusterName, response)), + ResolveIndexAction.Response::new, + threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION) + ) + ) + ) + ); + remoteClusterService.maybeEnsureConnectedAndGetConnection( + remoteClusterName, + shouldEstablishConnection( + forceConnectTimeoutSecs, + remoteClusterService.shouldSkipOnFailure(remoteClusterName, rewritten.allowPartialSearchResults()) + ), + connectionListener + ); + } + /** * Collect remote search shards that we need to search for potential matches. * Used for ccs_minimize_roundtrips=false diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 2db3b4f1e7ec8..8f57da5e157cb 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -586,7 +586,15 @@ private static AtomicArray generateFetchResults( private static SearchRequest randomSearchRequest() { return randomBoolean() ? new SearchRequest() - : SearchRequest.subSearchRequest(new TaskId("n", 1), new SearchRequest(), Strings.EMPTY_ARRAY, "remote", 0, randomBoolean()); + : SearchRequest.subSearchRequest( + new TaskId("n", 1), + new SearchRequest(), + Strings.EMPTY_ARRAY, + SearchRequest.DEFAULT_INDICES_OPTIONS, + "remote", + 0, + randomBoolean() + ); } public void testConsumer() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index 25c4c1672e852..bfdf3d71f3463 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -64,6 +64,7 @@ protected SearchRequest createSearchRequest() throws IOException { new TaskId("node", 1), request, request.indices(), + request.indicesOptions(), randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong(), randomBoolean() @@ -74,23 +75,42 @@ public void testWithLocalReduction() { final TaskId taskId = new TaskId("n", 1); expectThrows( NullPointerException.class, - () -> SearchRequest.subSearchRequest(taskId, null, Strings.EMPTY_ARRAY, "", 0, randomBoolean()) + () -> SearchRequest.subSearchRequest( + taskId, + null, + Strings.EMPTY_ARRAY, + SearchRequest.DEFAULT_INDICES_OPTIONS, + "", + 0, + randomBoolean() + ) ); SearchRequest request = new SearchRequest(); - expectThrows(NullPointerException.class, () -> SearchRequest.subSearchRequest(taskId, request, null, "", 0, randomBoolean())); expectThrows( NullPointerException.class, - () -> SearchRequest.subSearchRequest(taskId, request, new String[] { null }, "", 0, randomBoolean()) + () -> SearchRequest.subSearchRequest(taskId, request, null, request.indicesOptions(), "", 0, randomBoolean()) + ); + expectThrows( + NullPointerException.class, + () -> SearchRequest.subSearchRequest(taskId, request, new String[] { null }, request.indicesOptions(), "", 0, randomBoolean()) ); expectThrows( NullPointerException.class, - () -> SearchRequest.subSearchRequest(taskId, request, Strings.EMPTY_ARRAY, null, 0, randomBoolean()) + () -> SearchRequest.subSearchRequest(taskId, request, Strings.EMPTY_ARRAY, request.indicesOptions(), null, 0, randomBoolean()) ); expectThrows( IllegalArgumentException.class, - () -> SearchRequest.subSearchRequest(taskId, request, Strings.EMPTY_ARRAY, "", -1, randomBoolean()) + () -> SearchRequest.subSearchRequest(taskId, request, Strings.EMPTY_ARRAY, request.indicesOptions(), "", -1, randomBoolean()) + ); + SearchRequest searchRequest = SearchRequest.subSearchRequest( + taskId, + request, + Strings.EMPTY_ARRAY, + request.indicesOptions(), + "", + 0, + randomBoolean() ); - SearchRequest searchRequest = SearchRequest.subSearchRequest(taskId, request, Strings.EMPTY_ARRAY, "", 0, randomBoolean()); assertNull(searchRequest.validate()); } From 79b7f662ff5f475c46813d8df926921bba1bfdda Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Fri, 14 Nov 2025 12:04:47 -0500 Subject: [PATCH 3/8] checkstyle lol --- .../java/org/elasticsearch/action/search/SearchRequest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 35778b7ce2606..c611462079f63 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -417,8 +417,8 @@ boolean isFinalReduce() { /** * Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to * ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search - * request. When created through {@link #subSearchRequest(TaskId, SearchRequest, String[], IndicesOptions, String, long, boolean)}, this method returns - * the provided current time, otherwise it will return {@link System#currentTimeMillis()}. + * request. When created through {@link #subSearchRequest(TaskId, SearchRequest, String[], IndicesOptions, String, long, boolean)}, + * this method returns the provided current time, otherwise it will return {@link System#currentTimeMillis()}. */ long getOrCreateAbsoluteStartMillis() { return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis; From 161ca5e25c11c9e0d9e7a79da9b35e22245dddb4 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Mon, 17 Nov 2025 14:25:58 -0500 Subject: [PATCH 4/8] Add unit tests --- .../org/elasticsearch/action/search/TransportSearchAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 02bf6b452925b..4ee96e50c34fe 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -1147,7 +1147,6 @@ private void mergeResolvedIndices( long startTimeInMillis, ActionListener listener ) { - Map resolvedExpressions = responsesByCluster.stream() .collect(Collectors.toMap(Map.Entry::getKey, response -> { var resolvedIndexExpressions = response.getValue().getResolvedIndexExpressions(); From 95fee973f60dd00612d5655ae9be840bf07aea76 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Wed, 19 Nov 2025 10:48:27 -0500 Subject: [PATCH 5/8] move back to coordinator thread on complete --- .../action/search/TransportSearchAction.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 4ee96e50c34fe..7ab51969596d8 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -1186,18 +1186,21 @@ private void resolveLocalIndex( OriginalIndices localIndices, ActionListener> listener ) { - var resolveIndexRequest = new ResolveIndexAction.Request( - localIndices.indices(), - resolutionIdxOpts, - null, - rewritten.getProjectRouting() - ); + SubscribableListener.newForked(l -> { + var resolveIndexRequest = new ResolveIndexAction.Request( + localIndices.indices(), + resolutionIdxOpts, + null, + rewritten.getProjectRouting() + ); - client.execute( - ResolveIndexAction.INSTANCE, - resolveIndexRequest, - listener.map(response -> Map.entry(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, response)) - ); + client.execute(ResolveIndexAction.INSTANCE, resolveIndexRequest, l); + }) + .addListener( + listener.map(response -> Map.entry(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, response)), + threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION), + threadPool.getThreadContext() + ); } private void resolveRemoteCrossProjectIndex( From bc617dac067c32b22da5d60adf428691aa795f42 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Wed, 19 Nov 2025 10:49:53 -0500 Subject: [PATCH 6/8] Revert "Add unit tests" This reverts commit 161ca5e25c11c9e0d9e7a79da9b35e22245dddb4. --- .../org/elasticsearch/action/search/TransportSearchAction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 7ab51969596d8..b655503e98e89 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -1147,6 +1147,7 @@ private void mergeResolvedIndices( long startTimeInMillis, ActionListener listener ) { + Map resolvedExpressions = responsesByCluster.stream() .collect(Collectors.toMap(Map.Entry::getKey, response -> { var resolvedIndexExpressions = response.getValue().getResolvedIndexExpressions(); From 497e80a59a5f9bb37610d15f387c53be98d23351 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Mon, 24 Nov 2025 12:16:33 -0500 Subject: [PATCH 7/8] remove local resolution --- .../elasticsearch/action/ResolvedIndices.java | 39 +++---- .../action/search/TransportSearchAction.java | 93 +++++----------- .../action/ResolvedIndicesTests.java | 100 ++++++++++++++++++ 3 files changed, 142 insertions(+), 90 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/ResolvedIndicesTests.java diff --git a/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java b/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java index b180bd3100d6b..cfe79214aa865 100644 --- a/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java +++ b/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java @@ -28,7 +28,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.function.Function; /** * Container for information about results of the resolution of index expression. @@ -254,16 +253,20 @@ public static ResolvedIndices resolveWithPIT( ); } + /** + * Create a new {@link ResolvedIndices} instance from a Map of Projects to {@link ResolvedIndexExpressions}. + * This method guarantees that the resulting remote project contains at least one index resolved for the mapped + * {@link ResolvedIndexExpressions}. The resulting {@link ResolvedIndices#getRemoteClusterIndices()} will map to the original index + * expression provided in {@link ResolvedIndexExpression#original()}. + */ public static ResolvedIndices resolveWithIndexExpressions( - ResolvedIndexExpressions localExpressions, + OriginalIndices localIndices, + Map localIndexMetadata, Map remoteExpressions, - IndicesOptions indicesOptions, - ProjectMetadata projectMetadata, - IndexNameExpressionResolver indexNameExpressionResolver, - long startTimeInMillis + IndicesOptions indicesOptions ) { - Function toIndices = resolvedIndexExpressions -> { - var indices = resolvedIndexExpressions.expressions().stream().filter(expression -> { + Map remoteIndices = remoteExpressions.entrySet().stream().collect(HashMap::new, (map, entry) -> { + var indices = entry.getValue().expressions().stream().filter(expression -> { var resolvedExpressions = expression.localExpressions(); var successfulResolution = resolvedExpressions .localIndexResolutionResult() == ResolvedIndexExpression.LocalIndexResolutionResult.SUCCESS; @@ -271,26 +274,12 @@ public static ResolvedIndices resolveWithIndexExpressions( var hasResolvedIndices = resolvedExpressions.indices().isEmpty() == false; return successfulResolution && hasResolvedIndices; }).map(ResolvedIndexExpression::original).toArray(String[]::new); - return indices.length > 0 ? new OriginalIndices(indices, indicesOptions) : null; - }; - Map remoteClusterIndices = remoteExpressions.entrySet().stream().collect(HashMap::new, (map, entry) -> { - var indices = toIndices.apply(entry.getValue()); - if (indices != null) { - map.put(entry.getKey(), indices); + if (indices.length > 0) { + map.put(entry.getKey(), new OriginalIndices(indices, indicesOptions)); } }, Map::putAll); - var localIndices = localExpressions == null ? null : toIndices.apply(localExpressions); - - var concreteLocalIndices = localIndices == null - ? Index.EMPTY_ARRAY - : indexNameExpressionResolver.concreteIndices(projectMetadata, localIndices, startTimeInMillis); - - return new ResolvedIndices( - remoteClusterIndices, - localIndices, - resolveLocalIndexMetadata(concreteLocalIndices, projectMetadata, true) - ); + return new ResolvedIndices(remoteIndices, localIndices, localIndexMetadata); } private static Map resolveLocalIndexMetadata( diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index b655503e98e89..261385c7f92c6 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -520,11 +520,7 @@ public void onFailure(Exception e) { rewritten, resolutionIdxOpts, resolvedIndices, - projectState.metadata(), - indexNameExpressionResolver, - timeProvider.absoluteStartMillis(), searchResponseActionListener.delegateFailureAndWrap((searchListener, replacedIndices) -> { - // TODO figure out if we need to short circuit if indices are resolved and now they are both empty if (replacedIndices.getRemoteClusterIndices().isEmpty()) { executeLocalSearch( task, @@ -1082,53 +1078,45 @@ static SearchResponse.Clusters reconcileProjects( return new SearchResponse.Clusters(reconciledMap, false); } + /** + * Only used for ccs_minimize_roundtrips=true pathway + */ void collectResolvedIndices( boolean resolvesCrossProject, SearchRequest rewritten, IndicesOptions resolutionIdxOpts, ResolvedIndices originalResolvedIndices, - ProjectMetadata projectMetadata, - IndexNameExpressionResolver indexNameExpressionResolver, - long startTimeInMillis, ActionListener listener ) { if (resolvesCrossProject) { - var numProjectsToResolve = (originalResolvedIndices.getLocalIndices() == null ? 0 : 1) + originalResolvedIndices - .getRemoteClusterIndices() - .size(); + var numProjectsToResolve = originalResolvedIndices.getRemoteClusterIndices().size(); assert numProjectsToResolve > 0 : "At least one index is required to resolve cross project indices"; assert rewritten.getResolvedIndexExpressions() != null : "ResolvedIndexExpressions must be set when cross project is enabled"; - ActionListener>> responsesByClusterListener = listener + ActionListener>> responsesByProjectListener = listener .delegateFailureAndWrap( - (l, responsesByCluster) -> mergeResolvedIndices( - responsesByCluster, + (l, responsesByProject) -> mergeResolvedIndices( + originalResolvedIndices, + responsesByProject, rewritten, resolutionIdxOpts, - projectMetadata, - indexNameExpressionResolver, - startTimeInMillis, l ) ); ActionListener> resolveIndexFanOutListener; if (numProjectsToResolve > 1) { - resolveIndexFanOutListener = new GroupedActionListener<>(numProjectsToResolve, responsesByClusterListener); + resolveIndexFanOutListener = new GroupedActionListener<>(numProjectsToResolve, responsesByProjectListener); } else { - resolveIndexFanOutListener = responsesByClusterListener.map(Collections::singleton); - } - - if (originalResolvedIndices.getLocalIndices() != null) { - resolveLocalIndex(rewritten, resolutionIdxOpts, originalResolvedIndices.getLocalIndices(), resolveIndexFanOutListener); + resolveIndexFanOutListener = responsesByProjectListener.map(Collections::singleton); } originalResolvedIndices.getRemoteClusterIndices() .forEach( - (remoteClusterName, projectIndices) -> resolveRemoteCrossProjectIndex( + (projectName, projectIndices) -> resolveRemoteCrossProjectIndex( rewritten, resolutionIdxOpts, - remoteClusterName, + projectName, projectIndices, resolveIndexFanOutListener ) @@ -1138,17 +1126,17 @@ void collectResolvedIndices( } } + /** + * Only used for ccs_minimize_roundtrips=true pathway + */ private void mergeResolvedIndices( - Collection> responsesByCluster, + ResolvedIndices originalResolvedIndices, + Collection> responsesByProject, SearchRequest rewritten, IndicesOptions resolutionIdxOpts, - ProjectMetadata projectMetadata, - IndexNameExpressionResolver indexNameExpressionResolver, - long startTimeInMillis, ActionListener listener ) { - - Map resolvedExpressions = responsesByCluster.stream() + Map resolvedExpressions = responsesByProject.stream() .collect(Collectors.toMap(Map.Entry::getKey, response -> { var resolvedIndexExpressions = response.getValue().getResolvedIndexExpressions(); assert resolvedIndexExpressions != null @@ -1156,8 +1144,6 @@ private void mergeResolvedIndices( return resolvedIndexExpressions; })); - // TODO this looks wrong, we never verify localIndexExpression because we're comparing with rewritten.getResolvedIndexExpressions() - // i guess we can leave LOCAL_CLUSTER_GROUP_KEY in there and verify it alongside the other resolved expressions? var ex = CrossProjectIndexResolutionValidator.validate( rewritten.indicesOptions(), rewritten.getProjectRouting(), @@ -1167,47 +1153,24 @@ private void mergeResolvedIndices( if (ex != null) { listener.onFailure(ex); } else { - var localIndexExpression = resolvedExpressions.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); listener.onResponse( ResolvedIndices.resolveWithIndexExpressions( - localIndexExpression, + originalResolvedIndices.getLocalIndices(), + originalResolvedIndices.getConcreteLocalIndicesMetadata(), resolvedExpressions, - resolutionIdxOpts, - projectMetadata, - indexNameExpressionResolver, - startTimeInMillis + resolutionIdxOpts ) ); } } - private void resolveLocalIndex( - SearchRequest rewritten, - IndicesOptions resolutionIdxOpts, - OriginalIndices localIndices, - ActionListener> listener - ) { - SubscribableListener.newForked(l -> { - var resolveIndexRequest = new ResolveIndexAction.Request( - localIndices.indices(), - resolutionIdxOpts, - null, - rewritten.getProjectRouting() - ); - - client.execute(ResolveIndexAction.INSTANCE, resolveIndexRequest, l); - }) - .addListener( - listener.map(response -> Map.entry(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, response)), - threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION), - threadPool.getThreadContext() - ); - } - + /** + * Only used for ccs_minimize_roundtrips=true pathway + */ private void resolveRemoteCrossProjectIndex( SearchRequest rewritten, IndicesOptions resolutionIdxOpts, - String remoteClusterName, + String projectName, OriginalIndices projectIndices, ActionListener> listener ) { @@ -1225,7 +1188,7 @@ private void resolveRemoteCrossProjectIndex( new ResolveIndexAction.Request(projectIndices.indices(), resolutionIdxOpts), TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>( - listener.map(response -> Map.entry(remoteClusterName, response)), + listener.map(response -> Map.entry(projectName, response)), ResolveIndexAction.Response::new, threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION) ) @@ -1233,10 +1196,10 @@ private void resolveRemoteCrossProjectIndex( ) ); remoteClusterService.maybeEnsureConnectedAndGetConnection( - remoteClusterName, + projectName, shouldEstablishConnection( forceConnectTimeoutSecs, - remoteClusterService.shouldSkipOnFailure(remoteClusterName, rewritten.allowPartialSearchResults()) + remoteClusterService.shouldSkipOnFailure(projectName, rewritten.allowPartialSearchResults()) ), connectionListener ); diff --git a/server/src/test/java/org/elasticsearch/action/ResolvedIndicesTests.java b/server/src/test/java/org/elasticsearch/action/ResolvedIndicesTests.java new file mode 100644 index 0000000000000..b26a4b9eec848 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/ResolvedIndicesTests.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action; + +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.elasticsearch.action.ResolvedIndexExpression.LocalIndexResolutionResult.CONCRETE_RESOURCE_NOT_VISIBLE; +import static org.elasticsearch.action.ResolvedIndexExpression.LocalIndexResolutionResult.CONCRETE_RESOURCE_UNAUTHORIZED; +import static org.elasticsearch.action.ResolvedIndexExpression.LocalIndexResolutionResult.NONE; +import static org.elasticsearch.action.ResolvedIndexExpression.LocalIndexResolutionResult.SUCCESS; +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.mockito.Mockito.mock; + +public class ResolvedIndicesTests extends ESTestCase { + + public void testResolveWithIndexExpressions() { + var remoteExpressions = resolvedIndexExpression("logs", SUCCESS, "logs-1", "logs-2", "logs-3"); + + var resolvedIndices = resolveWithIndexExpressions(remoteExpressions); + + assertThat(resolvedIndices.getRemoteClusterIndices(), is(not(anEmptyMap()))); + assertThat(remoteIndices(resolvedIndices), containsInAnyOrder("logs")); + } + + public void testResolveWithIndexExpressionsWithEmptyWildcard() { + var remoteExpressions = resolvedIndexExpression("logs*", SUCCESS); + + var resolvedIndices = resolveWithIndexExpressions(remoteExpressions); + + assertThat(resolvedIndices.getRemoteClusterIndices(), is(anEmptyMap())); + } + + public void testResolveWithIndexExpressionsWithNoMatch() { + var remoteExpressions = resolvedIndexExpression( + "logs", + randomFrom(CONCRETE_RESOURCE_NOT_VISIBLE, CONCRETE_RESOURCE_UNAUTHORIZED, NONE) + ); + + var resolvedIndices = resolveWithIndexExpressions(remoteExpressions); + + assertThat(resolvedIndices.getRemoteClusterIndices(), is(anEmptyMap())); + } + + private static Map resolvedIndexExpression( + String original, + ResolvedIndexExpression.LocalIndexResolutionResult resolutionResult, + String... indices + ) { + return Map.of( + "remote-project", + new ResolvedIndexExpressions( + List.of( + new ResolvedIndexExpression( + original, + new ResolvedIndexExpression.LocalExpressions( + indices == null ? Set.of() : Arrays.stream(indices).collect(Collectors.toSet()), + resolutionResult, + null + ), + Set.of() + ) + ) + ) + ); + } + + private static ResolvedIndices resolveWithIndexExpressions(Map remoteExpressions) { + return ResolvedIndices.resolveWithIndexExpressions( + new OriginalIndices(new String[] { "some-local-index" }, IndicesOptions.DEFAULT), + Map.of(mock(), mock()), + remoteExpressions, + IndicesOptions.DEFAULT + ); + } + + private static List remoteIndices(ResolvedIndices resolvedIndices) { + var remoteClusterIndices = resolvedIndices.getRemoteClusterIndices(); + assertThat(remoteClusterIndices, hasKey("remote-project")); + var remoteProjectIndices = remoteClusterIndices.get("remote-project"); + return Arrays.stream(remoteProjectIndices.indices()).toList(); + } +} From 5e721da2b5001e515e4137644b2389171acc0693 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Mon, 24 Nov 2025 19:10:11 -0500 Subject: [PATCH 8/8] more docs, more tests --- .../elasticsearch/action/ResolvedIndices.java | 17 +- .../action/search/TransportSearchAction.java | 10 +- .../action/ResolvedIndicesTests.java | 174 ++++++++++++++---- 3 files changed, 162 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java b/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java index cfe79214aa865..27ac5420adb48 100644 --- a/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java +++ b/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java @@ -254,10 +254,19 @@ public static ResolvedIndices resolveWithPIT( } /** - * Create a new {@link ResolvedIndices} instance from a Map of Projects to {@link ResolvedIndexExpressions}. - * This method guarantees that the resulting remote project contains at least one index resolved for the mapped - * {@link ResolvedIndexExpressions}. The resulting {@link ResolvedIndices#getRemoteClusterIndices()} will map to the original index - * expression provided in {@link ResolvedIndexExpression#original()}. + * Create a new {@link ResolvedIndices} instance from a Map of Projects to {@link ResolvedIndexExpressions}. This is intended to be + * used for Cross-Project Search (CPS). + * + * @param localIndices this value is set as-is in the resulting ResolvedIndices. + * @param localIndexMetadata this value is set as-is in the resulting ResolvedIndices. + * @param remoteExpressions the map of project names to {@link ResolvedIndexExpressions}. This map is used to create the + * {@link ResolvedIndices#getRemoteClusterIndices()} for the resulting ResolvedIndices. Each project keyed + * in the map is guaranteed to have at least one index for the index expression provided by the user. + * The resulting {@link ResolvedIndices#getRemoteClusterIndices()} will map to the original index expression + * provided by the user. For example, if the user requested "logs" and "project-1" resolved that to "logs-1", + * then the result will map "project-1" to "logs". We rely on the remote search request to expand "logs" back + * to "logs-1". + * @param indicesOptions this value is set as-is in the resulting ResolvedIndices. */ public static ResolvedIndices resolveWithIndexExpressions( OriginalIndices localIndices, diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 261385c7f92c6..a773340dac65f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -515,13 +515,16 @@ public void onFailure(Exception e) { } else { final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId(); if (shouldMinimizeRoundtrips(rewritten)) { - collectResolvedIndices( + collectRemoteResolvedIndices( resolvesCrossProject, rewritten, resolutionIdxOpts, resolvedIndices, searchResponseActionListener.delegateFailureAndWrap((searchListener, replacedIndices) -> { if (replacedIndices.getRemoteClusterIndices().isEmpty()) { + // if the original resolvedIndices had remote clusters, but the replacedIndices no longer does, then we + // treat this like a local search. If there is no local index, then executeLocalSearch will return an + // empty result. executeLocalSearch( task, timeProvider, @@ -1080,8 +1083,9 @@ static SearchResponse.Clusters reconcileProjects( /** * Only used for ccs_minimize_roundtrips=true pathway + * If Cross-Project Search (CPS) is enabled, then this method will fan out to the linked projects to resolve and validate their indices. */ - void collectResolvedIndices( + void collectRemoteResolvedIndices( boolean resolvesCrossProject, SearchRequest rewritten, IndicesOptions resolutionIdxOpts, @@ -1129,7 +1133,7 @@ void collectResolvedIndices( /** * Only used for ccs_minimize_roundtrips=true pathway */ - private void mergeResolvedIndices( + private static void mergeResolvedIndices( ResolvedIndices originalResolvedIndices, Collection> responsesByProject, SearchRequest rewritten, diff --git a/server/src/test/java/org/elasticsearch/action/ResolvedIndicesTests.java b/server/src/test/java/org/elasticsearch/action/ResolvedIndicesTests.java index b26a4b9eec848..7b60d14440b4d 100644 --- a/server/src/test/java/org/elasticsearch/action/ResolvedIndicesTests.java +++ b/server/src/test/java/org/elasticsearch/action/ResolvedIndicesTests.java @@ -16,12 +16,12 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import static org.elasticsearch.action.ResolvedIndexExpression.LocalIndexResolutionResult.CONCRETE_RESOURCE_NOT_VISIBLE; import static org.elasticsearch.action.ResolvedIndexExpression.LocalIndexResolutionResult.CONCRETE_RESOURCE_UNAUTHORIZED; import static org.elasticsearch.action.ResolvedIndexExpression.LocalIndexResolutionResult.NONE; import static org.elasticsearch.action.ResolvedIndexExpression.LocalIndexResolutionResult.SUCCESS; +import static org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator.indicesOptionsForCrossProjectFanout; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasKey; @@ -30,28 +30,77 @@ import static org.mockito.Mockito.mock; public class ResolvedIndicesTests extends ESTestCase { + private static final Set NO_REMOTE_EXPRESSIONS = Set.of(); + /** + * Given project-1 resolved to foo indices foo-1, foo-2, foo-3 + * When we create ResolvedIndices + * Then project-1 maps to foo + */ public void testResolveWithIndexExpressions() { - var remoteExpressions = resolvedIndexExpression("logs", SUCCESS, "logs-1", "logs-2", "logs-3"); + var remoteExpressions = Map.of( + "project-1", + new ResolvedIndexExpressions( + List.of( + new ResolvedIndexExpression( + "foo", + new ResolvedIndexExpression.LocalExpressions(Set.of("foo-1", "foo-2", "foo-3"), SUCCESS, null), + NO_REMOTE_EXPRESSIONS + ) + ) + ) + ); var resolvedIndices = resolveWithIndexExpressions(remoteExpressions); assertThat(resolvedIndices.getRemoteClusterIndices(), is(not(anEmptyMap()))); - assertThat(remoteIndices(resolvedIndices), containsInAnyOrder("logs")); + + assertThat(resolvedIndices.getRemoteClusterIndices(), hasKey("project-1")); + assertThat(Arrays.stream(resolvedIndices.getRemoteClusterIndices().get("project-1").indices()).toList(), containsInAnyOrder("foo")); } + /** + * Given project-1 resolved logs* to no indices + * When we create ResolvedIndices + * Then we do not include project-1 in the results + */ public void testResolveWithIndexExpressionsWithEmptyWildcard() { - var remoteExpressions = resolvedIndexExpression("logs*", SUCCESS); + var remoteExpressions = Map.of( + "project-1", + new ResolvedIndexExpressions( + List.of( + new ResolvedIndexExpression( + "logs*", + new ResolvedIndexExpression.LocalExpressions(Set.of(), SUCCESS, null), + NO_REMOTE_EXPRESSIONS + ) + ) + ) + ); var resolvedIndices = resolveWithIndexExpressions(remoteExpressions); assertThat(resolvedIndices.getRemoteClusterIndices(), is(anEmptyMap())); } + /** + * Given project-1 resolved logs to no indices + * When we create ResolvedIndices + * Then we do not include project-1 in the results + */ public void testResolveWithIndexExpressionsWithNoMatch() { - var remoteExpressions = resolvedIndexExpression( - "logs", - randomFrom(CONCRETE_RESOURCE_NOT_VISIBLE, CONCRETE_RESOURCE_UNAUTHORIZED, NONE) + var localExpressionNotFoundReason = randomFrom(CONCRETE_RESOURCE_NOT_VISIBLE, CONCRETE_RESOURCE_UNAUTHORIZED, NONE); + var remoteExpressions = Map.of( + "project-1", + new ResolvedIndexExpressions( + List.of( + new ResolvedIndexExpression( + "logs", + new ResolvedIndexExpression.LocalExpressions(Set.of(), localExpressionNotFoundReason, null), + NO_REMOTE_EXPRESSIONS + ) + ) + ) ); var resolvedIndices = resolveWithIndexExpressions(remoteExpressions); @@ -59,42 +108,103 @@ public void testResolveWithIndexExpressionsWithNoMatch() { assertThat(resolvedIndices.getRemoteClusterIndices(), is(anEmptyMap())); } - private static Map resolvedIndexExpression( - String original, - ResolvedIndexExpression.LocalIndexResolutionResult resolutionResult, - String... indices - ) { - return Map.of( - "remote-project", - new ResolvedIndexExpressions( - List.of( - new ResolvedIndexExpression( - original, - new ResolvedIndexExpression.LocalExpressions( - indices == null ? Set.of() : Arrays.stream(indices).collect(Collectors.toSet()), - resolutionResult, - null + /** + * Given project-1 resolved foo to foo-1, bar to no indices, and blah to no indices + * And project-2 resolved foo to no indices, bar to bar-1, and blah to no indices + * And project-3 resolved foo to no indices, bar to no indices, and blah to no indices + * When we create ResolvedIndices + * Then project-1 maps to foo + * And project-2 maps to bar + * And we do not include project-3 in the results + */ + public void testResolveWithIndexExpressionsWithMultipleProjects() { + var remoteExpressions = Map.ofEntries( + Map.entry( + "project-1", + new ResolvedIndexExpressions( + List.of( + new ResolvedIndexExpression( + "foo", + new ResolvedIndexExpression.LocalExpressions(Set.of("foo-1"), SUCCESS, null), + NO_REMOTE_EXPRESSIONS + ), + new ResolvedIndexExpression( + "bar", + new ResolvedIndexExpression.LocalExpressions(Set.of(), NONE, null), + NO_REMOTE_EXPRESSIONS + ), + new ResolvedIndexExpression( + "blah", + new ResolvedIndexExpression.LocalExpressions(Set.of(), NONE, null), + NO_REMOTE_EXPRESSIONS + ) + ) + ) + ), + Map.entry( + "project-2", + new ResolvedIndexExpressions( + List.of( + new ResolvedIndexExpression( + "foo", + new ResolvedIndexExpression.LocalExpressions(Set.of(), NONE, null), + NO_REMOTE_EXPRESSIONS + ), + new ResolvedIndexExpression( + "bar", + new ResolvedIndexExpression.LocalExpressions(Set.of("bar-1"), SUCCESS, null), + NO_REMOTE_EXPRESSIONS + ), + new ResolvedIndexExpression( + "blah", + new ResolvedIndexExpression.LocalExpressions(Set.of(), NONE, null), + NO_REMOTE_EXPRESSIONS + ) + ) + ) + ), + Map.entry( + "project-3", + new ResolvedIndexExpressions( + List.of( + new ResolvedIndexExpression( + "foo", + new ResolvedIndexExpression.LocalExpressions(Set.of(), NONE, null), + NO_REMOTE_EXPRESSIONS + ), + new ResolvedIndexExpression( + "bar", + new ResolvedIndexExpression.LocalExpressions(Set.of(), NONE, null), + NO_REMOTE_EXPRESSIONS ), - Set.of() + new ResolvedIndexExpression( + "blah", + new ResolvedIndexExpression.LocalExpressions(Set.of(), NONE, null), + NO_REMOTE_EXPRESSIONS + ) ) ) ) ); + + var resolvedIndices = resolveWithIndexExpressions(remoteExpressions); + + assertThat(resolvedIndices.getRemoteClusterIndices(), is(not(anEmptyMap()))); + + assertThat(resolvedIndices.getRemoteClusterIndices(), hasKey("project-1")); + assertThat(resolvedIndices.getRemoteClusterIndices(), hasKey("project-2")); + assertThat(resolvedIndices.getRemoteClusterIndices(), not(hasKey("project-3"))); + assertThat(Arrays.stream(resolvedIndices.getRemoteClusterIndices().get("project-1").indices()).toList(), containsInAnyOrder("foo")); + assertThat(Arrays.stream(resolvedIndices.getRemoteClusterIndices().get("project-2").indices()).toList(), containsInAnyOrder("bar")); } private static ResolvedIndices resolveWithIndexExpressions(Map remoteExpressions) { + var cpsIndicesOptions = indicesOptionsForCrossProjectFanout(IndicesOptions.DEFAULT); return ResolvedIndices.resolveWithIndexExpressions( - new OriginalIndices(new String[] { "some-local-index" }, IndicesOptions.DEFAULT), + new OriginalIndices(new String[] { "some-local-index" }, cpsIndicesOptions), Map.of(mock(), mock()), remoteExpressions, - IndicesOptions.DEFAULT + cpsIndicesOptions ); } - - private static List remoteIndices(ResolvedIndices resolvedIndices) { - var remoteClusterIndices = resolvedIndices.getRemoteClusterIndices(); - assertThat(remoteClusterIndices, hasKey("remote-project")); - var remoteProjectIndices = remoteClusterIndices.get("remote-project"); - return Arrays.stream(remoteProjectIndices.indices()).toList(); - } }