Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

Expand Down Expand Up @@ -73,7 +74,8 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
request,
null,
clusterSupportsFeature,
size -> searchRequest.source().size(size)
size -> searchRequest.source().size(size),
Optional.empty()
);

// Creates the search template request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand All @@ -52,7 +53,14 @@ protected void parseInternalRequest(
SearchRequest searchRequest = internal.getSearchRequest();

try (XContentParser parser = extractRequestSpecificFields(restRequest, bodyConsumers)) {
RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, clusterSupportsFeature, size -> failOnSizeSpecified());
RestSearchAction.parseSearchRequest(
searchRequest,
restRequest,
parser,
clusterSupportsFeature,
size -> failOnSizeSpecified(),
Optional.empty()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know that none of the subclasses of this class are cross-project enabled? We'll need to track those down and ensure that this Optional.empty() is valid for all. Otherwise, that arg will need to be added to the parseInternalRequest method signature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Irrespective of whether they're CPS-enabled or not, Optional.empty() could be a valid argument since:

  1. Their behaviour does not change for this value, and,
  2. This value can be a placeholder to denote that an endpoint does not need any CPS-specific handling during parsing.

Passing in true or false would've marked their behaviour more explicitly. I'd prefer evaluating their CPS compatibility and updating this value in a separate PR. Wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I follow. If any CPS-enabled endpoints are using this AbstractBulkByQueryRestHandler class as a parent, then they must not pass in Optional.empty() because setting ccs_minimize_roundtrips is not allowed in that case. I doubt any bulk endpoints are CCS enabled, but we need to track them down to make sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two endpoints using AbstractBulkByQueryRestHandler are _update_by_query and _delete_by_query. They accept CCS patterns such as foo*,remote*:bar*. However, the remotes included are not affected. For example, /*,*:*/_delete_by_query does not affect the remotes' docs. Instead, I see weird errors. If I specify a single remote, I no longer get any errors, but the docs still don't get deleted. I'm assuming this is an oversight, and that these endpoints don't play well with Cross Cluster ops in the first place. In that case, Optional.empty() is the right value here.

Copy link
Contributor Author

@pawankartik-elastic pawankartik-elastic Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, we should prevent Cross Cluster ops for them like we're planning to do for Fleet searches.
Edit: I just found tickets that track this internally.

);
}

