Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.LegacyActionRequest;
import org.elasticsearch.action.ResolvedIndexExpressions;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -73,6 +74,7 @@ public final class FieldCapabilitiesRequest extends LegacyActionRequest implemen
private QueryBuilder indexFilter;
private Map<String, Object> runtimeFields = Collections.emptyMap();
private Long nowInMillis;
private ResolvedIndexExpressions resolvedIndexExpressions;

public FieldCapabilitiesRequest(StreamInput in) throws IOException {
super(in);
Expand Down Expand Up @@ -259,6 +261,21 @@ public boolean allowsRemoteIndices() {
return true;
}

@Override
public boolean allowsCrossProject() {
return true;
}

@Override
public void setResolvedIndexExpressions(ResolvedIndexExpressions expressions) {
this.resolvedIndexExpressions = expressions;
}

@Override
public ResolvedIndexExpressions getResolvedIndexExpressions() {
return resolvedIndexExpressions;
}

@Override
public boolean includeDataStreams() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,18 @@ public class FieldCapabilitiesResponse extends ActionResponse implements Chunked
private final List<FieldCapabilitiesFailure> failures;
private final List<FieldCapabilitiesIndexResponse> indexResponses;
private final TransportVersion minTransportVersion;
private final ResolvedIndexExpressions resolvedIndexExpressions;

public FieldCapabilitiesResponse(
String[] indices,
Map<String, Map<String, FieldCapabilities>> fields,
List<FieldCapabilitiesFailure> failures
) {
this(indices, null, Collections.emptyMap(), fields, Collections.emptyList(), failures, null);
this(indices, null, Collections.emptyMap(), fields, Collections.emptyList(), failures, null, null);
}

public FieldCapabilitiesResponse(String[] indices, Map<String, Map<String, FieldCapabilities>> fields) {
this(indices, null, Collections.emptyMap(), fields, Collections.emptyList(), Collections.emptyList(), null);
this(indices, null, Collections.emptyMap(), fields, Collections.emptyList(), Collections.emptyList(), null, null);
}

public static FieldCapabilitiesResponse empty() {
Expand All @@ -83,12 +84,13 @@ public static FieldCapabilitiesResponse empty() {
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyList(),
null,
null
);
}

public FieldCapabilitiesResponse(List<FieldCapabilitiesIndexResponse> indexResponses, List<FieldCapabilitiesFailure> failures) {
this(Strings.EMPTY_ARRAY, null, Collections.emptyMap(), Collections.emptyMap(), indexResponses, failures, null);
this(Strings.EMPTY_ARRAY, null, Collections.emptyMap(), Collections.emptyMap(), indexResponses, failures, null, null);
}

public static FieldCapabilitiesResponse.Builder builder() {
Expand All @@ -102,7 +104,8 @@ private FieldCapabilitiesResponse(
Map<String, Map<String, FieldCapabilities>> fields,
List<FieldCapabilitiesIndexResponse> indexResponses,
List<FieldCapabilitiesFailure> failures,
TransportVersion minTransportVersion
TransportVersion minTransportVersion,
@Nullable ResolvedIndexExpressions resolvedIndexExpressions
) {
this.fields = Objects.requireNonNull(fields);
this.resolvedLocally = resolvedLocally;
Expand All @@ -111,6 +114,7 @@ private FieldCapabilitiesResponse(
this.indices = indices;
this.failures = failures;
this.minTransportVersion = minTransportVersion;
this.resolvedIndexExpressions = resolvedIndexExpressions;
}

public FieldCapabilitiesResponse(StreamInput in) throws IOException {
Expand All @@ -129,6 +133,7 @@ public FieldCapabilitiesResponse(StreamInput in) throws IOException {
// when receiving a response we expect the resolved remotely to be empty.
// It's only non-empty on the coordinating node if the FC requests targets remotes.
this.resolvedRemotely = Collections.emptyMap();
this.resolvedIndexExpressions = in.readOptionalWriteable(ResolvedIndexExpressions::new);
}

/**
Expand Down Expand Up @@ -204,7 +209,16 @@ public TransportVersion minTransportVersion() {
* Build a new response replacing the {@link #minTransportVersion()}.
*/
public FieldCapabilitiesResponse withMinTransportVersion(TransportVersion newMin) {
return new FieldCapabilitiesResponse(indices, resolvedLocally, resolvedRemotely, fields, indexResponses, failures, newMin);
return new FieldCapabilitiesResponse(
indices,
resolvedLocally,
resolvedRemotely,
fields,
indexResponses,
failures,
newMin,
resolvedIndexExpressions
);
}

/**
Expand Down Expand Up @@ -234,6 +248,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().supports(RESOLVED_FIELDS_CAPS)) {
out.writeOptionalWriteable(resolvedLocally);
}
out.writeOptionalWriteable(this.resolvedIndexExpressions);
}

private static void writeField(StreamOutput out, Map<String, FieldCapabilities> map) throws IOException {
Expand Down Expand Up @@ -266,6 +281,11 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
);
}

@Nullable
public ResolvedIndexExpressions getResolvedIndexExpressions() {
return resolvedIndexExpressions;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -300,6 +320,7 @@ public static class Builder {
private List<FieldCapabilitiesIndexResponse> indexResponses = Collections.emptyList();
private List<FieldCapabilitiesFailure> failures = Collections.emptyList();
private TransportVersion minTransportVersion = null;
private ResolvedIndexExpressions resolvedIndexExpressions = null;

private Builder() {}

Expand Down Expand Up @@ -346,6 +367,11 @@ public Builder withMinTransportVersion(TransportVersion minTransportVersion) {
return this;
}

public Builder withResolvedIndexExpressions(ResolvedIndexExpressions resolvedIndexExpressions) {
this.resolvedIndexExpressions = resolvedIndexExpressions;
return this;
}

public FieldCapabilitiesResponse build() {
return new FieldCapabilitiesResponse(
indices,
Expand All @@ -354,7 +380,8 @@ public FieldCapabilitiesResponse build() {
fields,
indexResponses,
failures,
minTransportVersion
minTransportVersion,
resolvedIndexExpressions
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.core.Nullable;
Expand All @@ -55,6 +57,8 @@
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -88,6 +92,7 @@

import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest.RESOLVED_FIELDS_CAPS;
import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility;
import static org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator.indicesOptionsForCrossProjectFanout;

public class TransportFieldCapabilitiesAction extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
public static final String EXCLUSION = "-";
Expand All @@ -110,6 +115,7 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
private final boolean ccsCheckCompatibility;
private final ThreadPool threadPool;
private final TimeValue forceConnectTimeoutSecs;
private final CrossProjectModeDecider crossProjectModeDecider;

@Inject
public TransportFieldCapabilitiesAction(
Expand Down Expand Up @@ -138,6 +144,7 @@ public TransportFieldCapabilitiesAction(
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings());
this.threadPool = threadPool;
this.forceConnectTimeoutSecs = clusterService.getSettings().getAsTime("search.ccs.force_connect_timeout", null);
this.crossProjectModeDecider = new CrossProjectModeDecider(clusterService.getSettings());
}

@Override
Expand Down Expand Up @@ -186,6 +193,7 @@ private <R extends ActionResponse> void doExecuteForked(
LinkedRequestExecutor<R> linkedRequestExecutor,
ActionListener<R> listener
) {
final boolean crossProjectEnabled = crossProjectModeDecider.resolvesCrossProject(request);
if (ccsCheckCompatibility) {
checkCCSVersionCompatibility(request);
}
Expand Down Expand Up @@ -365,13 +373,67 @@ private <R extends ActionResponse> void doExecuteForked(
);
requestDispatcher.execute();

/*
* We need to run the Cross Project Search reconciliation but only after we've heard back from all the linked projects.
* It is also possible that some linked projects may respond back with an error instead of a valid response. To facilitate
* this, we use `CountDown` and track each response, irrespective of whether it's valid or not, and then perform the
* reconciliation when it has counted down and the request is a resolvable CPS request.
*/
Runnable crossProjectReconciler;
Map<String, ResolvedIndexExpressions> linkedProjectsResponses = ConcurrentCollections.newConcurrentMap();
// Run the reconciler only if there are linked projects and CPS is enabled.
if (remoteClusterIndices.isEmpty() == false && crossProjectEnabled) {
CountDown countDownResponses = new CountDown(remoteClusterIndices.size());
crossProjectReconciler = () -> {
if (countDownResponses.countDown()) {
/*
* This happens when one or more linked projects respond with an error instead of a valid response -- say
* networking error.
*/
if (linkedProjectsResponses.size() != remoteClusterIndices.size()) {
listener.onFailure(
new IllegalArgumentException(
"Invalid number of responses received: "
+ linkedProjectsResponses.size()
+ " vs expected "
+ remoteClusterIndices.size()
)
);
return;
}

Exception validationEx = CrossProjectIndexResolutionValidator.validate(
request.indicesOptions(),
null,
request.getResolvedIndexExpressions(),
linkedProjectsResponses
);

if (validationEx != null) {
listener.onFailure(validationEx);
}
}
};
} else {
crossProjectReconciler = () -> {};
}

// this is the cross cluster part of this API - we force the other cluster to not merge the results but instead
// send us back all individual index results.
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(
clusterAlias,
request,
originalIndices,
nowInMillis,
crossProjectModeDecider
);
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
assert response.getResolvedIndexExpressions() != null
: "Resolved index expressions from [" + clusterAlias + "] are null";
linkedProjectsResponses.put(clusterAlias, response.getResolvedIndexExpressions());

if (request.includeResolvedTo() && response.getResolvedLocally() != null) {
ResolvedIndexExpressions resolvedOnRemoteProject = response.getResolvedLocally();
Expand Down Expand Up @@ -426,6 +488,7 @@ private <R extends ActionResponse> void doExecuteForked(
}
return TransportVersion.min(lhs, rhs);
});
crossProjectReconciler.run();
}, ex -> {
for (String index : originalIndices.indices()) {
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
Expand All @@ -445,6 +508,7 @@ private <R extends ActionResponse> void doExecuteForked(
});
}
}
crossProjectReconciler.run();
});

