Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137530.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137530
summary: Allows field caps to be cross project
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.LegacyActionRequest;
import org.elasticsearch.action.ResolvedIndexExpressions;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -51,6 +53,8 @@ public final class FieldCapabilitiesRequest extends LegacyActionRequest implemen
private String[] types = Strings.EMPTY_ARRAY;
private boolean includeUnmapped = false;
private boolean includeEmptyFields = true;
@Nullable
private ResolvedIndexExpressions resolvedIndexExpressions = null;
/**
* Controls whether the field caps response should always include the list of indices
* where a field is defined. This flag is only used locally on the coordinating node,
Expand All @@ -60,6 +64,7 @@ public final class FieldCapabilitiesRequest extends LegacyActionRequest implemen
private transient boolean includeIndices = false;

private boolean includeResolvedTo = false;
private String projectRouting;

/**
* Controls whether all local indices should be returned if no remotes matched
Expand Down Expand Up @@ -259,6 +264,31 @@ public boolean allowsRemoteIndices() {
return true;
}

@Override
public boolean allowsCrossProject() {
return true;
}

@Override
public void setResolvedIndexExpressions(ResolvedIndexExpressions expressions) {
this.resolvedIndexExpressions = expressions;
}

@Override
@Nullable
public ResolvedIndexExpressions getResolvedIndexExpressions() {
return resolvedIndexExpressions;
}

public void projectRouting(String projectRouting) {
this.projectRouting = projectRouting;
}

@Override
public String getProjectRouting() {
return projectRouting;
}

@Override
public boolean includeDataStreams() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ public Builder withResolvedRemotelyBuilder(Map<String, ResolvedIndexExpressions.
return this;
}

public Builder withResolvedRemotely(Map<String, ResolvedIndexExpressions> resolvedRemotely) {
this.resolvedRemotely = resolvedRemotely;
return this;
}

public Builder withResolvedLocally(ResolvedIndexExpressions resolvedLocally) {
this.resolvedLocally = resolvedLocally;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -88,6 +90,7 @@

import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest.RESOLVED_FIELDS_CAPS;
import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility;
import static org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator.indicesOptionsForCrossProjectFanout;

public class TransportFieldCapabilitiesAction extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
public static final String EXCLUSION = "-";
Expand All @@ -110,6 +113,7 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
private final boolean ccsCheckCompatibility;
private final ThreadPool threadPool;
private final TimeValue forceConnectTimeoutSecs;
private final CrossProjectModeDecider crossProjectModeDecider;

@Inject
public TransportFieldCapabilitiesAction(
Expand Down Expand Up @@ -138,6 +142,7 @@ public TransportFieldCapabilitiesAction(
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings());
this.threadPool = threadPool;
this.forceConnectTimeoutSecs = clusterService.getSettings().getAsTime("search.ccs.force_connect_timeout", null);
this.crossProjectModeDecider = new CrossProjectModeDecider(clusterService.getSettings());
}

@Override
Expand Down Expand Up @@ -196,8 +201,14 @@ private <R extends ActionResponse> void doExecuteForked(
long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis();
final ProjectState projectState = projectResolver.getProjectState(clusterService.state());
final var minTransportVersion = new AtomicReference<>(clusterService.state().getMinTransportVersion());
final IndicesOptions originalIndicesOptions = request.indicesOptions();
final boolean resolveCrossProject = crossProjectModeDecider.resolvesCrossProject(request);
final Map<String, OriginalIndices> remoteClusterIndices = transportService.getRemoteClusterService()
.groupIndices(request.indicesOptions(), request.indices(), request.returnLocalAll());
.groupIndices(
resolveCrossProject ? indicesOptionsForCrossProjectFanout(originalIndicesOptions) : originalIndicesOptions,
request.indices(),
request.returnLocalAll()
);
final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);

final String[] concreteLocalIndices;
Expand Down Expand Up @@ -270,9 +281,9 @@ private <R extends ActionResponse> void doExecuteForked(
indexResponses.clear();
indexMappingHashToResponses.clear();
};
Map<String, ResolvedIndexExpressions.Builder> resolvedRemotely = new ConcurrentHashMap<>();
Map<String, ResolvedIndexExpressions.Builder> resolvedRemotelyBuilder = new ConcurrentHashMap<>();
for (String clusterAlias : remoteClusterIndices.keySet()) {
resolvedRemotely.put(clusterAlias, ResolvedIndexExpressions.builder());
resolvedRemotelyBuilder.put(clusterAlias, ResolvedIndexExpressions.builder());
}
final Consumer<FieldCapabilitiesIndexResponse> handleIndexResponse = resp -> {
if (fieldCapTask.isCancelled()) {
Expand Down Expand Up @@ -335,12 +346,28 @@ private <R extends ActionResponse> void doExecuteForked(
if (fieldCapTask.notifyIfCancelled(listener)) {
releaseResourcesOnCancel.run();
} else {
Map<String, ResolvedIndexExpressions> resolvedRemotely = resolvedRemotelyBuilder.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
ResolvedIndexExpressions resolvedLocally = new ResolvedIndexExpressions(resolvedLocallyList);
if (resolveCrossProject) {
final Exception ex = CrossProjectIndexResolutionValidator.validate(
request.indicesOptions(),
request.getProjectRouting(),
resolvedLocally,
resolvedRemotely
);
if (ex != null) {
listener.onFailure(ex);
return;
}
}
mergeIndexResponses(
request,
fieldCapTask,
indexResponses,
indexFailures,
resolvedLocallyList,
resolvedLocally,
resolvedRemotely,
minTransportVersion,
listener.map(linkedRequestExecutor::wrapPrimary)
Expand Down Expand Up @@ -370,15 +397,21 @@ private <R extends ActionResponse> void doExecuteForked(
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(
clusterAlias,
request,
originalIndices,
nowInMillis,
resolveCrossProject
);
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {

if (request.includeResolvedTo() && response.getResolvedLocally() != null) {
ResolvedIndexExpressions resolvedOnRemoteProject = response.getResolvedLocally();
// for bwc we need to check that resolvedOnRemoteProject Exists in the response
if (resolvedOnRemoteProject != null) {
for (ResolvedIndexExpression remoteResolvedExpression : resolvedOnRemoteProject.expressions()) {
resolvedRemotely.computeIfPresent(clusterAlias, (k, v) -> {
resolvedRemotelyBuilder.computeIfPresent(clusterAlias, (k, v) -> {
v.addExpression(remoteResolvedExpression);
return v;
});
Expand Down Expand Up @@ -413,7 +446,7 @@ private <R extends ActionResponse> void doExecuteForked(
),
Set.of()
);
resolvedRemotely.computeIfPresent(clusterAlias, (k, v) -> {
resolvedRemotelyBuilder.computeIfPresent(clusterAlias, (k, v) -> {
v.addExpression(err);
return v;
});
Expand All @@ -439,7 +472,7 @@ private <R extends ActionResponse> void doExecuteForked(
),
Set.of()
);
resolvedRemotely.computeIfPresent(clusterAlias, (k, v) -> {
resolvedRemotelyBuilder.computeIfPresent(clusterAlias, (k, v) -> {
v.addExpression(err);
return v;
});
Expand Down Expand Up @@ -552,12 +585,11 @@ private static void mergeIndexResponses(
CancellableTask task,
Map<String, FieldCapabilitiesIndexResponse> indexResponses,
FailureCollector indexFailures,
List<ResolvedIndexExpression> resolvedLocallyList,
Map<String, ResolvedIndexExpressions.Builder> resolvedRemotely,
ResolvedIndexExpressions resolvedLocally,
Map<String, ResolvedIndexExpressions> resolvedRemotely,
AtomicReference<TransportVersion> minTransportVersion,
ActionListener<FieldCapabilitiesResponse> listener
) {
ResolvedIndexExpressions resolvedLocally = new ResolvedIndexExpressions(resolvedLocallyList);
List<FieldCapabilitiesFailure> failures = indexFailures.build(indexResponses.keySet());
if (indexResponses.isEmpty() == false) {
if (request.isMergeResults()) {
Expand All @@ -570,7 +602,7 @@ private static void mergeIndexResponses(
FieldCapabilitiesResponse.builder()
.withIndexResponses(new ArrayList<>(indexResponses.values()))
.withResolvedLocally(resolvedLocally)
.withResolvedRemotelyBuilder(resolvedRemotely)
.withResolvedRemotely(resolvedRemotely)
.withMinTransportVersion(minTransportVersion.get())
.withFailures(failures)
.build()
Expand All @@ -590,7 +622,7 @@ private static void mergeIndexResponses(
FieldCapabilitiesResponse.builder()
.withFailures(failures)
.withResolvedLocally(resolvedLocally)
.withResolvedRemotelyBuilder(resolvedRemotely)
.withResolvedRemotely(resolvedRemotely)
.withMinTransportVersion(minTransportVersion.get())
.build()
);
Expand All @@ -607,12 +639,18 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(
String clusterAlias,
FieldCapabilitiesRequest request,
OriginalIndices originalIndices,
long nowInMillis
long nowInMillis,
boolean resolveCrossProject
) {
IndicesOptions indicesOptions = originalIndices.indicesOptions();
if (indicesOptions.resolveCrossProjectIndexExpression()) {
// if is a CPS request reset CrossProjectModeOptions to Default and use lenient IndicesOptions.
indicesOptions = indicesOptionsForCrossProjectFanout(indicesOptions);
}
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.clusterAlias(clusterAlias);
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indicesOptions(indicesOptions);
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
remoteRequest.filters(request.filters());
Expand All @@ -621,7 +659,7 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(
remoteRequest.indexFilter(request.indexFilter());
remoteRequest.nowInMillis(nowInMillis);
remoteRequest.includeEmptyFields(request.includeEmptyFields());
remoteRequest.includeResolvedTo(request.includeResolvedTo());
remoteRequest.includeResolvedTo(request.includeResolvedTo() || resolveCrossProject);
return remoteRequest;
}

Expand All @@ -634,7 +672,7 @@ private static boolean hasSameMappingHash(FieldCapabilitiesIndexResponse r1, Fie
private static FieldCapabilitiesResponse merge(
Map<String, FieldCapabilitiesIndexResponse> indexResponsesMap,
ResolvedIndexExpressions resolvedLocally,
Map<String, ResolvedIndexExpressions.Builder> resolvedRemotely,
Map<String, ResolvedIndexExpressions> resolvedRemotely,
CancellableTask task,
FieldCapabilitiesRequest request,
List<FieldCapabilitiesFailure> failures,
Expand Down Expand Up @@ -703,7 +741,7 @@ private static FieldCapabilitiesResponse merge(
.withMinTransportVersion(minTransportVersion.get());
if (request.includeResolvedTo() && minTransportVersion.get().supports(RESOLVED_FIELDS_CAPS)) {
// add resolution to response iff includeResolvedTo and all the nodes in the cluster supports it
responseBuilder.withResolvedLocally(new ResolvedIndexExpressions(collect)).withResolvedRemotelyBuilder(resolvedRemotely);
responseBuilder.withResolvedLocally(new ResolvedIndexExpressions(collect)).withResolvedRemotely(resolvedRemotely);
}
return responseBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;

Expand All @@ -33,9 +34,11 @@
public class RestFieldCapabilitiesAction extends BaseRestHandler {

private final Settings settings;
private final CrossProjectModeDecider crossProjectModeDecider;

public RestFieldCapabilitiesAction(Settings settings) {
this.settings = settings;
this.crossProjectModeDecider = new CrossProjectModeDecider(settings);
}

@Override
Expand All @@ -55,15 +58,25 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) {
// accept but drop project_routing param until fully supported
request.param("project_routing");
final FieldCapabilitiesRequest fieldRequest = new FieldCapabilitiesRequest();

final boolean crossProjectEnabled = crossProjectModeDecider.crossProjectEnabled();
if (crossProjectModeDecider.crossProjectEnabled()) {
// accept project_routing param and set includeResolved to true
fieldRequest.projectRouting(request.param("project_routing", null));
fieldRequest.includeResolvedTo(true);
}

final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final FieldCapabilitiesRequest fieldRequest = new FieldCapabilitiesRequest();
fieldRequest.indices(indices);

if (crossProjectEnabled && fieldRequest.allowsCrossProject()) {
var cpsIdxOpts = IndicesOptions.builder(fieldRequest.indicesOptions())
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
.build();
fieldRequest.indicesOptions(cpsIdxOpts);
}

fieldRequest.indicesOptions(IndicesOptions.fromRequest(request, fieldRequest.indicesOptions()));
fieldRequest.includeUnmapped(request.paramAsBoolean("include_unmapped", false));
fieldRequest.includeEmptyFields(request.paramAsBoolean("include_empty_fields", true));
Expand Down