Skip to content

Commit 11b5c78

Browse files
authored
datafeed: check remote_cluster_client before cluster aliases in start (#129601) (#129802)
TransportStartDatafeedAction previously tried to validate remote index cluster names in datafeed jobs, before checking if the local cluster had remote_cluster_client role. Because this role enables retrieval of the remote cluster names, the validation step would always fail with a no-such-cluster exception. This was confusing. This change moves the remote_cluster_client check ahead of cluster name validation, and adds a test. Closes ES-11841 Closes #121149
1 parent 1a8fdbe commit 11b5c78

File tree

3 files changed

+202
-11
lines changed

3 files changed

+202
-11
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
apply plugin: 'elasticsearch.internal-java-rest-test'
2+
apply plugin: 'elasticsearch.rest-resources'
3+
4+
dependencies {
5+
testImplementation project(':x-pack:qa')
6+
testImplementation project(path: ':test:test-clusters')
7+
}
8+
9+
tasks.named('javaRestTest') {
10+
usesDefaultDistribution("Test internally configures the clusters")
11+
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.xpack.ml.datafeed;
11+
12+
import org.apache.http.HttpHost;
13+
import org.elasticsearch.client.Request;
14+
import org.elasticsearch.client.Response;
15+
import org.elasticsearch.client.ResponseException;
16+
import org.elasticsearch.client.RestClient;
17+
import org.elasticsearch.cluster.metadata.IndexMetadata;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
20+
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
21+
import org.elasticsearch.test.rest.ESRestTestCase;
22+
import org.junit.ClassRule;
23+
import org.junit.rules.RuleChain;
24+
import org.junit.rules.TestRule;
25+
26+
import java.io.IOException;
27+
28+
import static org.hamcrest.Matchers.containsString;
29+
30+
/**
31+
* A test to check that remote_cluster_client errors are correctly reported when a datafeed job is started.
32+
* The local datafeed job references a remote index in a local anomaly detection job. When the
33+
* remote_cluster_client role is missing in the local cluster. This prevents remote indices from being
34+
* resolved to their cluster names.
35+
*
36+
* @see <a href="https://github.com/elastic/elasticsearch/issues/121149">GitHub issue 121149</a>
37+
*/
38+
public class DatafeedRemoteClusterClientIT extends ESRestTestCase {
39+
public static ElasticsearchCluster remoteCluster = ElasticsearchCluster.local()
40+
.name("remote_cluster")
41+
.distribution(DistributionType.DEFAULT)
42+
.module("data-streams")
43+
.module("x-pack-stack")
44+
.setting("xpack.security.enabled", "false")
45+
.setting("xpack.license.self_generated.type", "trial")
46+
.setting("cluster.logsdb.enabled", "true")
47+
.build();
48+
49+
public static ElasticsearchCluster localCluster = ElasticsearchCluster.local()
50+
.name("local_cluster")
51+
.distribution(DistributionType.DEFAULT)
52+
.module("data-streams")
53+
.module("x-pack-stack")
54+
.setting("xpack.security.enabled", "false")
55+
.setting("xpack.license.self_generated.type", "trial")
56+
.setting("cluster.logsdb.enabled", "true")
57+
.setting("node.roles", "[data,ingest,master,ml]") // remote_cluster_client not included
58+
.setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"")
59+
.setting("cluster.remote.connections_per_cluster", "1")
60+
.setting("cluster.remote.remote_cluster.skip_unavailable", "false")
61+
.build();
62+
63+
@ClassRule
64+
public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);
65+
66+
private RestClient localClusterClient() throws IOException {
67+
var clusterHosts = parseClusterHosts(localCluster.getHttpAddresses());
68+
return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
69+
}
70+
71+
private RestClient remoteClusterClient() throws IOException {
72+
var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
73+
return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
74+
}
75+
76+
public void testSource() throws IOException {
77+
String localIndex = "local_index";
78+
String remoteIndex = "remote_index";
79+
String mapping = """
80+
{
81+
"properties": {
82+
"timestamp": {
83+
"type": "date"
84+
},
85+
"bytes": {
86+
"type": "integer"
87+
}
88+
}
89+
}
90+
""";
91+
try (RestClient localClient = localClusterClient(); RestClient remoteClient = remoteClusterClient()) {
92+
createIndex(
93+
remoteClient,
94+
remoteIndex,
95+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(5, 20)).build(),
96+
mapping
97+
);
98+
99+
Request request = new Request("PUT", "_ml/anomaly_detectors/test_anomaly_detector");
100+
request.setJsonEntity("""
101+
{
102+
"analysis_config": {
103+
"bucket_span": "15m",
104+
"detectors": [
105+
{
106+
"detector_description": "Sum of bytes",
107+
"function": "sum",
108+
"field_name": "bytes"
109+
}
110+
]
111+
},
112+
"data_description": {
113+
"time_field": "timestamp",
114+
"time_format": "epoch_ms"
115+
},
116+
"analysis_limits": {
117+
"model_memory_limit": "11MB"
118+
},
119+
"model_plot_config": {
120+
"enabled": true,
121+
"annotations_enabled": true
122+
},
123+
"results_index_name": "test_datafeed_out",
124+
"datafeed_config": {
125+
"indices": [
126+
"remote_cluster:remote_index"
127+
],
128+
"query": {
129+
"bool": {
130+
"must": [
131+
{
132+
"match_all": {}
133+
}
134+
]
135+
}
136+
},
137+
"runtime_mappings": {
138+
"hour_of_day": {
139+
"type": "long",
140+
"script": {
141+
"source": "emit(doc['timestamp'].value.getHour());"
142+
}
143+
}
144+
},
145+
"datafeed_id": "test_datafeed"
146+
}
147+
}""");
148+
Response response = localClient.performRequest(request);
149+
logger.info("Anomaly Detection Response:", response.getStatusLine());
150+
151+
request = new Request("GET", "_ml/anomaly_detectors/test_anomaly_detector");
152+
response = localClient.performRequest(request);
153+
logger.info("Anomaly detection get:", response.getEntity());
154+
155+
request = new Request("POST", "_ml/anomaly_detectors/test_anomaly_detector/_open");
156+
response = localClient.performRequest(request);
157+
158+
final Request startRequest = new Request("POST", "_ml/datafeeds/test_datafeed/_start");
159+
request.setJsonEntity("""
160+
{
161+
"start": "2019-04-07T18:22:16Z"
162+
}
163+
""");
164+
ResponseException e = assertThrows(ResponseException.class, () -> localClient.performRequest(startRequest));
165+
assertThat(e.getMessage(), containsString("""
166+
Datafeed [test_datafeed] is configured with a remote index pattern(s) \
167+
[remote_cluster:remote_index] but the current node [local_cluster-0] \
168+
is not allowed to connect to remote clusters. Please enable node.remote_cluster_client \
169+
for all machine learning nodes and master-eligible nodes"""));
170+
}
171+
}
172+
173+
@Override
174+
protected String getTestRestCluster() {
175+
return localCluster.getHttpAddresses();
176+
}
177+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,20 @@ public void onFailure(Exception e) {
223223
Consumer<Job> createDataExtractor = job -> {
224224
final List<String> remoteIndices = RemoteClusterLicenseChecker.remoteIndices(params.getDatafeedIndices());
225225
if (remoteIndices.isEmpty() == false) {
226+
if (remoteClusterClient == false) {
227+
responseHeaderPreservingListener.onFailure(
228+
ExceptionsHelper.badRequestException(
229+
Messages.getMessage(
230+
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
231+
datafeedConfigHolder.get().getId(),
232+
RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()),
233+
clusterService.getNodeName()
234+
)
235+
)
236+
);
237+
return;
238+
}
239+
226240
final RemoteClusterLicenseChecker remoteClusterLicenseChecker = new RemoteClusterLicenseChecker(
227241
client,
228242
MachineLearningField.ML_API_FEATURE
@@ -235,17 +249,6 @@ public void onFailure(Exception e) {
235249
ActionListener.wrap(response -> {
236250
if (response.isSuccess() == false) {
237251
responseHeaderPreservingListener.onFailure(createUnlicensedError(params.getDatafeedId(), response));
238-
} else if (remoteClusterClient == false) {
239-
responseHeaderPreservingListener.onFailure(
240-
ExceptionsHelper.badRequestException(
241-
Messages.getMessage(
242-
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
243-
datafeedConfigHolder.get().getId(),
244-
RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()),
245-
clusterService.getNodeName()
246-
)
247-
)
248-
);
249252
} else {
250253
final RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
251254
List<String> remoteAliases = RemoteClusterLicenseChecker.remoteClusterAliases(

0 commit comments

Comments
 (0)