Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3e94dc8
Add support for `project_routing` for `_search` and `_async_search`
pawankartik-elastic Nov 4, 2025
6ac9d8c
Fix CI
pawankartik-elastic Nov 4, 2025
2d0f4c6
Fix null deref
pawankartik-elastic Nov 4, 2025
c09e00a
Copy project routing during init
pawankartik-elastic Nov 4, 2025
deb7b49
Copy project routing during init for async search too
pawankartik-elastic Nov 4, 2025
692c5a2
resolvesCps -> cpsEnabled
pawankartik-elastic Nov 4, 2025
bad85a6
Fix misc
pawankartik-elastic Nov 4, 2025
922cb53
Merge branch 'main' into pkar/search-project-routing
pawankartik-elastic Nov 4, 2025
97c184b
Update docs/changelog/137566.yaml
pawankartik-elastic Nov 5, 2025
121d0f6
Merge branch 'main' into pkar/search-project-routing
pawankartik-elastic Nov 5, 2025
8b14d89
Address review comments
pawankartik-elastic Nov 7, 2025
65a978c
Fix CI
pawankartik-elastic Nov 7, 2025
301d2a0
Drop unnecessary set
pawankartik-elastic Nov 7, 2025
a3dc375
Null check
pawankartik-elastic Nov 7, 2025
0c7ee5d
Merge branch 'main' into pkar/search-project-routing
pawankartik-elastic Nov 7, 2025
888ed14
Address review comments
pawankartik-elastic Nov 10, 2025
e0681d3
Fix license header
pawankartik-elastic Nov 10, 2025
58f66d4
Merge branch 'main' into pkar/search-project-routing
pawankartik-elastic Nov 13, 2025
69155f8
Merge branch 'main' into pkar/search-project-routing
pawankartik-elastic Nov 13, 2025
2794828
Merge remote-tracking branch 'origin/main' into pkar/search-project-r…
pawankartik-elastic Nov 13, 2025
7e8e30f
Update transport version
pawankartik-elastic Nov 13, 2025
b1e06ef
Merge branch 'main' into pkar/search-project-routing
pawankartik-elastic Nov 13, 2025
52f5e32
Merge `main` and refresh transport version
pawankartik-elastic Nov 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137566.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137566
summary: Add support for `project_routing` for `_search` and `_async_search`
area: CCS
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public class SearchRequest extends LegacyActionRequest implements IndicesRequest
* enabling synthetic source natively in the index.
*/
private boolean forceSyntheticSource = false;
private String projectRouting;

