Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public final class SourceDestValidator {
public static final String REMOTE_CLUSTER_LICENSE_INACTIVE = "License check failed for remote cluster "
+ "alias [{0}], license is not active";
public static final String REMOTE_SOURCE_INDICES_NOT_SUPPORTED = "remote source indices are not supported";
public static final String CROSS_PROJECT_INDICES_NOT_SUPPORTED = "cross-project indices are not supported";
public static final String REMOTE_CLUSTERS_TRANSPORT_TOO_OLD =
"remote clusters are expected to run at least version [{0}] (reason: [{1}])," + " but the following clusters were too old: [{2}]";
public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";
Expand All @@ -78,6 +79,7 @@ public final class SourceDestValidator {
private final RemoteClusterService remoteClusterService;
private final RemoteClusterLicenseChecker remoteClusterLicenseChecker;
private final IngestService ingestService;
private final boolean crossProjectEnabled;
private final String nodeName;
private final String license;

Expand All @@ -90,6 +92,7 @@ static class Context {
private final RemoteClusterService remoteClusterService;
private final RemoteClusterLicenseChecker remoteClusterLicenseChecker;
private final IngestService ingestService;
private final boolean crossProjectEnabled;
private final String[] source;
private final String destIndex;
private final String destPipeline;
Expand All @@ -107,6 +110,7 @@ static class Context {
final RemoteClusterService remoteClusterService,
final RemoteClusterLicenseChecker remoteClusterLicenseChecker,
final IngestService ingestService,
boolean crossProjectEnabled,
final String[] source,
final String destIndex,
final String destPipeline,
Expand All @@ -118,6 +122,7 @@ static class Context {
this.remoteClusterService = remoteClusterService;
this.remoteClusterLicenseChecker = remoteClusterLicenseChecker;
this.ingestService = ingestService;
this.crossProjectEnabled = crossProjectEnabled;
this.source = source;
this.destIndex = destIndex;
this.destPipeline = destPipeline;
Expand Down Expand Up @@ -165,6 +170,10 @@ public String getLicense() {
return license;
}

private boolean crossProjectEnabled() {
return crossProjectEnabled;
}

public SortedSet<String> resolveSource() {
if (resolvedSource == null) {
resolveLocalAndRemoteSource();
Expand Down Expand Up @@ -266,23 +275,26 @@ public interface SourceDestValidation {
* Create a new Source Dest Validator
*
* @param indexNameExpressionResolver A valid IndexNameExpressionResolver object
* @param remoteClusterService A valid RemoteClusterService object
* @param remoteClusterService A valid RemoteClusterService object
* @param remoteClusterLicenseChecker A RemoteClusterLicenseChecker or null if CCS is disabled
* @param nodeName the name of this node
* @param license the license of the feature validated for
* @param crossProjectEnabled Determines if cross-project is enabled for this cluster
* @param nodeName the name of this node
* @param license the license of the feature validated for
*/
public SourceDestValidator(
IndexNameExpressionResolver indexNameExpressionResolver,
RemoteClusterService remoteClusterService,
RemoteClusterLicenseChecker remoteClusterLicenseChecker,
IngestService ingestService,
boolean crossProjectEnabled,
String nodeName,
String license
) {
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.remoteClusterService = remoteClusterService;
this.remoteClusterLicenseChecker = remoteClusterLicenseChecker;
this.ingestService = ingestService;
this.crossProjectEnabled = crossProjectEnabled;
this.nodeName = nodeName;
this.license = license;
}
Expand Down Expand Up @@ -311,6 +323,7 @@ public void validate(
remoteClusterService,
remoteClusterLicenseChecker,
ingestService,
crossProjectEnabled,
source,
destIndex,
destPipeline,
Expand Down Expand Up @@ -555,7 +568,9 @@ static class RemoteSourceNotSupportedValidation implements SourceDestValidation
@Override
public void validate(Context context, ActionListener<Context> listener) {
if (context.resolveRemoteSource().isEmpty() == false) {
context.addValidationError(REMOTE_SOURCE_INDICES_NOT_SUPPORTED);
context.addValidationError(
context.crossProjectEnabled() ? CROSS_PROJECT_INDICES_NOT_SUPPORTED : REMOTE_SOURCE_INDICES_NOT_SUPPORTED
);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main bit of changed logic - if we're in an environment that supports cross-project, then we are also in an environment that does not support cross-cluster, so we display the appropriate error message

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prwhelan do we really need to have two separate validation failures? Why not just one that says "remote source and cross-project indices are not supported". Seems like an unnecessary complication.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with that, though this code will come back for the transform check

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prwhelan right, transforms only support remote indices, not cps. I think thats fine.

}
listener.onResponse(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class RemoteClusterMinimumVersionValidationTests extends ESTestCase {

@Before
public void setUpMocks() {
context = spy(new Context(null, null, null, null, null, null, null, null, null, null));
context = spy(new Context(null, null, null, null, null, false, null, null, null, null, null));
doReturn(TransportVersions.V_8_10_X).when(context).getRemoteClusterVersion("cluster-A");
doReturn(TransportVersions.V_8_11_X).when(context).getRemoteClusterVersion("cluster-B");
doReturn(TransportVersions.V_8_12_0).when(context).getRemoteClusterVersion("cluster-C");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_IN_SOURCE_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_PIPELINE_MISSING_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_SINGLE_INDEX_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SOURCE_MISSING_VALIDATION;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -123,6 +125,7 @@ public class SourceDestValidatorTests extends ESTestCase {
remoteClusterService,
null,
ingestService,
false,
"node_id",
"license"
);
Expand Down Expand Up @@ -649,6 +652,7 @@ public void testRemoteSourceBasic() throws InterruptedException {
remoteClusterService,
remoteClusterLicenseCheckerBasic,
ingestService,
false,
new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
"dest",
null,
Expand All @@ -667,6 +671,72 @@ public void testRemoteSourceBasic() throws InterruptedException {
);
}

public void testRemoteSourceNotSupportedValidationWithLocalIndex() throws InterruptedException {
Context context = spy(
new SourceDestValidator.Context(
CLUSTER_STATE,
indexNameExpressionResolver,
remoteClusterService,
remoteClusterLicenseCheckerBasic,
ingestService,
false,
new String[] { SOURCE_1 },
"dest",
null,
"node_id",
"license"
)
);

assertValidationWithContext(listener -> REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION.validate(context, listener), c -> {
assertNull(c.getValidationException());
}, null);
}

public void testRemoteSourceNotSupportedValidationWithRemoteIndex() throws InterruptedException {
Context context = spy(
new SourceDestValidator.Context(
CLUSTER_STATE,
indexNameExpressionResolver,
remoteClusterService,
remoteClusterLicenseCheckerBasic,
ingestService,
false,
new String[] { REMOTE_BASIC + ":" + SOURCE_1 },
"dest",
null,
"node_id",
"license"
)
);

assertValidationWithContext(listener -> REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION.validate(context, listener), c -> {
assertThat(c.getValidationException().getMessage(), containsString("remote source indices are not supported"));
}, null);
}

public void testRemoteSourceNotSupportedValidationWithCrossProjectIndex() throws InterruptedException {
Context context = spy(
new SourceDestValidator.Context(
CLUSTER_STATE,
indexNameExpressionResolver,
remoteClusterService,
remoteClusterLicenseCheckerBasic,
ingestService,
true,
new String[] { "project1:" + SOURCE_1 },
"dest",
null,
"node_id",
"license"
)
);

assertValidationWithContext(listener -> REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION.validate(context, listener), c -> {
assertThat(c.getValidationException().getMessage(), containsString("cross-project indices are not supported"));
}, null);
}

public void testRemoteSourcePlatinum() throws InterruptedException {
final Context context = spy(
new SourceDestValidator.Context(
Expand All @@ -675,6 +745,7 @@ public void testRemoteSourcePlatinum() throws InterruptedException {
remoteClusterService,
new RemoteClusterLicenseChecker(clientWithBasicLicense, platinumFeature),
ingestService,
false,
new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
"dest",
null,
Expand Down Expand Up @@ -706,6 +777,7 @@ public void testRemoteSourcePlatinum() throws InterruptedException {
remoteClusterService,
new RemoteClusterLicenseChecker(clientWithPlatinumLicense, platinumFeature),
ingestService,
false,
new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
"dest",
null,
Expand All @@ -728,6 +800,7 @@ public void testRemoteSourcePlatinum() throws InterruptedException {
remoteClusterService,
new RemoteClusterLicenseChecker(clientWithPlatinumLicense, platinumFeature),
ingestService,
false,
new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
"dest",
"node_id",
Expand All @@ -751,6 +824,7 @@ public void testRemoteSourcePlatinum() throws InterruptedException {
remoteClusterService,
new RemoteClusterLicenseChecker(clientWithTrialLicense, platinumFeature),
ingestService,
false,
new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
"dest",
"node_id",
Expand All @@ -776,6 +850,7 @@ public void testRemoteSourceLicenseInActive() throws InterruptedException {
remoteClusterService,
new RemoteClusterLicenseChecker(clientWithExpiredBasicLicense, platinumFeature),
ingestService,
false,
new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
"dest",
null,
Expand Down Expand Up @@ -804,6 +879,7 @@ public void testRemoteSourceDoesNotExist() throws InterruptedException {
remoteClusterService,
new RemoteClusterLicenseChecker(clientWithExpiredBasicLicense, platinumFeature),
ingestService,
false,
new String[] { "non_existing_remote:" + "SOURCE_1" },
"dest",
null,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.containsString;

public class DataframeCpsIT extends MlSingleNodeTestCase {
@Override
protected Settings nodeSettings() {
return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build();
}

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Stream.concat(super.getPlugins().stream(), Stream.of(CpsPlugin.class)).toList();
}

public void testCrossProjectFailsForDataFrameAnalytics() throws IOException {
var id = "test-cross-project-fails";
var sourceIndex = "project1:" + id + "_source_index";
var destIndex = id + "_results";

var config = new DataFrameAnalyticsConfig.Builder().setId(id)
.setSource(
new DataFrameAnalyticsSource(
new String[] { sourceIndex },
QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()),
null,
Collections.emptyMap()
)
)
.setDest(new DataFrameAnalyticsDest(destIndex, null))
.setAnalysis(
new Classification(
"keyword-field",
BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(),
null,
null,
null,
null,
null,
null,
null
)
)
.build();

var request = new PutDataFrameAnalyticsAction.Request(config);
var response = client().execute(PutDataFrameAnalyticsAction.INSTANCE, request);
var validationException = assertThrows(ValidationException.class, response::actionGet);
assertThat(validationException.getMessage(), containsString("cross-project indices are not supported"));
}

public static class CpsPlugin extends Plugin implements ClusterPlugin {
public List<Setting<?>> getSettings() {
return List.of(Setting.simpleString("serverless.cross_project.enabled", Setting.Property.NodeScope));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -118,6 +119,7 @@ public TransportPutDataFrameAnalyticsAction(
transportService.getRemoteClusterService(),
null,
null,
new CrossProjectModeDecider(clusterService.getSettings()).crossProjectEnabled(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially tried injecting either the CrossProjectModeDecider and just a boolean, but the powers that be don't like it when both Transform and MachineLearning declare the same objects as a component, and I couldn't figure out a way around that, so we're just parsing the boolean here and sending it down for now. We just need the boolean anyway, and it is a static setting so it won't change unless the cluster reboots anyway.

clusterService.getNodeName(),
License.OperationMode.PLATINUM.description()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -156,6 +157,7 @@ public TransportStartDataFrameAnalyticsAction(
transportService.getRemoteClusterService(),
null,
null,
new CrossProjectModeDecider(clusterService.getSettings()).crossProjectEnabled(),
clusterService.getNodeName(),
License.OperationMode.PLATINUM.description()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.license.License;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -116,6 +117,7 @@ public TransportPreviewTransformAction(
? new RemoteClusterLicenseChecker(client, null)
: null,
ingestService,
new CrossProjectModeDecider(clusterService.getSettings()).crossProjectEnabled(),
clusterService.getNodeName(),
License.OperationMode.BASIC.description()
);
Expand Down
Loading