Skip to content

Commit 345b57c

Browse files
committed
[ML] Disable CPS for Dataframes
Cross-Project Search and Cross-Cluster Search is indefinitely disabled for Dataframe Analytics. The error message will now display if the syntax would have otherwise resolved to the respective feature.
1 parent ad70045 commit 345b57c

File tree

8 files changed

+189
-5
lines changed

8 files changed

+189
-5
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public final class SourceDestValidator {
7070
public static final String REMOTE_CLUSTER_LICENSE_INACTIVE = "License check failed for remote cluster "
7171
+ "alias [{0}], license is not active";
7272
public static final String REMOTE_SOURCE_INDICES_NOT_SUPPORTED = "remote source indices are not supported";
73+
public static final String CROSS_PROJECT_INDICES_NOT_SUPPORTED = "cross-project indices are not supported";
7374
public static final String REMOTE_CLUSTERS_TRANSPORT_TOO_OLD =
7475
"remote clusters are expected to run at least version [{0}] (reason: [{1}])," + " but the following clusters were too old: [{2}]";
7576
public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";
@@ -78,6 +79,7 @@ public final class SourceDestValidator {
7879
private final RemoteClusterService remoteClusterService;
7980
private final RemoteClusterLicenseChecker remoteClusterLicenseChecker;
8081
private final IngestService ingestService;
82+
private final boolean crossProjectEnabled;
8183
private final String nodeName;
8284
private final String license;
8385

@@ -90,6 +92,7 @@ static class Context {
9092
private final RemoteClusterService remoteClusterService;
9193
private final RemoteClusterLicenseChecker remoteClusterLicenseChecker;
9294
private final IngestService ingestService;
95+
private final boolean crossProjectEnabled;
9396
private final String[] source;
9497
private final String destIndex;
9598
private final String destPipeline;
@@ -107,6 +110,7 @@ static class Context {
107110
final RemoteClusterService remoteClusterService,
108111
final RemoteClusterLicenseChecker remoteClusterLicenseChecker,
109112
final IngestService ingestService,
113+
boolean crossProjectEnabled,
110114
final String[] source,
111115
final String destIndex,
112116
final String destPipeline,
@@ -118,6 +122,7 @@ static class Context {
118122
this.remoteClusterService = remoteClusterService;
119123
this.remoteClusterLicenseChecker = remoteClusterLicenseChecker;
120124
this.ingestService = ingestService;
125+
this.crossProjectEnabled = crossProjectEnabled;
121126
this.source = source;
122127
this.destIndex = destIndex;
123128
this.destPipeline = destPipeline;
@@ -165,6 +170,10 @@ public String getLicense() {
165170
return license;
166171
}
167172

173+
private boolean crossProjectEnabled() {
174+
return crossProjectEnabled;
175+
}
176+
168177
public SortedSet<String> resolveSource() {
169178
if (resolvedSource == null) {
170179
resolveLocalAndRemoteSource();
@@ -266,23 +275,26 @@ public interface SourceDestValidation {
266275
* Create a new Source Dest Validator
267276
*
268277
* @param indexNameExpressionResolver A valid IndexNameExpressionResolver object
269-
* @param remoteClusterService A valid RemoteClusterService object
278+
* @param remoteClusterService A valid RemoteClusterService object
270279
* @param remoteClusterLicenseChecker A RemoteClusterLicenseChecker or null if CCS is disabled
271-
* @param nodeName the name of this node
272-
* @param license the license of the feature validated for
280+
* @param crossProjectEnabled Determines if cross-project is enabled for this cluster
281+
* @param nodeName the name of this node
282+
* @param license the license of the feature validated for
273283
*/
274284
public SourceDestValidator(
275285
IndexNameExpressionResolver indexNameExpressionResolver,
276286
RemoteClusterService remoteClusterService,
277287
RemoteClusterLicenseChecker remoteClusterLicenseChecker,
278288
IngestService ingestService,
289+
boolean crossProjectEnabled,
279290
String nodeName,
280291
String license
281292
) {
282293
this.indexNameExpressionResolver = indexNameExpressionResolver;
283294
this.remoteClusterService = remoteClusterService;
284295
this.remoteClusterLicenseChecker = remoteClusterLicenseChecker;
285296
this.ingestService = ingestService;
297+
this.crossProjectEnabled = crossProjectEnabled;
286298
this.nodeName = nodeName;
287299
this.license = license;
288300
}
@@ -311,6 +323,7 @@ public void validate(
311323
remoteClusterService,
312324
remoteClusterLicenseChecker,
313325
ingestService,
326+
crossProjectEnabled,
314327
source,
315328
destIndex,
316329
destPipeline,
@@ -555,7 +568,9 @@ static class RemoteSourceNotSupportedValidation implements SourceDestValidation
555568
@Override
556569
public void validate(Context context, ActionListener<Context> listener) {
557570
if (context.resolveRemoteSource().isEmpty() == false) {
558-
context.addValidationError(REMOTE_SOURCE_INDICES_NOT_SUPPORTED);
571+
context.addValidationError(
572+
context.crossProjectEnabled() ? CROSS_PROJECT_INDICES_NOT_SUPPORTED : REMOTE_SOURCE_INDICES_NOT_SUPPORTED
573+
);
559574
}
560575
listener.onResponse(context);
561576
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/RemoteClusterMinimumVersionValidationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class RemoteClusterMinimumVersionValidationTests extends ESTestCase {
4040

4141
@Before
4242
public void setUpMocks() {
43-
context = spy(new Context(null, null, null, null, null, null, null, null, null, null));
43+
context = spy(new Context(null, null, null, null, null, false, null, null, null, null, null));
4444
doReturn(TransportVersions.V_8_10_X).when(context).getRemoteClusterVersion("cluster-A");
4545
doReturn(TransportVersions.V_8_11_X).when(context).getRemoteClusterVersion("cluster-B");
4646
doReturn(TransportVersions.V_8_12_0).when(context).getRemoteClusterVersion("cluster-C");

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@
6767
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_IN_SOURCE_VALIDATION;
6868
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_PIPELINE_MISSING_VALIDATION;
6969
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_SINGLE_INDEX_VALIDATION;
70+
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION;
7071
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SOURCE_MISSING_VALIDATION;
72+
import static org.hamcrest.Matchers.containsString;
7173
import static org.hamcrest.Matchers.equalTo;
7274
import static org.mockito.Mockito.mock;
7375
import static org.mockito.Mockito.spy;
@@ -123,6 +125,7 @@ public class SourceDestValidatorTests extends ESTestCase {
123125
remoteClusterService,
124126
null,
125127
ingestService,
128+
false,
126129
"node_id",
127130
"license"
128131
);
@@ -649,6 +652,7 @@ public void testRemoteSourceBasic() throws InterruptedException {
649652
remoteClusterService,
650653
remoteClusterLicenseCheckerBasic,
651654
ingestService,
655+
false,
652656
new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
653657
"dest",
654658
null,
@@ -667,6 +671,72 @@ public void testRemoteSourceBasic() throws InterruptedException {
667671
);
668672
}
669673

674+
public void testRemoteSourceNotSupportedValidationWithLocalIndex() throws InterruptedException {
675+
Context context = spy(
676+
new SourceDestValidator.Context(
677+
CLUSTER_STATE,
678+
indexNameExpressionResolver,
679+
remoteClusterService,
680+
remoteClusterLicenseCheckerBasic,
681+
ingestService,
682+
false,
683+
new String[] { SOURCE_1 },
684+
"dest",
685+
null,
686+
"node_id",
687+
"license"
688+
)
689+
);
690+
691+
assertValidationWithContext(listener -> REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION.validate(context, listener), c -> {
692+
assertNull(c.getValidationException());
693+
}, null);
694+
}
695+
696+
public void testRemoteSourceNotSupportedValidationWithRemoteIndex() throws InterruptedException {
697+
Context context = spy(
698+
new SourceDestValidator.Context(
699+
CLUSTER_STATE,
700+
indexNameExpressionResolver,
701+
remoteClusterService,
702+
remoteClusterLicenseCheckerBasic,
703+
ingestService,
704+
false,
705+
new String[] { REMOTE_BASIC + ":" + SOURCE_1 },
706+
"dest",
707+
null,
708+
"node_id",
709+
"license"
710+
)
711+
);
712+
713+
assertValidationWithContext(listener -> REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION.validate(context, listener), c -> {
714+
assertThat(c.getValidationException().getMessage(), containsString("remote source indices are not supported"));
715+
}, null);
716+
}
717+
718+
public void testRemoteSourceNotSupportedValidationWithCrossProjectIndex() throws InterruptedException {
719+
Context context = spy(
720+
new SourceDestValidator.Context(
721+
CLUSTER_STATE,
722+
indexNameExpressionResolver,
723+
remoteClusterService,
724+
remoteClusterLicenseCheckerBasic,
725+
ingestService,
726+
true,
727+
new String[] { "project1:" + SOURCE_1 },
728+
"dest",
729+
null,
730+
"node_id",
731+
"license"
732+
)
733+
);
734+
735+
assertValidationWithContext(listener -> REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION.validate(context, listener), c -> {
736+
assertThat(c.getValidationException().getMessage(), containsString("cross-project indices are not supported"));
737+
}, null);
738+
}
739+
670740
public void testRemoteSourcePlatinum() throws InterruptedException {
671741
final Context context = spy(
672742
new SourceDestValidator.Context(
@@ -675,6 +745,7 @@ public void testRemoteSourcePlatinum() throws InterruptedException {
675745
remoteClusterService,
676746
new RemoteClusterLicenseChecker(clientWithBasicLicense, platinumFeature),
677747
ingestService,
748+
false,
678749
new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
679750
"dest",
680751
null,
@@ -706,6 +777,7 @@ public void testRemoteSourcePlatinum() throws InterruptedException {
706777
remoteClusterService,
707778
new RemoteClusterLicenseChecker(clientWithPlatinumLicense, platinumFeature),
708779
ingestService,
780+
false,
709781
new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
710782
"dest",
711783
null,
@@ -728,6 +800,7 @@ public void testRemoteSourcePlatinum() throws InterruptedException {
728800
remoteClusterService,
729801
new RemoteClusterLicenseChecker(clientWithPlatinumLicense, platinumFeature),
730802
ingestService,
803+
false,
731804
new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
732805
"dest",
733806
"node_id",
@@ -751,6 +824,7 @@ public void testRemoteSourcePlatinum() throws InterruptedException {
751824
remoteClusterService,
752825
new RemoteClusterLicenseChecker(clientWithTrialLicense, platinumFeature),
753826
ingestService,
827+
false,
754828
new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
755829
"dest",
756830
"node_id",
@@ -776,6 +850,7 @@ public void testRemoteSourceLicenseInActive() throws InterruptedException {
776850
remoteClusterService,
777851
new RemoteClusterLicenseChecker(clientWithExpiredBasicLicense, platinumFeature),
778852
ingestService,
853+
false,
779854
new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
780855
"dest",
781856
null,
@@ -804,6 +879,7 @@ public void testRemoteSourceDoesNotExist() throws InterruptedException {
804879
remoteClusterService,
805880
new RemoteClusterLicenseChecker(clientWithExpiredBasicLicense, platinumFeature),
806881
ingestService,
882+
false,
807883
new String[] { "non_existing_remote:" + "SOURCE_1" },
808884
"dest",
809885
null,
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml.integration;
9+
10+
import org.elasticsearch.common.ValidationException;
11+
import org.elasticsearch.common.settings.Setting;
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.index.query.QueryBuilders;
14+
import org.elasticsearch.plugins.ClusterPlugin;
15+
import org.elasticsearch.plugins.Plugin;
16+
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
17+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
18+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
19+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
20+
import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams;
21+
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
22+
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
23+
import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
24+
25+
import java.io.IOException;
26+
import java.util.Collection;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.stream.Stream;
30+
31+
import static org.hamcrest.Matchers.containsString;
32+
33+
public class DataframeCpsIT extends MlSingleNodeTestCase {
34+
@Override
35+
protected Settings nodeSettings() {
36+
return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build();
37+
}
38+
39+
@Override
40+
protected Collection<Class<? extends Plugin>> getPlugins() {
41+
return Stream.concat(super.getPlugins().stream(), Stream.of(CpsPlugin.class)).toList();
42+
}
43+
44+
public void testCrossProjectFailsForDataFrameAnalytics() throws IOException {
45+
var id = "test-cross-project-fails";
46+
var sourceIndex = "project1:" + id + "_source_index";
47+
var destIndex = id + "_results";
48+
49+
var config = new DataFrameAnalyticsConfig.Builder().setId(id)
50+
.setSource(
51+
new DataFrameAnalyticsSource(
52+
new String[] { sourceIndex },
53+
QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()),
54+
null,
55+
Collections.emptyMap()
56+
)
57+
)
58+
.setDest(new DataFrameAnalyticsDest(destIndex, null))
59+
.setAnalysis(
60+
new Classification(
61+
"keyword-field",
62+
BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(),
63+
null,
64+
null,
65+
null,
66+
null,
67+
null,
68+
null,
69+
null
70+
)
71+
)
72+
.build();
73+
74+
var request = new PutDataFrameAnalyticsAction.Request(config);
75+
var response = client().execute(PutDataFrameAnalyticsAction.INSTANCE, request);
76+
var validationException = assertThrows(ValidationException.class, response::actionGet);
77+
assertThat(validationException.getMessage(), containsString("cross-project indices are not supported"));
78+
}
79+
80+
public static class CpsPlugin extends Plugin implements ClusterPlugin {
81+
public List<Setting<?>> getSettings() {
82+
return List.of(Setting.simpleString("serverless.cross_project.enabled", Setting.Property.NodeScope));
83+
}
84+
}
85+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.license.LicenseUtils;
2929
import org.elasticsearch.license.RemoteClusterLicenseChecker;
3030
import org.elasticsearch.license.XPackLicenseState;
31+
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
3132
import org.elasticsearch.tasks.Task;
3233
import org.elasticsearch.threadpool.ThreadPool;
3334
import org.elasticsearch.transport.TransportService;
@@ -118,6 +119,7 @@ public TransportPutDataFrameAnalyticsAction(
118119
transportService.getRemoteClusterService(),
119120
null,
120121
null,
122+
new CrossProjectModeDecider(clusterService.getSettings()).crossProjectEnabled(),
121123
clusterService.getNodeName(),
122124
License.OperationMode.PLATINUM.description()
123125
);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
4545
import org.elasticsearch.persistent.PersistentTasksService;
4646
import org.elasticsearch.rest.RestStatus;
47+
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
4748
import org.elasticsearch.tasks.Task;
4849
import org.elasticsearch.tasks.TaskId;
4950
import org.elasticsearch.threadpool.ThreadPool;
@@ -156,6 +157,7 @@ public TransportStartDataFrameAnalyticsAction(
156157
transportService.getRemoteClusterService(),
157158
null,
158159
null,
160+
new CrossProjectModeDecider(clusterService.getSettings()).crossProjectEnabled(),
159161
clusterService.getNodeName(),
160162
License.OperationMode.PLATINUM.description()
161163
);

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.injection.guice.Inject;
3333
import org.elasticsearch.license.License;
3434
import org.elasticsearch.license.RemoteClusterLicenseChecker;
35+
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
3536
import org.elasticsearch.tasks.Task;
3637
import org.elasticsearch.tasks.TaskId;
3738
import org.elasticsearch.threadpool.ThreadPool;
@@ -116,6 +117,7 @@ public TransportPreviewTransformAction(
116117
? new RemoteClusterLicenseChecker(client, null)
117118
: null,
118119
ingestService,
120+
new CrossProjectModeDecider(clusterService.getSettings()).crossProjectEnabled(),
119121
clusterService.getNodeName(),
120122
License.OperationMode.BASIC.description()
121123
);

0 commit comments

Comments
 (0)