public SearchRequest() {
this.localClusterAlias = null;
Expand Down Expand Up @@ -168,6 +169,15 @@ public boolean allowsCrossProject() {
return true;
}

@Override
public String getProjectRouting() {
return projectRouting;
}

public void setProjectRouting(@Nullable String projectRouting) {
this.projectRouting = projectRouting;
}

/**
* Creates a new sub-search request starting from the original search request that is provided.
* For internal use only, allows to fork a search request into multiple search requests that will be executed independently.
Expand Down Expand Up @@ -199,6 +209,7 @@ static SearchRequest subSearchRequest(
}
final SearchRequest request = new SearchRequest(originalSearchRequest, indices, clusterAlias, absoluteStartMillis, finalReduce);
request.setParentTask(parentTaskId);
request.setProjectRouting(originalSearchRequest.getProjectRouting());
return request;
}

Expand Down Expand Up @@ -228,6 +239,7 @@ private SearchRequest(
this.waitForCheckpoints = searchRequest.waitForCheckpoints;
this.waitForCheckpointsTimeout = searchRequest.waitForCheckpointsTimeout;
this.forceSyntheticSource = searchRequest.forceSyntheticSource;
this.projectRouting = searchRequest.projectRouting;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,8 @@ public void onFailure(Exception e) {
}),
forceConnectTimeoutSecs,
resolvesCrossProject,
rewritten.getResolvedIndexExpressions()
rewritten.getResolvedIndexExpressions(),
rewritten.getProjectRouting()
);
}
}
Expand Down Expand Up @@ -1065,7 +1066,8 @@ static void collectSearchShards(
ActionListener<Map<String, SearchShardsResponse>> listener,
TimeValue forceConnectTimeoutSecs,
boolean resolvesCrossProject,
ResolvedIndexExpressions originResolvedIdxExpressions
ResolvedIndexExpressions originResolvedIdxExpressions,
String projectRouting
) {
RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
Expand Down Expand Up @@ -1104,7 +1106,7 @@ Map<String, SearchShardsResponse> createFinalResponse() {
// We do not use the relaxed index options here when validating indices' existence.
ElasticsearchException validationEx = CrossProjectIndexResolutionValidator.validate(
originalIdxOpts,
null,
projectRouting,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@piergm - Can you help me understand why we want projectRouting to be passed into CrossProjectIndexResolutionValidator.validate? This method appears to be called only from Transport Actions, not the SecurityActionFilter and its sole use in the validate method is to help determine whether the index expression is qualified. Why would projectRouting being set mean that we have a qualified expression?

That doesn't seem right to me, but maybe I'm missing a use case here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would projectRouting being set mean that we have a qualified expression?

Consider these indices: foo,bar. Normally, they'd be considered unqualified, and in the flat world, we're fine with wherever they exist. However, if project_routing is specified, say, _alias:_origin, it means we're interested in the results from the origin. So you need to read both the indices and the routing info together.

Here's how it works, using Search as an example: the SAF uses project routing to resolve the projects that are in scope. Say, the routing is _alias:p1. All non-p1 projects are filtered out via CrossProjectRoutingResolver#resolve(), and index expressions are expanded/rewritten using these "resolved" projects. When the coordinator fans out the request, it only does it to those projects that are in scope wrt the routing info, i.e. p1 in this case. So the indices effectively belong to p1, and reading both the unqualified index expressions and the routing together, they can be treated as qualified.

originResolvedIdxExpressions,
resolvedIndexExpressions
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.search.fetch.StoredFieldsContext;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.internal.SearchContext;
Expand Down Expand Up @@ -68,15 +69,13 @@ public class RestSearchAction extends BaseRestHandler {
private final SearchUsageHolder searchUsageHolder;
private final Predicate<NodeFeature> clusterSupportsFeature;
private final Settings settings;

public RestSearchAction(SearchUsageHolder searchUsageHolder, Predicate<NodeFeature> clusterSupportsFeature) {
this(searchUsageHolder, clusterSupportsFeature, null);
}
private final CrossProjectModeDecider crossProjectModeDecider;

public RestSearchAction(SearchUsageHolder searchUsageHolder, Predicate<NodeFeature> clusterSupportsFeature, Settings settings) {
this.searchUsageHolder = searchUsageHolder;
this.clusterSupportsFeature = clusterSupportsFeature;
this.settings = settings;
this.crossProjectModeDecider = new CrossProjectModeDecider(settings);
}

@Override
Expand Down Expand Up @@ -109,10 +108,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// this might be set by old clients
request.param("min_compatible_shard_node");

final boolean crossProjectEnabled = settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false);
final boolean crossProjectEnabled = crossProjectModeDecider.crossProjectEnabled();
if (crossProjectEnabled) {
// accept but drop project_routing param until fully supported
request.param("project_routing");
searchRequest.setProjectRouting(request.param("project_routing"));
}

/*
Expand Down Expand Up @@ -252,7 +250,7 @@ public static void parseSearchRequest(
}
searchRequest.indicesOptions(indicesOptions);

validateSearchRequest(request, searchRequest);
validateSearchRequest(request, searchRequest, crossProjectEnabled);
Copy link
Contributor Author

@pawankartik-elastic pawankartik-elastic Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other endpoints use SearchRequest, too. Should we prohibit them from using project_routing? At the moment, we only look at whether CPS is enabled or not, not if an endpoint supports CPS, so I'm wondering about those endpoints, such as Fleet's.

The query parameter is consumed in the respective Rest*Action-s, so that's fine. It's the presence of the routing parameter in the request's body that I'm thinking about.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also search with PIT id should not read/consume project_routing but that's with the PIT changes to make sure it doesn't happen, so we should be good here


if (searchRequest.pointInTimeBuilder() != null) {
preparePointInTime(searchRequest, request);
Expand Down Expand Up @@ -412,10 +410,50 @@ static void preparePointInTime(SearchRequest request, RestRequest restRequest) {
* might modify the search request to align certain parameters.
*/
public static void validateSearchRequest(RestRequest restRequest, SearchRequest searchRequest) {
validateSearchRequest(restRequest, searchRequest, false);
}

private static void validateSearchRequest(RestRequest restRequest, SearchRequest searchRequest, boolean crossProjectEnabled) {
checkRestTotalHits(restRequest, searchRequest);
checkSearchType(restRequest, searchRequest);
// ensures that the rest param is consumed
restRequest.paramAsBoolean(INCLUDE_NAMED_QUERIES_SCORE_PARAM, false);
checkProjectRouting(searchRequest, crossProjectEnabled);
}

private static void checkProjectRouting(SearchRequest searchRequest, boolean crossProjectEnabled) {
/*
* There are 2 ways of specifying project_routing:
* - as a query parameter: /_search?project_routing=..., and,
* - within the request's body.
*
* Because we do not have access to `IndicesRequest/SearchRequest` from `SearchSourceBuilder`, and, project_routing
* can be potentially specified in 2 different places, we need to explicitly check this scenario.
*/
if (searchRequest.source() == null) {
return;
}

String projectRoutingInBody = searchRequest.source().projectRouting();
// If it's null, either the query parameter is also null or it isn't. Either way, we're fine with it.
if (projectRoutingInBody != null) {
if (crossProjectEnabled == false) {
throw new IllegalArgumentException("project_routing is allowed only when CPS is enabled and the endpoint supports CPS");
}

// Query parameter was also set. This is not allowed, irrespective of the values.
if (searchRequest.getProjectRouting() != null) {
throw new IllegalArgumentException(
"project_routing is specified in both the places: as query parameter and in the request's body"
);
}

/*
* Bring forward the project_routing value so that TransportSearchAction can pick it up. Although TSA can pick it up
* from `SearchSourceBuilder`, let's use `IndicesRequest#getProjectRouting()` since that's the intended way.
*/
searchRequest.setProjectRouting(projectRoutingInBody);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
public static final ParseField POINT_IN_TIME = new ParseField("pit");
public static final ParseField RUNTIME_MAPPINGS_FIELD = new ParseField("runtime_mappings");
public static final ParseField RETRIEVER = new ParseField("retriever");
public static final ParseField PROJECT_ROUTING = new ParseField("project_routing");

private static final boolean RANK_SUPPORTED = Booleans.parseBoolean(System.getProperty("es.search.rank_supported"), true);

Expand Down Expand Up @@ -212,6 +213,8 @@ public static HighlightBuilder highlight() {

private boolean skipInnerHits = false;

private String projectRouting;

/**
* Constructs a new search source builder.
*/
Expand Down Expand Up @@ -609,6 +612,19 @@ public TimeValue timeout() {
return timeout;
}

public String projectRouting() {
return projectRouting;
}

public SearchSourceBuilder projectRouting(String projectRouting) {
if (this.projectRouting != null) {
throw new IllegalArgumentException("project_routing is already set");
}

this.projectRouting = projectRouting;
return this;
}

/**
* An optional terminate_after to terminate the search after collecting
* <code>terminateAfter</code> documents
Expand Down Expand Up @@ -1382,7 +1398,9 @@ private SearchSourceBuilder parseXContent(
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (FROM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
if (PROJECT_ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
projectRouting(parser.text());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment here for future maintainers that we are going to validate the project routing and maybe throw in validateSearchRequest if it's not a CPS context, otherwise would be strange to see it parsed ok/without errors and an error thrown afterwards IMO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, making the change in the next push.

} else if (FROM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
from(parser.intValue());
} else if (SIZE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
size(parser.intValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,7 @@ public void testCollectSearchShards() throws Exception {
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
null,
false,
null,
null
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1136,6 +1137,7 @@ public void testCollectSearchShards() throws Exception {
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch),
null,
false,
null,
null
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1191,6 +1193,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch),
null,
false,
null,
null
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1224,6 +1227,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
null,
false,
null,
null
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1273,6 +1277,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
null,
false,
null,
null
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.SuggestBuilder;
Expand All @@ -33,7 +34,7 @@ public final class RestSearchActionTests extends RestActionTestCase {

@Before
public void setUpAction() {
action = new RestSearchAction(new UsageService().getSearchUsageHolder(), nf -> false);
action = new RestSearchAction(new UsageService().getSearchUsageHolder(), nf -> false, Settings.EMPTY);
controller().registerHandler(action);
verifyingClient.setExecuteVerifier((actionType, request) -> mock(SearchResponse.class));
verifyingClient.setExecuteLocallyVerifier((actionType, request) -> mock(SearchResponse.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli

boolean crossProjectEnabled = crossProjectModeDecider.crossProjectEnabled();
if (crossProjectEnabled) {
// accept but drop project_routing param until fully supported
request.param("project_routing");
submit.getSearchRequest().setProjectRouting(request.param("project_routing"));
}

IntConsumer setSize = size -> submit.getSearchRequest().source().size(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public SubmitAsyncSearchRequest(SearchSourceBuilder source, String... indices) {
request.setPreFilterShardSize(1);
request.setBatchedReduceSize(5);
request.requestCache(true);
request.setProjectRouting(source.projectRouting());
}

public SubmitAsyncSearchRequest(StreamInput in) throws IOException {
Expand Down