Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137530.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137530
summary: Allows field caps to be cross project
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.LegacyActionRequest;
import org.elasticsearch.action.ResolvedIndexExpressions;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -51,6 +53,8 @@ public final class FieldCapabilitiesRequest extends LegacyActionRequest implemen
private String[] types = Strings.EMPTY_ARRAY;
private boolean includeUnmapped = false;
private boolean includeEmptyFields = true;
@Nullable
private ResolvedIndexExpressions resolvedIndexExpressions = null;
/**
* Controls whether the field caps response should always include the list of indices
* where a field is defined. This flag is only used locally on the coordinating node,
Expand All @@ -60,6 +64,7 @@ public final class FieldCapabilitiesRequest extends LegacyActionRequest implemen
private transient boolean includeIndices = false;

private boolean includeResolvedTo = false;
private String projectRouting;

/**
* Controls whether all local indices should be returned if no remotes matched
Expand Down Expand Up @@ -259,6 +264,31 @@ public boolean allowsRemoteIndices() {
return true;
}

@Override
public boolean allowsCrossProject() {
return true;
}

@Override
public void setResolvedIndexExpressions(ResolvedIndexExpressions expressions) {
this.resolvedIndexExpressions = expressions;
}

@Override
@Nullable
public ResolvedIndexExpressions getResolvedIndexExpressions() {
return resolvedIndexExpressions;
}

public void projectRouting(String projectRouting) {
this.projectRouting = projectRouting;
}

@Override
public String getProjectRouting() {
return projectRouting;
}

@Override
public boolean includeDataStreams() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ public Builder withResolvedRemotelyBuilder(Map<String, ResolvedIndexExpressions.
return this;
}

public Builder withResolvedRemotely(Map<String, ResolvedIndexExpressions> resolvedRemotely) {
this.resolvedRemotely = resolvedRemotely;
return this;
}

public Builder withResolvedLocally(ResolvedIndexExpressions resolvedLocally) {
this.resolvedLocally = resolvedLocally;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -88,6 +90,7 @@

import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest.RESOLVED_FIELDS_CAPS;
import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility;
import static org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator.indicesOptionsForCrossProjectFanout;

public class TransportFieldCapabilitiesAction extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
public static final String EXCLUSION = "-";
Expand All @@ -110,6 +113,7 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
private final boolean ccsCheckCompatibility;
private final ThreadPool threadPool;
private final TimeValue forceConnectTimeoutSecs;
private final CrossProjectModeDecider crossProjectModeDecider;

@Inject
public TransportFieldCapabilitiesAction(
Expand Down Expand Up @@ -138,6 +142,7 @@ public TransportFieldCapabilitiesAction(
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings());
this.threadPool = threadPool;
this.forceConnectTimeoutSecs = clusterService.getSettings().getAsTime("search.ccs.force_connect_timeout", null);
this.crossProjectModeDecider = new CrossProjectModeDecider(clusterService.getSettings());
}

@Override
Expand Down Expand Up @@ -196,8 +201,14 @@ private <R extends ActionResponse> void doExecuteForked(
long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis();
final ProjectState projectState = projectResolver.getProjectState(clusterService.state());
final var minTransportVersion = new AtomicReference<>(clusterService.state().getMinTransportVersion());
final IndicesOptions originalIndicesOptions = request.indicesOptions();
final boolean resolveCrossProject = crossProjectModeDecider.resolvesCrossProject(request);
final Map<String, OriginalIndices> remoteClusterIndices = transportService.getRemoteClusterService()
.groupIndices(request.indicesOptions(), request.indices(), request.returnLocalAll());
.groupIndices(
resolveCrossProject ? indicesOptionsForCrossProjectFanout(originalIndicesOptions) : originalIndicesOptions,
request.indices(),
request.returnLocalAll()
);
final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);

final String[] concreteLocalIndices;
Expand Down Expand Up @@ -270,9 +281,9 @@ private <R extends ActionResponse> void doExecuteForked(
indexResponses.clear();
indexMappingHashToResponses.clear();
};
Map<String, ResolvedIndexExpressions.Builder> resolvedRemotely = new ConcurrentHashMap<>();
Map<String, ResolvedIndexExpressions.Builder> resolvedRemotelyBuilder = new ConcurrentHashMap<>();
for (String clusterAlias : remoteClusterIndices.keySet()) {
resolvedRemotely.put(clusterAlias, ResolvedIndexExpressions.builder());
resolvedRemotelyBuilder.put(clusterAlias, ResolvedIndexExpressions.builder());
}
final Consumer<FieldCapabilitiesIndexResponse> handleIndexResponse = resp -> {
if (fieldCapTask.isCancelled()) {
Expand Down Expand Up @@ -335,12 +346,28 @@ private <R extends ActionResponse> void doExecuteForked(
if (fieldCapTask.notifyIfCancelled(listener)) {
releaseResourcesOnCancel.run();
} else {
Map<String, ResolvedIndexExpressions> resolvedRemotely = resolvedRemotelyBuilder.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
ResolvedIndexExpressions resolvedLocally = new ResolvedIndexExpressions(resolvedLocallyList);
if (resolveCrossProject) {
final Exception ex = CrossProjectIndexResolutionValidator.validate(
request.indicesOptions(),
request.getProjectRouting(),
resolvedLocally,
resolvedRemotely
);
if (ex != null) {
listener.onFailure(ex);
return;
}
}
mergeIndexResponses(
request,
fieldCapTask,
indexResponses,
indexFailures,
resolvedLocallyList,
resolvedLocally,
resolvedRemotely,
minTransportVersion,
listener.map(linkedRequestExecutor::wrapPrimary)
Expand Down Expand Up @@ -370,15 +397,21 @@ private <R extends ActionResponse> void doExecuteForked(
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(
clusterAlias,
request,
originalIndices,
nowInMillis,
resolveCrossProject
);
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {

if (request.includeResolvedTo() && response.getResolvedLocally() != null) {
ResolvedIndexExpressions resolvedOnRemoteProject = response.getResolvedLocally();
// for bwc we need to check that resolvedOnRemoteProject Exists in the response
if (resolvedOnRemoteProject != null) {
for (ResolvedIndexExpression remoteResolvedExpression : resolvedOnRemoteProject.expressions()) {
resolvedRemotely.computeIfPresent(clusterAlias, (k, v) -> {
resolvedRemotelyBuilder.computeIfPresent(clusterAlias, (k, v) -> {
v.addExpression(remoteResolvedExpression);
return v;
});
Expand Down Expand Up @@ -413,7 +446,7 @@ private <R extends ActionResponse> void doExecuteForked(
),
Set.of()
);
resolvedRemotely.computeIfPresent(clusterAlias, (k, v) -> {
resolvedRemotelyBuilder.computeIfPresent(clusterAlias, (k, v) -> {
v.addExpression(err);
return v;
});
Expand All @@ -439,7 +472,7 @@ private <R extends ActionResponse> void doExecuteForked(
),
Set.of()
);
resolvedRemotely.computeIfPresent(clusterAlias, (k, v) -> {
resolvedRemotelyBuilder.computeIfPresent(clusterAlias, (k, v) -> {
v.addExpression(err);
return v;
});
Expand Down Expand Up @@ -552,12 +585,11 @@ private static void mergeIndexResponses(
CancellableTask task,
Map<String, FieldCapabilitiesIndexResponse> indexResponses,
FailureCollector indexFailures,
List<ResolvedIndexExpression> resolvedLocallyList,
Map<String, ResolvedIndexExpressions.Builder> resolvedRemotely,
ResolvedIndexExpressions resolvedLocally,
Map<String, ResolvedIndexExpressions> resolvedRemotely,
AtomicReference<TransportVersion> minTransportVersion,
ActionListener<FieldCapabilitiesResponse> listener
) {
ResolvedIndexExpressions resolvedLocally = new ResolvedIndexExpressions(resolvedLocallyList);
List<FieldCapabilitiesFailure> failures = indexFailures.build(indexResponses.keySet());
if (indexResponses.isEmpty() == false) {
if (request.isMergeResults()) {
Expand All @@ -570,7 +602,7 @@ private static void mergeIndexResponses(
FieldCapabilitiesResponse.builder()
.withIndexResponses(new ArrayList<>(indexResponses.values()))
.withResolvedLocally(resolvedLocally)
.withResolvedRemotelyBuilder(resolvedRemotely)
.withResolvedRemotely(resolvedRemotely)
.withMinTransportVersion(minTransportVersion.get())
.withFailures(failures)
.build()
Expand All @@ -590,7 +622,7 @@ private static void mergeIndexResponses(
FieldCapabilitiesResponse.builder()
.withFailures(failures)
.withResolvedLocally(resolvedLocally)
.withResolvedRemotelyBuilder(resolvedRemotely)
.withResolvedRemotely(resolvedRemotely)
.withMinTransportVersion(minTransportVersion.get())
.build()
);
Expand All @@ -607,12 +639,18 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(
String clusterAlias,
FieldCapabilitiesRequest request,
OriginalIndices originalIndices,
long nowInMillis
long nowInMillis,
boolean resolveCrossProject
) {
IndicesOptions indicesOptions = originalIndices.indicesOptions();
if (indicesOptions.resolveCrossProjectIndexExpression()) {
// if is a CPS request reset CrossProjectModeOptions to Default and use lenient IndicesOptions.
indicesOptions = indicesOptionsForCrossProjectFanout(indicesOptions);
}
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.clusterAlias(clusterAlias);
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indicesOptions(indicesOptions);
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
remoteRequest.filters(request.filters());
Expand All @@ -621,7 +659,7 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(
remoteRequest.indexFilter(request.indexFilter());
remoteRequest.nowInMillis(nowInMillis);
remoteRequest.includeEmptyFields(request.includeEmptyFields());
remoteRequest.includeResolvedTo(request.includeResolvedTo());
remoteRequest.includeResolvedTo(request.includeResolvedTo() || resolveCrossProject);
return remoteRequest;
}

Expand All @@ -634,7 +672,7 @@ private static boolean hasSameMappingHash(FieldCapabilitiesIndexResponse r1, Fie
private static FieldCapabilitiesResponse merge(
Map<String, FieldCapabilitiesIndexResponse> indexResponsesMap,
ResolvedIndexExpressions resolvedLocally,
Map<String, ResolvedIndexExpressions.Builder> resolvedRemotely,
Map<String, ResolvedIndexExpressions> resolvedRemotely,
CancellableTask task,
FieldCapabilitiesRequest request,
List<FieldCapabilitiesFailure> failures,
Expand Down Expand Up @@ -703,7 +741,7 @@ private static FieldCapabilitiesResponse merge(
.withMinTransportVersion(minTransportVersion.get());
if (request.includeResolvedTo() && minTransportVersion.get().supports(RESOLVED_FIELDS_CAPS)) {
// add resolution to response iff includeResolvedTo and all the nodes in the cluster supports it
responseBuilder.withResolvedLocally(new ResolvedIndexExpressions(collect)).withResolvedRemotelyBuilder(resolvedRemotely);
responseBuilder.withResolvedLocally(new ResolvedIndexExpressions(collect)).withResolvedRemotely(resolvedRemotely);
}
return responseBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;

Expand All @@ -33,9 +34,11 @@
public class RestFieldCapabilitiesAction extends BaseRestHandler {

private final Settings settings;
private final CrossProjectModeDecider crossProjectModeDecider;

public RestFieldCapabilitiesAction(Settings settings) {
this.settings = settings;
this.crossProjectModeDecider = new CrossProjectModeDecider(settings);
}

@Override
Expand All @@ -55,15 +58,25 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) {
// accept but drop project_routing param until fully supported
request.param("project_routing");
final FieldCapabilitiesRequest fieldRequest = new FieldCapabilitiesRequest();

final boolean crossProjectEnabled = crossProjectModeDecider.crossProjectEnabled();
if (crossProjectModeDecider.crossProjectEnabled()) {
// accept project_routing param and set includeResolved to true
fieldRequest.projectRouting(request.param("project_routing", null));
fieldRequest.includeResolvedTo(true);
}

final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final FieldCapabilitiesRequest fieldRequest = new FieldCapabilitiesRequest();
fieldRequest.indices(indices);

if (crossProjectEnabled && fieldRequest.allowsCrossProject()) {
var cpsIdxOpts = IndicesOptions.builder(fieldRequest.indicesOptions())
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
.build();
fieldRequest.indicesOptions(cpsIdxOpts);
}

fieldRequest.indicesOptions(IndicesOptions.fromRequest(request, fieldRequest.indicesOptions()));
fieldRequest.includeUnmapped(request.paramAsBoolean("include_unmapped", false));
fieldRequest.includeEmptyFields(request.paramAsBoolean("include_empty_fields", true));
Expand Down