Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3e94dc8
Add support for `project_routing` for `_search` and `_async_search`
pawankartik-elastic Nov 4, 2025
6ac9d8c
Fix CI
pawankartik-elastic Nov 4, 2025
2d0f4c6
Fix null deref
pawankartik-elastic Nov 4, 2025
c09e00a
Copy project routing during init
pawankartik-elastic Nov 4, 2025
deb7b49
Copy project routing during init for async search too
pawankartik-elastic Nov 4, 2025
692c5a2
resolvesCps -> cpsEnabled
pawankartik-elastic Nov 4, 2025
bad85a6
Fix misc
pawankartik-elastic Nov 4, 2025
922cb53
Merge branch 'main' into pkar/search-project-routing
pawankartik-elastic Nov 4, 2025
97c184b
Update docs/changelog/137566.yaml
pawankartik-elastic Nov 5, 2025
121d0f6
Merge branch 'main' into pkar/search-project-routing
pawankartik-elastic Nov 5, 2025
8b14d89
Address review comments
pawankartik-elastic Nov 7, 2025
65a978c
Fix CI
pawankartik-elastic Nov 7, 2025
301d2a0
Drop unnecessary set
pawankartik-elastic Nov 7, 2025
a3dc375
Null check
pawankartik-elastic Nov 7, 2025
0c7ee5d
Merge branch 'main' into pkar/search-project-routing
pawankartik-elastic Nov 7, 2025
888ed14
Address review comments
pawankartik-elastic Nov 10, 2025
e0681d3
Fix license header
pawankartik-elastic Nov 10, 2025
58f66d4
Merge branch 'main' into pkar/search-project-routing
pawankartik-elastic Nov 13, 2025
69155f8
Merge branch 'main' into pkar/search-project-routing
pawankartik-elastic Nov 13, 2025
2794828
Merge remote-tracking branch 'origin/main' into pkar/search-project-r…
pawankartik-elastic Nov 13, 2025
7e8e30f
Update transport version
pawankartik-elastic Nov 13, 2025
b1e06ef
Merge branch 'main' into pkar/search-project-routing
pawankartik-elastic Nov 13, 2025
52f5e32
Merge `main` and refresh transport version
pawankartik-elastic Nov 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137566.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137566
summary: Add support for `project_routing` for `_search` and `_async_search`
area: CCS
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public class SearchRequest extends LegacyActionRequest implements IndicesRequest
* enabling synthetic source natively in the index.
*/
private boolean forceSyntheticSource = false;
private String projectRouting;