SubscribableListener<Transport.Connection> connectionListener = new SubscribableListener<>();
Expand Down Expand Up @@ -573,6 +637,7 @@ private static void mergeIndexResponses(
.withResolvedRemotelyBuilder(resolvedRemotely)
.withMinTransportVersion(minTransportVersion.get())
.withFailures(failures)
.withResolvedIndexExpressions(request.getResolvedIndexExpressions())
.build()
);
}
Expand Down Expand Up @@ -607,13 +672,16 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(
String clusterAlias,
FieldCapabilitiesRequest request,
OriginalIndices originalIndices,
long nowInMillis
long nowInMillis,
CrossProjectModeDecider crossProjectModeDecider
) {
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.clusterAlias(clusterAlias);
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
if (crossProjectModeDecider.resolvesCrossProject(request)) {
remoteRequest.indicesOptions(indicesOptionsForCrossProjectFanout(remoteRequest.indicesOptions()));
}
remoteRequest.fields(request.fields());
remoteRequest.filters(request.filters());
remoteRequest.types(request.types());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;

Expand All @@ -33,9 +34,11 @@
public class RestFieldCapabilitiesAction extends BaseRestHandler {

private final Settings settings;
private final CrossProjectModeDecider crossProjectModeDecider;

public RestFieldCapabilitiesAction(Settings settings) {
this.settings = settings;
this.crossProjectModeDecider = new CrossProjectModeDecider(settings);
}

@Override
Expand All @@ -55,14 +58,21 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) {
final boolean crossProjectEnabled = crossProjectModeDecider.crossProjectEnabled();
if (crossProjectModeDecider.crossProjectEnabled()) {
// accept but drop project_routing param until fully supported
request.param("project_routing");
}

final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final FieldCapabilitiesRequest fieldRequest = new FieldCapabilitiesRequest();
fieldRequest.indices(indices);
if (crossProjectEnabled && fieldRequest.allowsCrossProject()) {
var cpsIdxOpts = IndicesOptions.builder(fieldRequest.indicesOptions())
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
.build();
fieldRequest.indicesOptions(cpsIdxOpts);
}

fieldRequest.indicesOptions(IndicesOptions.fromRequest(request, fieldRequest.indicesOptions()));
fieldRequest.includeUnmapped(request.paramAsBoolean("include_unmapped", false));
Expand Down