Skip to content

Commit 7766546

Browse files
committed
datafeed: check remote_cluster_client before cluster aliases in start
TransportStartDatafeedAction previously tried to validate remote index cluster names in datafeed jobs, before checking if the local cluster had remote_cluster_client role. Because this role enables retrieval of the remote cluster names, the validation step would always fail with a no-such-cluster exception. This was confusing. This change moves the remote_cluster_client check ahead of cluster name validation, and adds a test. Closes ES-11841
1 parent f48a3c3 commit 7766546

File tree

2 files changed

+209
-11
lines changed

2 files changed

+209
-11
lines changed
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams;
11+
12+
import org.apache.http.HttpHost;
13+
import org.elasticsearch.client.Request;
14+
import org.elasticsearch.client.Response;
15+
import org.elasticsearch.client.ResponseException;
16+
import org.elasticsearch.client.RestClient;
17+
import org.elasticsearch.cluster.metadata.IndexMetadata;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
20+
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
21+
import org.elasticsearch.test.rest.ESRestTestCase;
22+
import org.junit.ClassRule;
23+
import org.junit.rules.RuleChain;
24+
import org.junit.rules.TestRule;
25+
26+
import java.io.IOException;
27+
import java.util.regex.Pattern;
28+
29+
/**
30+
* A test to check that remote_cluster_client errors are correctly reported when a datafeed job is started.
31+
* The local datafeed job references a remote index in a local anomaly detection job. When the
32+
* remote_cluster_client role is missing in the local cluster. This prevents remote indices from being
33+
* resolved to their cluster names.
34+
*
35+
* Written to address github issue 121149.
36+
*/
37+
public class DatafeedRemoteClusterClientIT extends ESRestTestCase {
38+
public static ElasticsearchCluster remoteCluster = ElasticsearchCluster.local()
39+
.name("remote_cluster")
40+
.distribution(DistributionType.DEFAULT)
41+
.module("data-streams")
42+
.module("x-pack-stack")
43+
.setting("xpack.security.enabled", "false")
44+
.setting("xpack.license.self_generated.type", "trial")
45+
.setting("cluster.logsdb.enabled", "true")
46+
.build();
47+
48+
public static ElasticsearchCluster localCluster = ElasticsearchCluster.local()
49+
.name("local_cluster")
50+
.distribution(DistributionType.DEFAULT)
51+
.module("data-streams")
52+
.module("x-pack-stack")
53+
.setting("xpack.security.enabled", "false")
54+
.setting("xpack.license.self_generated.type", "trial")
55+
.setting("cluster.logsdb.enabled", "true")
56+
.setting("node.roles", "[data,ingest,master,ml]") // remote_cluster_client not included
57+
.setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"")
58+
.setting("cluster.remote.connections_per_cluster", "1")
59+
.setting("cluster.remote.remote_cluster.skip_unavailable", "false")
60+
.build();
61+
62+
@ClassRule
63+
public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);
64+
65+
private RestClient localClusterClient() throws IOException {
66+
var clusterHosts = parseClusterHosts(localCluster.getHttpAddresses());
67+
return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
68+
}
69+
70+
private RestClient remoteClusterClient() throws IOException {
71+
var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
72+
return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
73+
}
74+
75+
public void testSource() throws IOException {
76+
String localIndex = "local_index";
77+
String remoteIndex = "remote_index";
78+
String mapping = """
79+
{
80+
"properties": {
81+
"timestamp": {
82+
"type": "date"
83+
},
84+
"bytes": {
85+
"type": "integer"
86+
}
87+
}
88+
}
89+
""";
90+
try (RestClient localClient = localClusterClient(); RestClient remoteClient = remoteClusterClient()) {
91+
createIndex(
92+
remoteClient,
93+
remoteIndex,
94+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(5, 20)).build(),
95+
mapping
96+
);
97+
98+
Request request = new Request("PUT", "_ml/anomaly_detectors/test_anomaly_detector");
99+
request.setJsonEntity("""
100+
{
101+
"analysis_config": {
102+
"bucket_span": "15m",
103+
"detectors": [
104+
{
105+
"detector_description": "Sum of bytes",
106+
"function": "sum",
107+
"field_name": "bytes"
108+
}
109+
]
110+
},
111+
"data_description": {
112+
"time_field": "timestamp",
113+
"time_format": "epoch_ms"
114+
},
115+
"analysis_limits": {
116+
"model_memory_limit": "11MB"
117+
},
118+
"model_plot_config": {
119+
"enabled": true,
120+
"annotations_enabled": true
121+
},
122+
"results_index_name": "test_datafeed_out",
123+
"datafeed_config": {
124+
"indices": [
125+
"remote_cluster:remote_index"
126+
],
127+
"query": {
128+
"bool": {
129+
"must": [
130+
{
131+
"match_all": {}
132+
}
133+
]
134+
}
135+
},
136+
"runtime_mappings": {
137+
"hour_of_day": {
138+
"type": "long",
139+
"script": {
140+
"source": "emit(doc['timestamp'].value.getHour());"
141+
}
142+
}
143+
},
144+
"datafeed_id": "test_datafeed"
145+
}
146+
}""");
147+
Response response = localClient.performRequest(request);
148+
logger.info("Anomaly Detection Response:", response.getStatusLine());
149+
150+
request = new Request("GET", "_ml/anomaly_detectors/test_anomaly_detector");
151+
response = localClient.performRequest(request);
152+
logger.info("Anomaly detection get:", response.getEntity());
153+
154+
request = new Request("POST", "_ml/anomaly_detectors/test_anomaly_detector/_open");
155+
response = localClient.performRequest(request);
156+
157+
final Request startRequest = new Request("POST", "_ml/datafeeds/test_datafeed/_start");
158+
request.setJsonEntity("""
159+
{
160+
"start": "2019-04-07T18:22:16Z"
161+
}
162+
""");
163+
try {
164+
localClient.performRequest(startRequest);
165+
assertTrue("This request should fail", false);
166+
} catch (ResponseException e) {
167+
String[] messages = e.getMessage().split("\n");
168+
assertTrue(Pattern.matches("""
169+
.*Datafeed \\[test_datafeed\\] is configured with a remote index pattern\\(s\\) \
170+
\\[remote_cluster:remote_index\\] but the current node \\[local_cluster-0\\] \
171+
is not allowed to connect to remote clusters. Please enable node.remote_cluster_client \
172+
for all machine learning nodes and master-eligible nodes.*""", messages[1]));
173+
}
174+
}
175+
}
176+
177+
@Override
178+
protected String getTestRestCluster() {
179+
return localCluster.getHttpAddresses();
180+
}
181+
182+
@Override
183+
protected boolean preserveIndicesUponCompletion() {
184+
return true;
185+
}
186+
187+
@Override
188+
protected boolean preserveClusterUponCompletion() {
189+
return true;
190+
}
191+
192+
@Override
193+
protected boolean preserveDataStreamsUponCompletion() {
194+
return true;
195+
}
196+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,19 @@ public void onFailure(Exception e) {
223223
Consumer<Job> createDataExtractor = job -> {
224224
final List<String> remoteIndices = RemoteClusterLicenseChecker.remoteIndices(params.getDatafeedIndices());
225225
if (remoteIndices.isEmpty() == false) {
226+
if (remoteClusterClient == false) {
227+
responseHeaderPreservingListener.onFailure(
228+
ExceptionsHelper.badRequestException(
229+
Messages.getMessage(
230+
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
231+
datafeedConfigHolder.get().getId(),
232+
RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()),
233+
clusterService.getNodeName()
234+
)
235+
)
236+
);
237+
}
238+
226239
final RemoteClusterLicenseChecker remoteClusterLicenseChecker = new RemoteClusterLicenseChecker(
227240
client,
228241
MachineLearningField.ML_API_FEATURE
@@ -235,17 +248,6 @@ public void onFailure(Exception e) {
235248
ActionListener.wrap(response -> {
236249
if (response.isSuccess() == false) {
237250
responseHeaderPreservingListener.onFailure(createUnlicensedError(params.getDatafeedId(), response));
238-
} else if (remoteClusterClient == false) {
239-
responseHeaderPreservingListener.onFailure(
240-
ExceptionsHelper.badRequestException(
241-
Messages.getMessage(
242-
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
243-
datafeedConfigHolder.get().getId(),
244-
RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()),
245-
clusterService.getNodeName()
246-
)
247-
)
248-
);
249251
} else {
250252
final RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
251253
List<String> remoteAliases = RemoteClusterLicenseChecker.remoteClusterAliases(

0 commit comments

Comments
 (0)