Skip to content

Commit 94d3cfc

Browse files
committed
Mirror upstream elastic#137566 as single snapshot commit for AI review
BASE=7a23516cce48dcd78aed0075a398b604531f1e81 HEAD=121d0f632c058df1d808a022c58eaf2303e8c246 Branch=main
1 parent 7a23516 commit 94d3cfc

File tree

9 files changed

+96
-15
lines changed

9 files changed

+96
-15
lines changed

docs/changelog/137566.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 137566
2+
summary: Add support for `project_routing` for `_search` and `_async_search`
3+
area: CCS
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ public class SearchRequest extends LegacyActionRequest implements IndicesRequest
116116
* enabling synthetic source natively in the index.
117117
*/
118118
private boolean forceSyntheticSource = false;
119+
private String projectRouting;
119120

120121
public SearchRequest() {
121122
this.localClusterAlias = null;
@@ -168,6 +169,15 @@ public boolean allowsCrossProject() {
168169
return true;
169170
}
170171

172+
@Override
173+
public String getProjectRouting() {
174+
return projectRouting;
175+
}
176+
177+
public void setProjectRouting(@Nullable String projectRouting) {
178+
this.projectRouting = projectRouting;
179+
}
180+
171181
/**
172182
* Creates a new sub-search request starting from the original search request that is provided.
173183
* For internal use only, allows to fork a search request into multiple search requests that will be executed independently.
@@ -199,6 +209,7 @@ static SearchRequest subSearchRequest(
199209
}
200210
final SearchRequest request = new SearchRequest(originalSearchRequest, indices, clusterAlias, absoluteStartMillis, finalReduce);
201211
request.setParentTask(parentTaskId);
212+
request.setProjectRouting(originalSearchRequest.getProjectRouting());
202213
return request;
203214
}
204215

@@ -228,6 +239,7 @@ private SearchRequest(
228239
this.waitForCheckpoints = searchRequest.waitForCheckpoints;
229240
this.waitForCheckpointsTimeout = searchRequest.waitForCheckpointsTimeout;
230241
this.forceSyntheticSource = searchRequest.forceSyntheticSource;
242+
this.projectRouting = searchRequest.projectRouting;
231243
}
232244

233245
/**

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,8 @@ public void onFailure(Exception e) {
620620
}),
621621
forceConnectTimeoutSecs,
622622
resolvesCrossProject,
623-
rewritten.getResolvedIndexExpressions()
623+
rewritten.getResolvedIndexExpressions(),
624+
rewritten.getProjectRouting()
624625
);
625626
}
626627
}
@@ -1065,7 +1066,8 @@ static void collectSearchShards(
10651066
ActionListener<Map<String, SearchShardsResponse>> listener,
10661067
TimeValue forceConnectTimeoutSecs,
10671068
boolean resolvesCrossProject,
1068-
ResolvedIndexExpressions originResolvedIdxExpressions
1069+
ResolvedIndexExpressions originResolvedIdxExpressions,
1070+
String projectRouting
10691071
) {
10701072
RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
10711073
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
@@ -1104,7 +1106,7 @@ Map<String, SearchShardsResponse> createFinalResponse() {
11041106
// We do not use the relaxed index options here when validating indices' existence.
11051107
ElasticsearchException validationEx = CrossProjectIndexResolutionValidator.validate(
11061108
originalIdxOpts,
1107-
null,
1109+
projectRouting,
11081110
originResolvedIdxExpressions,
11091111
resolvedIndexExpressions
11101112
);

server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
3131
import org.elasticsearch.search.SearchService;
3232
import org.elasticsearch.search.builder.SearchSourceBuilder;
33+
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
3334
import org.elasticsearch.search.fetch.StoredFieldsContext;
3435
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
3536
import org.elasticsearch.search.internal.SearchContext;
@@ -68,15 +69,13 @@ public class RestSearchAction extends BaseRestHandler {
6869
private final SearchUsageHolder searchUsageHolder;
6970
private final Predicate<NodeFeature> clusterSupportsFeature;
7071
private final Settings settings;
71-
72-
public RestSearchAction(SearchUsageHolder searchUsageHolder, Predicate<NodeFeature> clusterSupportsFeature) {
73-
this(searchUsageHolder, clusterSupportsFeature, null);
74-
}
72+
private final CrossProjectModeDecider crossProjectModeDecider;
7573

7674
public RestSearchAction(SearchUsageHolder searchUsageHolder, Predicate<NodeFeature> clusterSupportsFeature, Settings settings) {
7775
this.searchUsageHolder = searchUsageHolder;
7876
this.clusterSupportsFeature = clusterSupportsFeature;
7977
this.settings = settings;
78+
this.crossProjectModeDecider = new CrossProjectModeDecider(settings);
8079
}
8180

8281
@Override
@@ -109,10 +108,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
109108
// this might be set by old clients
110109
request.param("min_compatible_shard_node");
111110

112-
final boolean crossProjectEnabled = settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false);
111+
final boolean crossProjectEnabled = crossProjectModeDecider.crossProjectEnabled();
113112
if (crossProjectEnabled) {
114-
// accept but drop project_routing param until fully supported
115-
request.param("project_routing");
113+
searchRequest.setProjectRouting(request.param("project_routing"));
116114
}
117115

118116
/*
@@ -252,7 +250,7 @@ public static void parseSearchRequest(
252250
}
253251
searchRequest.indicesOptions(indicesOptions);
254252

255-
validateSearchRequest(request, searchRequest);
253+
validateSearchRequest(request, searchRequest, crossProjectEnabled);
256254

257255
if (searchRequest.pointInTimeBuilder() != null) {
258256
preparePointInTime(searchRequest, request);
@@ -412,10 +410,50 @@ static void preparePointInTime(SearchRequest request, RestRequest restRequest) {
412410
* might modify the search request to align certain parameters.
413411
*/
414412
public static void validateSearchRequest(RestRequest restRequest, SearchRequest searchRequest) {
413+
validateSearchRequest(restRequest, searchRequest, false);
414+
}
415+
416+
private static void validateSearchRequest(RestRequest restRequest, SearchRequest searchRequest, boolean crossProjectEnabled) {
415417
checkRestTotalHits(restRequest, searchRequest);
416418
checkSearchType(restRequest, searchRequest);
417419
// ensures that the rest param is consumed
418420
restRequest.paramAsBoolean(INCLUDE_NAMED_QUERIES_SCORE_PARAM, false);
421+
checkProjectRouting(searchRequest, crossProjectEnabled);
422+
}
423+
424+
private static void checkProjectRouting(SearchRequest searchRequest, boolean crossProjectEnabled) {
425+
/*
426+
* There are 2 ways of specifying project_routing:
427+
* - as a query parameter: /_search?project_routing=..., and,
428+
* - within the request's body.
429+
*
430+
* Because we do not have access to `IndicesRequest/SearchRequest` from `SearchSourceBuilder`, and, project_routing
431+
* can be potentially specified in 2 different places, we need to explicitly check this scenario.
432+
*/
433+
if (searchRequest.source() == null) {
434+
return;
435+
}
436+
437+
String projectRoutingInBody = searchRequest.source().projectRouting();
438+
// If it's null, either the query parameter is also null or it isn't. Either way, we're fine with it.
439+
if (projectRoutingInBody != null) {
440+
if (crossProjectEnabled == false) {
441+
throw new IllegalArgumentException("project_routing is allowed only when CPS is enabled and the endpoint supports CPS");
442+
}
443+
444+
// Query parameter was also set. This is not allowed, irrespective of the values.
445+
if (searchRequest.getProjectRouting() != null) {
446+
throw new IllegalArgumentException(
447+
"project_routing is specified in both the places: as query parameter and in the request's body"
448+
);
449+
}
450+
451+
/*
452+
* Bring forward the project_routing value so that TransportSearchAction can pick it up. Although TSA can pick it up
453+
* from `SearchSourceBuilder`, let's use `IndicesRequest#getProjectRouting()` since that's the intended way.
454+
*/
455+
searchRequest.setProjectRouting(projectRoutingInBody);
456+
}
419457
}
420458

421459
/**

server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
129129
public static final ParseField POINT_IN_TIME = new ParseField("pit");
130130
public static final ParseField RUNTIME_MAPPINGS_FIELD = new ParseField("runtime_mappings");
131131
public static final ParseField RETRIEVER = new ParseField("retriever");
132+
public static final ParseField PROJECT_ROUTING = new ParseField("project_routing");
132133

133134
private static final boolean RANK_SUPPORTED = Booleans.parseBoolean(System.getProperty("es.search.rank_supported"), true);
134135

@@ -212,6 +213,8 @@ public static HighlightBuilder highlight() {
212213

213214
private boolean skipInnerHits = false;
214215

216+
private String projectRouting;
217+
215218
/**
216219
* Constructs a new search source builder.
217220
*/
@@ -609,6 +612,19 @@ public TimeValue timeout() {
609612
return timeout;
610613
}
611614

615+
public String projectRouting() {
616+
return projectRouting;
617+
}
618+
619+
public SearchSourceBuilder projectRouting(String projectRouting) {
620+
if (this.projectRouting != null) {
621+
throw new IllegalArgumentException("project_routing is already set");
622+
}
623+
624+
this.projectRouting = projectRouting;
625+
return this;
626+
}
627+
612628
/**
613629
* An optional terminate_after to terminate the search after collecting
614630
* <code>terminateAfter</code> documents
@@ -1382,7 +1398,9 @@ private SearchSourceBuilder parseXContent(
13821398
if (token == XContentParser.Token.FIELD_NAME) {
13831399
currentFieldName = parser.currentName();
13841400
} else if (token.isValue()) {
1385-
if (FROM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
1401+
if (PROJECT_ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
1402+
projectRouting(parser.text());
1403+
} else if (FROM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
13861404
from(parser.intValue());
13871405
} else if (SIZE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
13881406
size(parser.intValue());

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,7 @@ public void testCollectSearchShards() throws Exception {
11041104
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
11051105
null,
11061106
false,
1107+
null,
11071108
null
11081109
);
11091110
awaitLatch(latch, 5, TimeUnit.SECONDS);
@@ -1136,6 +1137,7 @@ public void testCollectSearchShards() throws Exception {
11361137
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch),
11371138
null,
11381139
false,
1140+
null,
11391141
null
11401142
);
11411143
awaitLatch(latch, 5, TimeUnit.SECONDS);
@@ -1191,6 +1193,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
11911193
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch),
11921194
null,
11931195
false,
1196+
null,
11941197
null
11951198
);
11961199
awaitLatch(latch, 5, TimeUnit.SECONDS);
@@ -1224,6 +1227,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
12241227
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
12251228
null,
12261229
false,
1230+
null,
12271231
null
12281232
);
12291233
awaitLatch(latch, 5, TimeUnit.SECONDS);
@@ -1273,6 +1277,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
12731277
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
12741278
null,
12751279
false,
1280+
null,
12761281
null
12771282
);
12781283
awaitLatch(latch, 5, TimeUnit.SECONDS);

server/src/test/java/org/elasticsearch/rest/action/search/RestSearchActionTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.search.SearchRequest;
1212
import org.elasticsearch.action.search.SearchResponse;
1313
import org.elasticsearch.action.search.SearchType;
14+
import org.elasticsearch.common.settings.Settings;
1415
import org.elasticsearch.rest.RestRequest;
1516
import org.elasticsearch.search.builder.SearchSourceBuilder;
1617
import org.elasticsearch.search.suggest.SuggestBuilder;
@@ -33,7 +34,7 @@ public final class RestSearchActionTests extends RestActionTestCase {
3334

3435
@Before
3536
public void setUpAction() {
36-
action = new RestSearchAction(new UsageService().getSearchUsageHolder(), nf -> false);
37+
action = new RestSearchAction(new UsageService().getSearchUsageHolder(), nf -> false, Settings.EMPTY);
3738
controller().registerHandler(action);
3839
verifyingClient.setExecuteVerifier((actionType, request) -> mock(SearchResponse.class));
3940
verifyingClient.setExecuteLocallyVerifier((actionType, request) -> mock(SearchResponse.class));

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
7070

7171
boolean crossProjectEnabled = crossProjectModeDecider.crossProjectEnabled();
7272
if (crossProjectEnabled) {
73-
// accept but drop project_routing param until fully supported
74-
request.param("project_routing");
73+
submit.getSearchRequest().setProjectRouting(request.param("project_routing"));
7574
}
7675

7776
IntConsumer setSize = size -> submit.getSearchRequest().source().size(size);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/SubmitAsyncSearchRequest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public SubmitAsyncSearchRequest(SearchSourceBuilder source, String... indices) {
5353
request.setPreFilterShardSize(1);
5454
request.setBatchedReduceSize(5);
5555
request.requestCache(true);
56+
request.setProjectRouting(source.projectRouting());
5657
}
5758

5859
public SubmitAsyncSearchRequest(StreamInput in) throws IOException {

0 commit comments

Comments
 (0)