Skip to content

Commit f8c0f8f

Browse files
Modify SearchShards API and TransportSearchAction for CPS when MRT=false
1 parent 8889536 commit f8c0f8f

File tree

6 files changed

+138
-14
lines changed

6 files changed

+138
-14
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchShardsRequest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.ActionRequestValidationException;
1313
import org.elasticsearch.action.IndicesRequest;
1414
import org.elasticsearch.action.LegacyActionRequest;
15+
import org.elasticsearch.action.ResolvedIndexExpressions;
1516
import org.elasticsearch.action.support.IndicesOptions;
1617
import org.elasticsearch.common.io.stream.StreamInput;
1718
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -43,6 +44,8 @@ public final class SearchShardsRequest extends LegacyActionRequest implements In
4344

4445
private final String clusterAlias;
4546

47+
private ResolvedIndexExpressions resolvedIndexExpressions;
48+
4649
public SearchShardsRequest(
4750
String[] indices,
4851
IndicesOptions indicesOptions,
@@ -179,4 +182,14 @@ public int hashCode() {
179182
result = 31 * result + Arrays.hashCode(indices);
180183
return result;
181184
}
185+
186+
@Override
187+
public void setResolvedIndexExpressions(ResolvedIndexExpressions expressions) {
188+
this.resolvedIndexExpressions = expressions;
189+
}
190+
191+
@Override
192+
public ResolvedIndexExpressions getResolvedIndexExpressions() {
193+
return resolvedIndexExpressions;
194+
}
182195
}

server/src/main/java/org/elasticsearch/action/search/SearchShardsResponse.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@
1010
package org.elasticsearch.action.search;
1111

