Skip to content

Commit d72b8f9

Browse files
Fix reconciliation
1 parent d8bba0c commit d72b8f9

File tree

2 files changed

+94
-25
lines changed

2 files changed

+94
-25
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -609,13 +609,17 @@ public Clusters(StreamInput in) throws IOException {
609609
}
610610

611611
public Clusters(Map<String, Cluster> clusterInfoMap) {
612+
this(clusterInfoMap, true);
613+
}
614+
615+
public Clusters(Map<String, Cluster> clusterInfoMap, boolean ccsMinimizeRoundtrips) {
612616
assert clusterInfoMap.size() > 0 : "this constructor should not be called with an empty Cluster info map";
613617
this.total = clusterInfoMap.size();
614618
this.clusterInfo = clusterInfoMap;
615619
this.successful = getClusterStateCount(Cluster.Status.SUCCESSFUL);
616620
this.skipped = getClusterStateCount(Cluster.Status.SKIPPED);
617621
// should only be called if "details" section of fromXContent is present (for ccsMinimizeRoundtrips)
618-
this.ccsMinimizeRoundtrips = true;
622+
this.ccsMinimizeRoundtrips = ccsMinimizeRoundtrips;
619623
}
620624

621625
@Override

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 89 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.action.IndicesRequest;
2222
import org.elasticsearch.action.OriginalIndices;
2323
import org.elasticsearch.action.RemoteClusterActionType;
24+
import org.elasticsearch.action.ResolvedIndexExpression;
2425
import org.elasticsearch.action.ResolvedIndexExpressions;
2526
import org.elasticsearch.action.ResolvedIndices;
2627
import org.elasticsearch.action.ShardOperationFailedException;
@@ -62,6 +63,7 @@
6263
import org.elasticsearch.common.util.ArrayUtils;
6364
import org.elasticsearch.common.util.CollectionUtils;
6465
import org.elasticsearch.common.util.Maps;
66+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
6567
import org.elasticsearch.common.util.concurrent.CountDown;
6668
import org.elasticsearch.common.util.concurrent.EsExecutors;
6769
import org.elasticsearch.core.TimeValue;
@@ -442,12 +444,6 @@ public void onFailure(Exception e) {
442444
if (ccsCheckCompatibility) {
443445
checkCCSVersionCompatibility(rewritten);
444446
}
445-
if (rewritten.indicesOptions().resolveCrossProjectIndexExpression()) {
446-
IndicesOptions indicesOptions = IndicesOptions.builder(rewritten.indicesOptions())
447-
.crossProjectModeOptions(IndicesOptions.CrossProjectModeOptions.DEFAULT)
448-
.build();
449-
rewritten.indicesOptions(indicesOptions);
450-
}
451447

452448
final ActionListener<SearchResponse> searchResponseActionListener;
453449
if (collectSearchTelemetry) {
@@ -572,6 +568,15 @@ public void onFailure(Exception e) {
572568
timeProvider,
573569
transportService,
574570
searchResponseActionListener.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> {
571+
SearchResponse.Clusters participatingProjects = clusters;
572+
if (resolvesCrossProject && rewritten.getResolvedIndexExpressions() != null) {
573+
participatingProjects = reconcileProjects(
574+
rewritten.getResolvedIndexExpressions(),
575+
searchShardsResponses,
576+
participatingProjects
577+
);
578+
}
579+
575580
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
576581
searchShardsResponses
577582
);
@@ -605,7 +610,7 @@ public void onFailure(Exception e) {
605610
clusterNodeLookup,
606611
projectState,
607612
remoteAliasFilters,
608-
clusters,
613+
participatingProjects,
609614
searchPhaseProvider.apply(finalDelegate)
610615
);
611616
}),
@@ -956,6 +961,81 @@ static SearchResponseMerger createSearchResponseMerger(
956961
return new SearchResponseMerger(from, size, trackTotalHitsUpTo, timeProvider, aggReduceContextBuilder);
957962
}
958963

964+
/**
965+
* Outside Cross Project Search, we're sure of projects involved and their corresponding indices. However,
966+
* in CPS, it may be possible that indices can exist anywhere:
967+
* <ul>
968+
* <li>Only on the origin</li>
969+
* <li>Only on the linked project(s)</li>
970+
* <li>Both on the origin and the linked project(s), and,</li>
971+
* <li>Nowhere</li>
972+
* </ul>
973+
*
974+
* Therefore, we only need to include the details of those projects hosting our indices and participating
975+
* in the search. Otherwise, we risk unnecessarily including them in the execution metadata and marking
976+
* their statuses as "successful", potentially misleading users into believing that they returned results
977+
* and participated in the search.
978+
*
979+
* Note that this code runs after the SearchShards API's responses have been pieced back and the CPS index
980+
* validation is complete.
981+
* @param originResolvedIdxExpressions The resolution result from origin's Security Action Filter.
982+
* @param shardResponses Responses pieced back from SearchShards API.
983+
* @param projects Clusters object to build upon.
984+
* @return A new Clusters object containing only the Search-participating projects.
985+
*/
986+
static SearchResponse.Clusters reconcileProjects(
987+
ResolvedIndexExpressions originResolvedIdxExpressions,
988+
Map<String, SearchShardsResponse> shardResponses,
989+
SearchResponse.Clusters projects
990+
) {
991+
/*
992+
* We only fire a SearchShards API for a project if it needs to be searched. This can either mean that it was
993+
* part of the search due to the flatworld behaviour, or that it was targeted specifically. If it returns an
994+
* empty response, it's because the project does not host any of our specified indices.
995+
*/
996+
Set<String> linkedProjectsWithResponses = shardResponses.entrySet()
997+
.stream()
998+
.filter(ssr -> ssr.getValue().getGroups().isEmpty() == false)
999+
.map(Map.Entry::getKey)
1000+
.collect(Collectors.toSet());
1001+
1002+
// Same as we do in stateful right now.
1003+
if (linkedProjectsWithResponses.isEmpty()) {
1004+
return SearchResponse.Clusters.EMPTY;
1005+
}
1006+
1007+
boolean shouldIncludeOrigin = originResolvedIdxExpressions.expressions()
1008+
.stream()
1009+
.anyMatch(
1010+
expr -> expr.localExpressions().localIndexResolutionResult() == ResolvedIndexExpression.LocalIndexResolutionResult.SUCCESS
1011+
);
1012+
1013+
Map<String, SearchResponse.Cluster> reconciledMap = ConcurrentCollections.newConcurrentMap();
1014+
for (String project : projects.getClusterAliases()) {
1015+
SearchResponse.Cluster computedProjectInfo = projects.getCluster(project);
1016+
/*
1017+
* Selection criteria for a `project` to be included in the metadata:
1018+
* - This is the origin project, and there was a "success"ful resolution by the Security Action Filter,
1019+
* - This is a linked project with a non-empty response from SearchShards API, or,
1020+
* - There was an issue with this project, so let's carry over the failures and reporting them.
1021+
*/
1022+
boolean shouldAdd = false;
1023+
if (shouldIncludeOrigin && project.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
1024+
shouldAdd = true;
1025+
} else if (linkedProjectsWithResponses.contains(project)) {
1026+
shouldAdd = true;
1027+
} else if (computedProjectInfo.getFailures().isEmpty() == false) {
1028+
shouldAdd = true;
1029+
}
1030+
1031+
if (shouldAdd) {
1032+
reconciledMap.put(project, computedProjectInfo);
1033+
}
1034+
}
1035+
1036+
return new SearchResponse.Clusters(reconciledMap, false);
1037+
}
1038+
9591039
/**
9601040
* Collect remote search shards that we need to search for potential matches.
9611041
* Used for ccs_minimize_roundtrips=false
@@ -994,28 +1074,13 @@ static void collectSearchShards(
9941074
@Override
9951075
void innerOnResponse(SearchShardsResponse searchShardsResponse) {
9961076
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION);
997-
/*
998-
* This particular linked project returned empty shards and that's because none of the requested
999-
* indices are on it. So we need to prevent it from appearing in the metadata. In case the very
1000-
* same indices don't exist on the origin too, we use `CrossProjectIndexResolutionValidator#validate()`
1001-
* to throw an error downstream.
1002-
*
1003-
* TODO: Handle the `total` count that tracks total projects since it populates that info
1004-
* within Cluster#ctor().
1005-
*/
1006-
boolean canPurge = resolvesCrossProject && searchShardsResponse.getGroups().isEmpty();
1007-
if (canPurge) {
1008-
clusters.swapCluster(clusterAlias, (ignored1, ignored2) -> null);
1009-
} else {
1010-
ccsClusterInfoUpdate(searchShardsResponse, clusters, clusterAlias, timeProvider);
1011-
}
1077+
ccsClusterInfoUpdate(searchShardsResponse, clusters, clusterAlias, timeProvider);
10121078
searchShardsResponses.put(clusterAlias, searchShardsResponse);
10131079
}
10141080

10151081
@Override
10161082
Map<String, SearchShardsResponse> createFinalResponse() {
1017-
// TODO: Perhaps, it's wiser to check for resolvesCrossProject too.
1018-
if (originResolvedIdxExpressions != null) {
1083+
if (resolvesCrossProject && originResolvedIdxExpressions != null) {
10191084
Map<String, ResolvedIndexExpressions> resolvedIndexExpressions = new HashMap<>();
10201085
for (Map.Entry<String, SearchShardsResponse> entry : searchShardsResponses.entrySet()) {
10211086
if (entry.getValue().getResolvedIndexExpressions() == null) {

0 commit comments

Comments
 (0)