diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java index c17cae1779056..447e78d9a4368 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.LegacyActionRequest; +import org.elasticsearch.action.ResolvedIndexExpressions; import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; @@ -73,6 +74,7 @@ public final class FieldCapabilitiesRequest extends LegacyActionRequest implemen private QueryBuilder indexFilter; private Map runtimeFields = Collections.emptyMap(); private Long nowInMillis; + private ResolvedIndexExpressions resolvedIndexExpressions; public FieldCapabilitiesRequest(StreamInput in) throws IOException { super(in); @@ -259,6 +261,21 @@ public boolean allowsRemoteIndices() { return true; } + @Override + public boolean allowsCrossProject() { + return true; + } + + @Override + public void setResolvedIndexExpressions(ResolvedIndexExpressions expressions) { + this.resolvedIndexExpressions = expressions; + } + + @Override + public ResolvedIndexExpressions getResolvedIndexExpressions() { + return resolvedIndexExpressions; + } + @Override public boolean includeDataStreams() { return true; diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java index 8372ad5909de1..869cae875a618 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java @@ -62,17 +62,18 @@ public class FieldCapabilitiesResponse extends ActionResponse implements Chunked private final List failures; private final List indexResponses; private final TransportVersion minTransportVersion; + private final ResolvedIndexExpressions resolvedIndexExpressions; public FieldCapabilitiesResponse( String[] indices, Map> fields, List failures ) { - this(indices, null, Collections.emptyMap(), fields, Collections.emptyList(), failures, null); + this(indices, null, Collections.emptyMap(), fields, Collections.emptyList(), failures, null, null); } public FieldCapabilitiesResponse(String[] indices, Map> fields) { - this(indices, null, Collections.emptyMap(), fields, Collections.emptyList(), Collections.emptyList(), null); + this(indices, null, Collections.emptyMap(), fields, Collections.emptyList(), Collections.emptyList(), null, null); } public static FieldCapabilitiesResponse empty() { @@ -83,12 +84,13 @@ public static FieldCapabilitiesResponse empty() { Collections.emptyMap(), Collections.emptyList(), Collections.emptyList(), + null, null ); } public FieldCapabilitiesResponse(List indexResponses, List failures) { - this(Strings.EMPTY_ARRAY, null, Collections.emptyMap(), Collections.emptyMap(), indexResponses, failures, null); + this(Strings.EMPTY_ARRAY, null, Collections.emptyMap(), Collections.emptyMap(), indexResponses, failures, null, null); } public static FieldCapabilitiesResponse.Builder builder() { @@ -102,7 +104,8 @@ private FieldCapabilitiesResponse( Map> fields, List indexResponses, List failures, - TransportVersion minTransportVersion + TransportVersion minTransportVersion, + @Nullable ResolvedIndexExpressions resolvedIndexExpressions ) { this.fields = Objects.requireNonNull(fields); this.resolvedLocally = resolvedLocally; @@ -111,6 +114,7 @@ private FieldCapabilitiesResponse( this.indices = indices; this.failures = failures; this.minTransportVersion = minTransportVersion; + this.resolvedIndexExpressions = resolvedIndexExpressions; } public FieldCapabilitiesResponse(StreamInput in) throws IOException { @@ -129,6 +133,7 @@ public FieldCapabilitiesResponse(StreamInput in) throws IOException { // when receiving a response we expect the resolved remotely to be empty. // It's only non-empty on the coordinating node if the FC requests targets remotes. this.resolvedRemotely = Collections.emptyMap(); + this.resolvedIndexExpressions = in.readOptionalWriteable(ResolvedIndexExpressions::new); } /** @@ -204,7 +209,16 @@ public TransportVersion minTransportVersion() { * Build a new response replacing the {@link #minTransportVersion()}. */ public FieldCapabilitiesResponse withMinTransportVersion(TransportVersion newMin) { - return new FieldCapabilitiesResponse(indices, resolvedLocally, resolvedRemotely, fields, indexResponses, failures, newMin); + return new FieldCapabilitiesResponse( + indices, + resolvedLocally, + resolvedRemotely, + fields, + indexResponses, + failures, + newMin, + resolvedIndexExpressions + ); } /** @@ -234,6 +248,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().supports(RESOLVED_FIELDS_CAPS)) { out.writeOptionalWriteable(resolvedLocally); } + out.writeOptionalWriteable(this.resolvedIndexExpressions); } private static void writeField(StreamOutput out, Map map) throws IOException { @@ -266,6 +281,11 @@ public Iterator toXContentChunked(ToXContent.Params params ); } + @Nullable + public ResolvedIndexExpressions getResolvedIndexExpressions() { + return resolvedIndexExpressions; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -300,6 +320,7 @@ public static class Builder { private List indexResponses = Collections.emptyList(); private List failures = Collections.emptyList(); private TransportVersion minTransportVersion = null; + private ResolvedIndexExpressions resolvedIndexExpressions = null; private Builder() {} @@ -346,6 +367,11 @@ public Builder withMinTransportVersion(TransportVersion minTransportVersion) { return this; } + public Builder withResolvedIndexExpressions(ResolvedIndexExpressions resolvedIndexExpressions) { + this.resolvedIndexExpressions = resolvedIndexExpressions; + return this; + } + public FieldCapabilitiesResponse build() { return new FieldCapabilitiesResponse( indices, @@ -354,7 +380,8 @@ public FieldCapabilitiesResponse build() { fields, indexResponses, failures, - minTransportVersion + minTransportVersion, + resolvedIndexExpressions ); } } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 87f74c0704c53..cb4f3ad8ef30e 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -43,6 +43,8 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.Nullable; @@ -55,6 +57,8 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator; +import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -88,6 +92,7 @@ import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest.RESOLVED_FIELDS_CAPS; import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility; +import static org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator.indicesOptionsForCrossProjectFanout; public class TransportFieldCapabilitiesAction extends HandledTransportAction { public static final String EXCLUSION = "-"; @@ -110,6 +115,7 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction void doExecuteForked( LinkedRequestExecutor linkedRequestExecutor, ActionListener listener ) { + final boolean crossProjectEnabled = crossProjectModeDecider.resolvesCrossProject(request); if (ccsCheckCompatibility) { checkCCSVersionCompatibility(request); } @@ -365,13 +373,67 @@ private void doExecuteForked( ); requestDispatcher.execute(); + /* + * We need to run the Cross Project Search reconciliation but only after we've heard back from all the linked projects. + * It is also possible that some linked projects may respond back with an error instead of a valid response. To facilitate + * this, we use `CountDown` and track each response, irrespective of whether it's valid or not, and then perform the + * reconciliation when it has counted down and the request is a resolvable CPS request. + */ + Runnable crossProjectReconciler; + Map linkedProjectsResponses = ConcurrentCollections.newConcurrentMap(); + // Run the reconciler only if there are linked projects and CPS is enabled. + if (remoteClusterIndices.isEmpty() == false && crossProjectEnabled) { + CountDown countDownResponses = new CountDown(remoteClusterIndices.size()); + crossProjectReconciler = () -> { + if (countDownResponses.countDown()) { + /* + * This happens when one or more linked projects respond with an error instead of a valid response -- say + * networking error. + */ + if (linkedProjectsResponses.size() != remoteClusterIndices.size()) { + listener.onFailure( + new IllegalArgumentException( + "Invalid number of responses received: " + + linkedProjectsResponses.size() + + " vs expected " + + remoteClusterIndices.size() + ) + ); + return; + } + + Exception validationEx = CrossProjectIndexResolutionValidator.validate( + request.indicesOptions(), + null, + request.getResolvedIndexExpressions(), + linkedProjectsResponses + ); + + if (validationEx != null) { + listener.onFailure(validationEx); + } + } + }; + } else { + crossProjectReconciler = () -> {}; + } + // this is the cross cluster part of this API - we force the other cluster to not merge the results but instead // send us back all individual index results. for (Map.Entry remoteIndices : remoteClusterIndices.entrySet()) { String clusterAlias = remoteIndices.getKey(); OriginalIndices originalIndices = remoteIndices.getValue(); - FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis); + FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest( + clusterAlias, + request, + originalIndices, + nowInMillis, + crossProjectModeDecider + ); ActionListener remoteListener = ActionListener.wrap(response -> { + assert response.getResolvedIndexExpressions() != null + : "Resolved index expressions from [" + clusterAlias + "] are null"; + linkedProjectsResponses.put(clusterAlias, response.getResolvedIndexExpressions()); if (request.includeResolvedTo() && response.getResolvedLocally() != null) { ResolvedIndexExpressions resolvedOnRemoteProject = response.getResolvedLocally(); @@ -426,6 +488,7 @@ private void doExecuteForked( } return TransportVersion.min(lhs, rhs); }); + crossProjectReconciler.run(); }, ex -> { for (String index : originalIndices.indices()) { handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex); @@ -445,6 +508,7 @@ private void doExecuteForked( }); } } + crossProjectReconciler.run(); }); SubscribableListener connectionListener = new SubscribableListener<>(); @@ -573,6 +637,7 @@ private static void mergeIndexResponses( .withResolvedRemotelyBuilder(resolvedRemotely) .withMinTransportVersion(minTransportVersion.get()) .withFailures(failures) + .withResolvedIndexExpressions(request.getResolvedIndexExpressions()) .build() ); } @@ -607,13 +672,16 @@ private static FieldCapabilitiesRequest prepareRemoteRequest( String clusterAlias, FieldCapabilitiesRequest request, OriginalIndices originalIndices, - long nowInMillis + long nowInMillis, + CrossProjectModeDecider crossProjectModeDecider ) { FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest(); remoteRequest.clusterAlias(clusterAlias); remoteRequest.setMergeResults(false); // we need to merge on this node remoteRequest.indicesOptions(originalIndices.indicesOptions()); - remoteRequest.indices(originalIndices.indices()); + if (crossProjectModeDecider.resolvesCrossProject(request)) { + remoteRequest.indicesOptions(indicesOptionsForCrossProjectFanout(remoteRequest.indicesOptions())); + } remoteRequest.fields(request.fields()); remoteRequest.filters(request.filters()); remoteRequest.types(request.types()); diff --git a/server/src/main/java/org/elasticsearch/rest/action/RestFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/rest/action/RestFieldCapabilitiesAction.java index 11b5c97aeb48c..faae60a638216 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/RestFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/RestFieldCapabilitiesAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; @@ -33,9 +34,11 @@ public class RestFieldCapabilitiesAction extends BaseRestHandler { private final Settings settings; + private final CrossProjectModeDecider crossProjectModeDecider; public RestFieldCapabilitiesAction(Settings settings) { this.settings = settings; + this.crossProjectModeDecider = new CrossProjectModeDecider(settings); } @Override @@ -55,7 +58,8 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) { + final boolean crossProjectEnabled = crossProjectModeDecider.crossProjectEnabled(); + if (crossProjectModeDecider.crossProjectEnabled()) { // accept but drop project_routing param until fully supported request.param("project_routing"); } @@ -63,6 +67,12 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final String[] indices = Strings.splitStringByCommaToArray(request.param("index")); final FieldCapabilitiesRequest fieldRequest = new FieldCapabilitiesRequest(); fieldRequest.indices(indices); + if (crossProjectEnabled && fieldRequest.allowsCrossProject()) { + var cpsIdxOpts = IndicesOptions.builder(fieldRequest.indicesOptions()) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + fieldRequest.indicesOptions(cpsIdxOpts); + } fieldRequest.indicesOptions(IndicesOptions.fromRequest(request, fieldRequest.indicesOptions())); fieldRequest.includeUnmapped(request.paramAsBoolean("include_unmapped", false));