1212
import org.elasticsearch.action.ActionResponse;
13+
import org.elasticsearch.action.ResolvedIndexExpressions;
1314
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
1415
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
1516
import org.elasticsearch.cluster.node.DiscoveryNode;
1617
import org.elasticsearch.common.io.stream.StreamInput;
1718
import org.elasticsearch.common.io.stream.StreamOutput;
1819
import org.elasticsearch.common.util.Maps;
20+
import org.elasticsearch.core.Nullable;
1921
import org.elasticsearch.index.Index;
2022
import org.elasticsearch.index.shard.ShardId;
2123
import org.elasticsearch.search.internal.AliasFilter;
@@ -35,28 +37,47 @@ public final class SearchShardsResponse extends ActionResponse {
3537
private final Collection<SearchShardsGroup> groups;
3638
private final Collection<DiscoveryNode> nodes;
3739
private final Map<String, AliasFilter> aliasFilters;
40+
private final ResolvedIndexExpressions resolvedIndexExpressions;
3841

3942
public SearchShardsResponse(
4043
Collection<SearchShardsGroup> groups,
4144
Collection<DiscoveryNode> nodes,
42-
Map<String, AliasFilter> aliasFilters
45+
Map<String, AliasFilter> aliasFilters,
46+
@Nullable ResolvedIndexExpressions resolvedIndexExpressions
4347
) {
4448
this.groups = groups;
4549
this.nodes = nodes;
4650
this.aliasFilters = aliasFilters;
51+
this.resolvedIndexExpressions = resolvedIndexExpressions;
52+
}
53+
54+
public SearchShardsResponse(
55+
Collection<SearchShardsGroup> groups,
56+
Collection<DiscoveryNode> nodes,
57+
Map<String, AliasFilter> aliasFilters
58+
) {
59+
this(groups, nodes, aliasFilters, null);
4760
}
4861

4962
public SearchShardsResponse(StreamInput in) throws IOException {
5063
this.groups = in.readCollectionAsList(SearchShardsGroup::new);
5164
this.nodes = in.readCollectionAsList(DiscoveryNode::new);
5265
this.aliasFilters = in.readMap(AliasFilter::readFrom);
66+
if (in.getTransportVersion().onOrAfter(ResolvedIndexExpressions.RESOLVED_INDEX_EXPRESSIONS)) {
67+
this.resolvedIndexExpressions = in.readOptionalWriteable(ResolvedIndexExpressions::new);
68+
} else {
69+
this.resolvedIndexExpressions = null;
70+
}
5371
}
5472

5573
@Override
5674
public void writeTo(StreamOutput out) throws IOException {
5775
out.writeCollection(groups);
5876
out.writeCollection(nodes);
5977
out.writeMap(aliasFilters, StreamOutput::writeWriteable);
78+
if (out.getTransportVersion().onOrAfter(ResolvedIndexExpressions.RESOLVED_INDEX_EXPRESSIONS)) {
79+
out.writeOptionalWriteable(resolvedIndexExpressions);
80+
}
6081
}
6182

6283
/**
@@ -114,4 +135,9 @@ static SearchShardsResponse fromLegacyResponse(ClusterSearchShardsResponse oldRe
114135
public String toString() {
115136
return "SearchShardsResponse{" + "groups=" + groups + ", nodes=" + nodes + ", aliasFilters=" + aliasFilters + '}';
116137
}
138+
139+
@Nullable
140+
public ResolvedIndexExpressions getResolvedIndexExpressions() {
141+
return resolvedIndexExpressions;
142+
}
117143
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.ElasticsearchException;
1415
import org.elasticsearch.ExceptionsHelper;
1516
import org.elasticsearch.TransportVersions;
1617
import org.elasticsearch.action.ActionListener;
@@ -20,6 +21,7 @@
2021
import org.elasticsearch.action.IndicesRequest;
2122
import org.elasticsearch.action.OriginalIndices;
2223
import org.elasticsearch.action.RemoteClusterActionType;
24+
import org.elasticsearch.action.ResolvedIndexExpressions;
2325
import org.elasticsearch.action.ResolvedIndices;
2426
import org.elasticsearch.action.ShardOperationFailedException;
2527
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
@@ -79,6 +81,8 @@
7981
import org.elasticsearch.search.aggregations.AggregationReduceContext;
8082
import org.elasticsearch.search.builder.PointInTimeBuilder;
8183
import org.elasticsearch.search.builder.SearchSourceBuilder;
84+
import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator;
85+
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
8286
import org.elasticsearch.search.internal.AliasFilter;
8387
import org.elasticsearch.search.internal.SearchContext;
8488
import org.elasticsearch.search.internal.ShardSearchContextId;
@@ -120,6 +124,7 @@
120124
import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
121125
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
122126
import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility;
127+
import static org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator.indicesOptionsForCrossProjectFanout;
123128
import static org.elasticsearch.search.sort.FieldSortBuilder.hasPrimaryFieldSort;
124129
import static org.elasticsearch.threadpool.ThreadPool.Names.SYSTEM_CRITICAL_READ;
125130
import static org.elasticsearch.threadpool.ThreadPool.Names.SYSTEM_READ;
@@ -170,6 +175,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
170175
private final UsageService usageService;
171176
private final boolean collectCCSTelemetry;
172177
private final TimeValue forceConnectTimeoutSecs;
178+
private final CrossProjectModeDecider crossProjectModeDecider;
173179

174180
@Inject
175181
public TransportSearchAction(
@@ -218,7 +224,8 @@ public TransportSearchAction(
218224
this.searchResponseMetrics = searchResponseMetrics;
219225
this.client = client;
220226
this.usageService = usageService;
221-
forceConnectTimeoutSecs = settings.getAsTime("search.ccs.force_connect_timeout", null);
227+
this.forceConnectTimeoutSecs = settings.getAsTime("search.ccs.force_connect_timeout", null);
228+
this.crossProjectModeDecider = new CrossProjectModeDecider(settings);
222229
}
223230

224231
private Map<String, OriginalIndices> buildPerIndexOriginalIndices(
@@ -353,6 +360,7 @@ private void executeRequest(
353360
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider,
354361
boolean collectSearchTelemetry
355362
) {
363+
boolean resolvesCrossProject = crossProjectModeDecider.resolvesCrossProject(original);
356364
final long relativeStartNanos = System.nanoTime();
357365
final SearchTimeProvider timeProvider = new SearchTimeProvider(
358366
original.getOrCreateAbsoluteStartMillis(),
@@ -365,16 +373,28 @@ private void executeRequest(
365373

366374
ProjectState projectState = projectResolver.getProjectState(clusterState);
367375
final ResolvedIndices resolvedIndices;
376+
377+
/*
378+
* Irrespective of whether this is the origin project or a linked project, it's wiser to relax
379+
* the index options to prevent the index resolution APIs from throwing index not found errors.
380+
* Also, we do not replace the indices options on the SearchRequest because we'd be needing it
381+
* downstream when validating the indices from both the origin and the linked projects.
382+
*/
383+
IndicesOptions resolutionIdxOpts = crossProjectModeDecider.resolvesCrossProject(original)
384+
? indicesOptionsForCrossProjectFanout(original.indicesOptions())
385+
: original.indicesOptions();
386+
368387
if (original.pointInTimeBuilder() != null) {
369388
resolvedIndices = ResolvedIndices.resolveWithPIT(
370389
original.pointInTimeBuilder(),
371-
original.indicesOptions(),
390+
resolutionIdxOpts,
372391
projectState.metadata(),
373392
namedWriteableRegistry
374393
);
375394
} else {
376-
resolvedIndices = ResolvedIndices.resolveWithIndicesRequest(
377-
original,
395+
resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions(
396+
original.indices(),
397+
resolutionIdxOpts,
378398
projectState.metadata(),
379399
indexNameExpressionResolver,
380400
remoteClusterService,
@@ -589,7 +609,9 @@ public void onFailure(Exception e) {
589609
searchPhaseProvider.apply(finalDelegate)
590610
);
591611
}),
592-
forceConnectTimeoutSecs
612+
forceConnectTimeoutSecs,
613+
resolvesCrossProject,
614+
rewritten.getResolvedIndexExpressions()
593615
);
594616
}
595617
}
@@ -939,7 +961,7 @@ static SearchResponseMerger createSearchResponseMerger(
939961
* Used for ccs_minimize_roundtrips=false
940962
*/
941963
static void collectSearchShards(
942-
IndicesOptions indicesOptions,
964+
IndicesOptions originalIdxOpts,
943965
String preference,
944966
String routing,
945967
QueryBuilder query,
@@ -950,7 +972,9 @@ static void collectSearchShards(
950972
SearchTimeProvider timeProvider,
951973
TransportService transportService,
952974
ActionListener<Map<String, SearchShardsResponse>> listener,
953-
TimeValue forceConnectTimeoutSecs
975+
TimeValue forceConnectTimeoutSecs,
976+
boolean resolvesCrossProject,
977+
ResolvedIndexExpressions originResolvedIdxExpressions
954978
) {
955979
RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
956980
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
@@ -976,6 +1000,27 @@ void innerOnResponse(SearchShardsResponse searchShardsResponse) {
9761000

9771001
@Override
9781002
Map<String, SearchShardsResponse> createFinalResponse() {
1003+
// TODO: Perhaps, it's wiser to check for resolvesCrossProject too.
1004+
if (originResolvedIdxExpressions != null) {
1005+
Map<String, ResolvedIndexExpressions> resolvedIndexExpressions = new HashMap<>();
1006+
for (Map.Entry<String, SearchShardsResponse> entry : searchShardsResponses.entrySet()) {
1007+
if (entry.getValue().getResolvedIndexExpressions() == null) {
1008+
throw new IllegalArgumentException(
1009+
"Failed to get resolved index expressions for cluster [" + entry.getKey() + "]"
1010+
);
1011+
}
1012+
resolvedIndexExpressions.put(entry.getKey(), entry.getValue().getResolvedIndexExpressions());
1013+
}
1014+
// We do not use the related index options here when validating indices' existence.
1015+
ElasticsearchException validationEx = CrossProjectIndexResolutionValidator.validate(
1016+
originalIdxOpts,
1017+
originResolvedIdxExpressions,
1018+
resolvedIndexExpressions
1019+
);
1020+
if (validationEx != null) {
1021+
throw validationEx;
1022+
}
1023+
}
9791024
return searchShardsResponses;
9801025
}
9811026
};
@@ -988,13 +1033,24 @@ Map<String, SearchShardsResponse> createFinalResponse() {
9881033
);
9891034

9901035
connectionListener.addListener(singleListener.delegateFailure((responseListener, connection) -> {
1036+
/*
1037+
* It may be possible that indices do not exist on the project that SearchShards API is targeting.
1038+
* In such cases, it throws an error because it calls the index resolution APIs underneath. We relax
1039+
* the index options to prevent this from happening. Also, it's fine to pass in these relaxed options
1040+
* to it because SearchShardsRequest#allowsCrossProject() returns false anyway and the index rewriting
1041+
* does not happen downstream.
1042+
*/
1043+
IndicesOptions searchShardsIdxOpts = resolvesCrossProject
1044+
? indicesOptionsForCrossProjectFanout(originalIdxOpts)
1045+
: originalIdxOpts;
1046+
9911047
final String[] indices = entry.getValue().indices();
9921048
final Executor responseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION);
9931049
// TODO: support point-in-time
9941050
if (searchContext == null && connection.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
9951051
SearchShardsRequest searchShardsRequest = new SearchShardsRequest(
9961052
indices,
997-
indicesOptions,
1053+
searchShardsIdxOpts,
9981054
query,
9991055
routing,
10001056
preference,
@@ -1013,7 +1069,7 @@ Map<String, SearchShardsResponse> createFinalResponse() {
10131069
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(
10141070
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
10151071
indices
1016-
).indicesOptions(indicesOptions).local(true).preference(preference).routing(routing);
1072+
).indicesOptions(searchShardsIdxOpts).local(true).preference(preference).routing(routing);
10171073
transportService.sendRequest(
10181074
connection,
10191075
TransportClusterSearchShardsAction.TYPE.name(),

server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,12 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
160160
CollectionUtil.timSort(shardIts);
161161
if (SearchService.canRewriteToMatchNone(searchRequest.source()) == false) {
162162
delegate.onResponse(
163-
new SearchShardsResponse(toGroups(shardIts), project.cluster().nodes().getAllNodes(), aliasFilters)
163+
new SearchShardsResponse(
164+
toGroups(shardIts),
165+
project.cluster().nodes().getAllNodes(),
166+
aliasFilters,
167+
searchShardsRequest.getResolvedIndexExpressions()
168+
)
164169
);
165170
} else {
166171
CanMatchPreFilterSearchPhase.execute(logger, searchTransportService, (clusterAlias, node) -> {
@@ -179,7 +184,12 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
179184
)
180185
.addListener(
181186
delegate.map(
182-
its -> new SearchShardsResponse(toGroups(its), project.cluster().nodes().getAllNodes(), aliasFilters)
187+
its -> new SearchShardsResponse(
188+
toGroups(its),
189+
project.cluster().nodes().getAllNodes(),
190+
aliasFilters,
191+
searchShardsRequest.getResolvedIndexExpressions()
192+
)
183193
)
184194
);
185195
}

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,6 +1101,8 @@ public void testCollectSearchShards() throws Exception {
11011101
timeProvider,
11021102
service,
11031103
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
1104+
null,
1105+
false,
11041106
null
11051107
);
11061108
awaitLatch(latch, 5, TimeUnit.SECONDS);
@@ -1131,6 +1133,8 @@ public void testCollectSearchShards() throws Exception {
11311133
timeProvider,
11321134
service,
11331135
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch),
1136+
null,
1137+
false,
11341138
null
11351139
);
11361140
awaitLatch(latch, 5, TimeUnit.SECONDS);
@@ -1184,6 +1188,8 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
11841188
timeProvider,
11851189
service,
11861190
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch),
1191+
null,
1192+
false,
11871193
null
11881194
);
11891195
awaitLatch(latch, 5, TimeUnit.SECONDS);
@@ -1215,6 +1221,8 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
12151221
timeProvider,
12161222
service,
12171223
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
1224+
null,
1225+
false,
12181226
null
12191227
);
12201228
awaitLatch(latch, 5, TimeUnit.SECONDS);
@@ -1262,6 +1270,8 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
12621270
timeProvider,
12631271
service,
12641272
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
1273+
null,
1274+
false,
12651275
null
12661276
);
12671277
awaitLatch(latch, 5, TimeUnit.SECONDS);

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
7171
}
7272
SubmitAsyncSearchRequest submit = new SubmitAsyncSearchRequest();
7373

74-
if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) {
74+
boolean crossProjectEnabled = settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false);
75+
if (crossProjectEnabled) {
7576
// accept but drop project_routing param until fully supported
7677
request.param("project_routing");
7778
}
@@ -82,7 +83,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
8283
// them as supported. We rely on SubmitAsyncSearchRequest#validate to fail in case they are set.
8384
// Note that ccs_minimize_roundtrips is also set this way, which is a supported option.
8485
request.withContentOrSourceParamParserOrNull(
85-
parser -> parseSearchRequest(submit.getSearchRequest(), request, parser, clusterSupportsFeature, setSize, searchUsageHolder)
86+
parser -> parseSearchRequest(
87+
submit.getSearchRequest(),
88+
request,
89+
parser,
90+
clusterSupportsFeature,
91+
setSize,
92+
searchUsageHolder,
93+
crossProjectEnabled
94+
)
8695
);
8796

8897
if (request.hasParam("wait_for_completion_timeout")) {

0 commit comments

Comments
 (0)