public SearchRequest() {
this.localClusterAlias = null;
Expand Down Expand Up @@ -168,6 +169,19 @@ public boolean allowsCrossProject() {
return true;
}

@Override
public String getProjectRouting() {
return projectRouting;
}

public void setProjectRouting(@Nullable String projectRouting) {
if (this.projectRouting != null) {
throw new IllegalArgumentException("project_routing is already set to [" + this.projectRouting + "]");
}

this.projectRouting = projectRouting;
}

/**
* Creates a new sub-search request starting from the original search request that is provided.
* For internal use only, allows to fork a search request into multiple search requests that will be executed independently.
Expand Down Expand Up @@ -228,6 +242,7 @@ private SearchRequest(
this.waitForCheckpoints = searchRequest.waitForCheckpoints;
this.waitForCheckpointsTimeout = searchRequest.waitForCheckpointsTimeout;
this.forceSyntheticSource = searchRequest.forceSyntheticSource;
this.projectRouting = searchRequest.projectRouting;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,8 @@ public void onFailure(Exception e) {
}),
forceConnectTimeoutSecs,
resolvesCrossProject,
rewritten.getResolvedIndexExpressions()
rewritten.getResolvedIndexExpressions(),
rewritten.getProjectRouting()
);
}
}
Expand Down Expand Up @@ -1065,7 +1066,8 @@ static void collectSearchShards(
ActionListener<Map<String, SearchShardsResponse>> listener,
TimeValue forceConnectTimeoutSecs,
boolean resolvesCrossProject,
ResolvedIndexExpressions originResolvedIdxExpressions
ResolvedIndexExpressions originResolvedIdxExpressions,
String projectRouting
) {
RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
Expand Down Expand Up @@ -1104,7 +1106,7 @@ Map<String, SearchShardsResponse> createFinalResponse() {
// We do not use the relaxed index options here when validating indices' existence.
ElasticsearchException validationEx = CrossProjectIndexResolutionValidator.validate(
originalIdxOpts,
null,
projectRouting,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@piergm - Can you help me understand why we want projectRouting to be passed into CrossProjectIndexResolutionValidator.validate? This method appears to be called only from Transport Actions, not the SecurityActionFilter and its sole use in the validate method is to help determine whether the index expression is qualified. Why would projectRouting being set mean that we have a qualified expression?

That doesn't seem right to me, but maybe I'm missing a use case here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would projectRouting being set mean that we have a qualified expression?

Consider these indices: foo,bar. Normally, they'd be considered unqualified, and in the flat world, we're fine with wherever they exist. However, if project_routing is specified, say, _alias:_origin, it means we're interested in the results from the origin. So you need to read both the indices and the routing info together.

Here's how it works, using Search as an example: the SAF uses project routing to resolve the projects that are in scope. Say, the routing is _alias:p1. All non-p1 projects are filtered out via CrossProjectRoutingResolver#resolve(), and index expressions are expanded/rewritten using these "resolved" projects. When the coordinator fans out the request, it only does it to those projects that are in scope wrt the routing info, i.e. p1 in this case. So the indices effectively belong to p1, and reading both the unqualified index expressions and the routing together, they can be treated as qualified.

originResolvedIdxExpressions,
resolvedIndexExpressions
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.search.fetch.StoredFieldsContext;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.internal.SearchContext;
Expand Down Expand Up @@ -68,15 +69,13 @@ public class RestSearchAction extends BaseRestHandler {
private final SearchUsageHolder searchUsageHolder;
private final Predicate<NodeFeature> clusterSupportsFeature;
private final Settings settings;

public RestSearchAction(SearchUsageHolder searchUsageHolder, Predicate<NodeFeature> clusterSupportsFeature) {
this(searchUsageHolder, clusterSupportsFeature, null);
}
private final CrossProjectModeDecider crossProjectModeDecider;

public RestSearchAction(SearchUsageHolder searchUsageHolder, Predicate<NodeFeature> clusterSupportsFeature, Settings settings) {
this.searchUsageHolder = searchUsageHolder;
this.clusterSupportsFeature = clusterSupportsFeature;
this.settings = settings;
this.crossProjectModeDecider = new CrossProjectModeDecider(settings);
}

@Override
Expand Down Expand Up @@ -109,10 +108,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// this might be set by old clients
request.param("min_compatible_shard_node");

final boolean crossProjectEnabled = settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false);
final boolean crossProjectEnabled = crossProjectModeDecider.crossProjectEnabled();
if (crossProjectEnabled) {
// accept but drop project_routing param until fully supported
request.param("project_routing");
searchRequest.setProjectRouting(request.param("project_routing"));
}

/*
Expand Down Expand Up @@ -202,11 +200,18 @@ public static void parseSearchRequest(
searchRequest.source(new SearchSourceBuilder());
}
searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));
/*
* We pass this object to the request body parser so that we can extract info such as project_routing.
* We only do it if in a Cross Project Environment, though, because outside it, such details are not
* expected and valid.
*/
SearchRequest searchRequestForParsing = crossProjectEnabled ? searchRequest : null;
if (requestContentParser != null) {
if (searchUsageHolder == null) {
searchRequest.source().parseXContent(requestContentParser, true, clusterSupportsFeature);
searchRequest.source().parseXContent(searchRequestForParsing, requestContentParser, true, clusterSupportsFeature);
} else {
searchRequest.source().parseXContent(requestContentParser, true, searchUsageHolder, clusterSupportsFeature);
searchRequest.source()
.parseXContent(searchRequestForParsing, requestContentParser, true, searchUsageHolder, clusterSupportsFeature);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
public static final ParseField POINT_IN_TIME = new ParseField("pit");
public static final ParseField RUNTIME_MAPPINGS_FIELD = new ParseField("runtime_mappings");
public static final ParseField RETRIEVER = new ParseField("retriever");
public static final ParseField PROJECT_ROUTING = new ParseField("project_routing");

private static final boolean RANK_SUPPORTED = Booleans.parseBoolean(System.getProperty("es.search.rank_supported"), true);

Expand Down Expand Up @@ -1329,18 +1330,62 @@ private SearchSourceBuilder shallowCopy(
/**
* Parse some xContent into this SearchSourceBuilder, overwriting any values specified in the xContent.
*
* @param searchRequest The SearchRequest object that's representing the request we're parsing which shall receive
* the parsed info. Currently, this is non-null only if we expect project_routing to appear in
* the request body, and we allow it to appear because we're in a Cross Project Search
* environment and require this info.
* @param parser The xContent parser.
* @param checkTrailingTokens If true throws a parsing exception when extra tokens are found after the main object.
* @param searchUsageHolder holder for the search usage statistics
* @param clusterSupportsFeature used to check if certain features are available on this cluster
*/
public SearchSourceBuilder parseXContent(
SearchRequest searchRequest,
XContentParser parser,
boolean checkTrailingTokens,
SearchUsageHolder searchUsageHolder,
Predicate<NodeFeature> clusterSupportsFeature
) throws IOException {
return parseXContent(parser, checkTrailingTokens, searchUsageHolder::updateUsage, clusterSupportsFeature);
return parseXContent(searchRequest, parser, checkTrailingTokens, searchUsageHolder::updateUsage, clusterSupportsFeature);
}

/**
* Parse some xContent into this SearchSourceBuilder, overwriting any values specified in the xContent.
*
* @param parser The xContent parser.
* @param checkTrailingTokens If true throws a parsing exception when extra tokens are found after the main object.
* @param searchUsageHolder holder for the search usage statistics
* @param clusterSupportsFeature used to check if certain features are available on this cluster
*/
public SearchSourceBuilder parseXContent(
XContentParser parser,
boolean checkTrailingTokens,
SearchUsageHolder searchUsageHolder,
Predicate<NodeFeature> clusterSupportsFeature
) throws IOException {
return parseXContent(null, parser, checkTrailingTokens, searchUsageHolder::updateUsage, clusterSupportsFeature);
}

/**
* Parse some xContent into this SearchSourceBuilder, overwriting any values specified in the xContent.
* This variant does not record search features usage. Most times the variant that accepts a {@link SearchUsageHolder} and records
* usage stats into it is the one to use.
*
* @param searchRequest The SearchRequest object that's representing the request we're parsing which shall receive
* the parsed info. Currently, this is non-null only if we expect project_routing to appear in
* the request body, and we allow it to appear because we're in a Cross Project Search
* environment and require this info.
* @param parser The xContent parser.
* @param checkTrailingTokens If true throws a parsing exception when extra tokens are found after the main object.
* @param clusterSupportsFeature used to check if certain features are available on this cluster
*/
public SearchSourceBuilder parseXContent(
SearchRequest searchRequest,
XContentParser parser,
boolean checkTrailingTokens,
Predicate<NodeFeature> clusterSupportsFeature
) throws IOException {
return parseXContent(searchRequest, parser, checkTrailingTokens, s -> {}, clusterSupportsFeature);
}

/**
Expand All @@ -1357,10 +1402,11 @@ public SearchSourceBuilder parseXContent(
boolean checkTrailingTokens,
Predicate<NodeFeature> clusterSupportsFeature
) throws IOException {
return parseXContent(parser, checkTrailingTokens, s -> {}, clusterSupportsFeature);
return parseXContent(null, parser, checkTrailingTokens, s -> {}, clusterSupportsFeature);
}

private SearchSourceBuilder parseXContent(
SearchRequest searchRequest,
XContentParser parser,
boolean checkTrailingTokens,
Consumer<SearchUsage> searchUsageConsumer,
Expand All @@ -1382,7 +1428,13 @@ private SearchSourceBuilder parseXContent(
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (FROM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
if (PROJECT_ROUTING.match(currentFieldName, parser.getDeprecationHandler()) && searchRequest != null) {
/*
* If project_routing was specified as a query parameter too, setProjectRouting() will throw
* an error to prevent setting twice or overwriting previously set value.
*/
searchRequest.setProjectRouting(parser.text());
} else if (FROM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
from(parser.intValue());
} else if (SIZE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
size(parser.intValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,7 @@ public void testCollectSearchShards() throws Exception {
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
null,
false,
null,
null
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1136,6 +1137,7 @@ public void testCollectSearchShards() throws Exception {
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch),
null,
false,
null,
null
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1191,6 +1193,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch),
null,
false,
null,
null
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1224,6 +1227,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
null,
false,
null,
null
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1273,6 +1277,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
null,
false,
null,
null
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.SuggestBuilder;
Expand All @@ -33,7 +34,7 @@ public final class RestSearchActionTests extends RestActionTestCase {

@Before
public void setUpAction() {
action = new RestSearchAction(new UsageService().getSearchUsageHolder(), nf -> false);
action = new RestSearchAction(new UsageService().getSearchUsageHolder(), nf -> false, Settings.EMPTY);
controller().registerHandler(action);
verifyingClient.setExecuteVerifier((actionType, request) -> mock(SearchResponse.class));
verifyingClient.setExecuteLocallyVerifier((actionType, request) -> mock(SearchResponse.class));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.search;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.Collection;

public class ProjectRoutingDisallowedInNonCpsEnvIT extends ESIntegTestCase {
@Override
protected boolean addMockHttpTransport() {
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), AsyncSearch.class);
}

public void testDisallowProjectRouting() throws IOException {
Request createAsyncRequest = new Request("POST", "/*,*:*/" + randomFrom("_async_search", "_search"));
createAsyncRequest.setJsonEntity("""
{
"project_routing": "_alias:_origin"
}
""");

ResponseException err = expectThrows(ResponseException.class, () -> getRestClient().performRequest(createAsyncRequest));
assertThat(err.toString(), Matchers.containsString("Unknown key for a VALUE_STRING in [project_routing]"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli

boolean crossProjectEnabled = crossProjectModeDecider.crossProjectEnabled();
if (crossProjectEnabled) {
// accept but drop project_routing param until fully supported
request.param("project_routing");
submit.getSearchRequest().setProjectRouting(request.param("project_routing"));
}

IntConsumer setSize = size -> submit.getSearchRequest().source().size(size);
Expand Down