searchRequest.source().size(restRequest.paramAsInt("scroll_size", searchRequest.source().size()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.function.IntConsumer;
import java.util.function.Predicate;
Expand All @@ -68,6 +69,7 @@ public class RestSearchAction extends BaseRestHandler {
private final SearchUsageHolder searchUsageHolder;
private final Predicate<NodeFeature> clusterSupportsFeature;
private final Settings settings;
private final boolean inCpsContext;

public RestSearchAction(SearchUsageHolder searchUsageHolder, Predicate<NodeFeature> clusterSupportsFeature) {
this(searchUsageHolder, clusterSupportsFeature, null);
Expand All @@ -77,6 +79,7 @@ public RestSearchAction(SearchUsageHolder searchUsageHolder, Predicate<NodeFeatu
this.searchUsageHolder = searchUsageHolder;
this.clusterSupportsFeature = clusterSupportsFeature;
this.settings = settings;
this.inCpsContext = settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false);
}

@Override
Expand Down Expand Up @@ -109,7 +112,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// this might be set by old clients
request.param("min_compatible_shard_node");

if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) {
if (inCpsContext) {
// accept but drop project_routing param until fully supported
request.param("project_routing");
}
Expand All @@ -128,7 +131,15 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
*/
IntConsumer setSize = size -> searchRequest.source().size(size);
request.withContentOrSourceParamParserOrNull(
parser -> parseSearchRequest(searchRequest, request, parser, clusterSupportsFeature, setSize, searchUsageHolder)
parser -> parseSearchRequest(
searchRequest,
request,
parser,
clusterSupportsFeature,
setSize,
searchUsageHolder,
Optional.of(inCpsContext)
)
);

return channel -> {
Expand All @@ -146,15 +157,17 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
* parameter
* @param clusterSupportsFeature used to check if certain features are available in this cluster
* @param setSize how the size url parameter is handled. {@code udpate_by_query} and regular search differ here.
* @param inCpsContext specifies if we're in CPS context. It's empty if it's not relevant.
*/
public static void parseSearchRequest(
SearchRequest searchRequest,
RestRequest request,
XContentParser requestContentParser,
Predicate<NodeFeature> clusterSupportsFeature,
IntConsumer setSize
IntConsumer setSize,
Optional<Boolean> inCpsContext
) throws IOException {
parseSearchRequest(searchRequest, request, requestContentParser, clusterSupportsFeature, setSize, null);
parseSearchRequest(searchRequest, request, requestContentParser, clusterSupportsFeature, setSize, null, inCpsContext);
}

/**
Expand All @@ -167,14 +180,16 @@ public static void parseSearchRequest(
* @param clusterSupportsFeature used to check if certain features are available in this cluster
* @param setSize how the size url parameter is handled. {@code udpate_by_query} and regular search differ here.
* @param searchUsageHolder the holder of search usage stats
* @param inCpsContext specifies if we're in CPS context. It's empty if it's not relevant.
*/
public static void parseSearchRequest(
SearchRequest searchRequest,
RestRequest request,
@Nullable XContentParser requestContentParser,
Predicate<NodeFeature> clusterSupportsFeature,
IntConsumer setSize,
@Nullable SearchUsageHolder searchUsageHolder
@Nullable SearchUsageHolder searchUsageHolder,
Optional<Boolean> inCpsContext
) throws IOException {
if (searchRequest.source() == null) {
searchRequest.source(new SearchSourceBuilder());
Expand Down Expand Up @@ -229,9 +244,19 @@ public static void parseSearchRequest(
if (searchRequest.pointInTimeBuilder() != null) {
preparePointInTime(searchRequest, request);
} else {
searchRequest.setCcsMinimizeRoundtrips(
request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips())
);
if (inCpsContext.orElse(false)) {
// We're in CPS environment. MRT should always be true and not be settable by the user.
if (request.hasParam("ccs_minimize_roundtrips")) {
throw new IllegalArgumentException("Setting ccs_minimize_roundtrips is not supported in CPS context");
} else {
searchRequest.setCcsMinimizeRoundtrips(true);
}
} else {
// We're not in CPS environment, so parse what's in the request.
searchRequest.setCcsMinimizeRoundtrips(
request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips())
);
}
}
if (request.paramAsBoolean("force_synthetic_source", false)) {
searchRequest.setForceSyntheticSource(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.IntConsumer;
import java.util.function.Predicate;
Expand All @@ -39,6 +40,7 @@ public final class RestSubmitAsyncSearchAction extends BaseRestHandler {
private final SearchUsageHolder searchUsageHolder;
private final Predicate<NodeFeature> clusterSupportsFeature;
private final Settings settings;
private final boolean inCpsContext;

public RestSubmitAsyncSearchAction(SearchUsageHolder searchUsageHolder, Predicate<NodeFeature> clusterSupportsFeature) {
this(searchUsageHolder, clusterSupportsFeature, null);
Expand All @@ -52,6 +54,7 @@ public RestSubmitAsyncSearchAction(
this.searchUsageHolder = searchUsageHolder;
this.clusterSupportsFeature = clusterSupportsFeature;
this.settings = settings;
this.inCpsContext = settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false);
}

@Override
Expand All @@ -71,7 +74,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
}
SubmitAsyncSearchRequest submit = new SubmitAsyncSearchRequest();

if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) {
if (inCpsContext) {
// accept but drop project_routing param until fully supported
request.param("project_routing");
}
Expand All @@ -82,7 +85,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
// them as supported. We rely on SubmitAsyncSearchRequest#validate to fail in case they are set.
// Note that ccs_minimize_roundtrips is also set this way, which is a supported option.
request.withContentOrSourceParamParserOrNull(
parser -> parseSearchRequest(submit.getSearchRequest(), request, parser, clusterSupportsFeature, setSize, searchUsageHolder)
parser -> parseSearchRequest(
submit.getSearchRequest(),
request,
parser,
clusterSupportsFeature,
setSize,
searchUsageHolder,
Optional.of(inCpsContext)
)
);

if (request.hasParam("wait_for_completion_timeout")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.IntConsumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -70,7 +71,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli

IntConsumer setSize = size -> searchRequest.source().size(size);
request.withContentOrSourceParamParserOrNull(parser -> {
RestSearchAction.parseSearchRequest(searchRequest, request, parser, clusterSupportsFeature, setSize, searchUsageHolder);
RestSearchAction.parseSearchRequest(
searchRequest,
request,
parser,
clusterSupportsFeature,
setSize,
searchUsageHolder,
Optional.empty()
);
String[] stringWaitForCheckpoints = request.paramAsStringArray("wait_for_checkpoints", Strings.EMPTY_ARRAY);
final long[] waitForCheckpoints = new long[stringWaitForCheckpoints.length];
for (int i = 0; i < stringWaitForCheckpoints.length; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

Expand Down Expand Up @@ -52,7 +53,8 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
restRequest,
parser,
clusterSupportsFeature,
size -> searchRequest.source().size(size)
size -> searchRequest.source().size(size),
Optional.empty()
)
);
RestSearchAction.validateSearchRequest(restRequest, searchRequest);
Expand Down