| 
 | 1 | +/*  | 
 | 2 | + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one  | 
 | 3 | + * or more contributor license agreements. Licensed under the "Elastic License  | 
 | 4 | + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side  | 
 | 5 | + * Public License v 1"; you may not use this file except in compliance with, at  | 
 | 6 | + * your election, the "Elastic License 2.0", the "GNU Affero General Public  | 
 | 7 | + * License v3.0 only", or the "Server Side Public License, v 1".  | 
 | 8 | + */  | 
 | 9 | + | 
 | 10 | +package org.elasticsearch.xpack.ml.datafeed;  | 
 | 11 | + | 
 | 12 | +import org.apache.http.HttpHost;  | 
 | 13 | +import org.elasticsearch.client.Request;  | 
 | 14 | +import org.elasticsearch.client.Response;  | 
 | 15 | +import org.elasticsearch.client.ResponseException;  | 
 | 16 | +import org.elasticsearch.client.RestClient;  | 
 | 17 | +import org.elasticsearch.cluster.metadata.IndexMetadata;  | 
 | 18 | +import org.elasticsearch.common.settings.Settings;  | 
 | 19 | +import org.elasticsearch.test.cluster.ElasticsearchCluster;  | 
 | 20 | +import org.elasticsearch.test.cluster.local.distribution.DistributionType;  | 
 | 21 | +import org.elasticsearch.test.rest.ESRestTestCase;  | 
 | 22 | +import org.junit.ClassRule;  | 
 | 23 | +import org.junit.rules.RuleChain;  | 
 | 24 | +import org.junit.rules.TestRule;  | 
 | 25 | + | 
 | 26 | +import java.io.IOException;  | 
 | 27 | + | 
 | 28 | +import static org.hamcrest.Matchers.containsString;  | 
 | 29 | + | 
 | 30 | +/**  | 
 | 31 | + * A test to check that remote_cluster_client errors are correctly reported when a datafeed job is started.  | 
 | 32 | + * The local datafeed job references a remote index in a local anomaly detection job. When the  | 
 | 33 | + * remote_cluster_client role is missing in the local cluster. This prevents remote indices from being  | 
 | 34 | + * resolved to their cluster names.  | 
 | 35 | + *  | 
 | 36 | + * @see <a href="https://github.com/elastic/elasticsearch/issues/121149">GitHub issue 121149</a>  | 
 | 37 | + */  | 
 | 38 | +public class DatafeedRemoteClusterClientIT extends ESRestTestCase {  | 
 | 39 | +    public static ElasticsearchCluster remoteCluster = ElasticsearchCluster.local()  | 
 | 40 | +        .name("remote_cluster")  | 
 | 41 | +        .distribution(DistributionType.DEFAULT)  | 
 | 42 | +        .module("data-streams")  | 
 | 43 | +        .module("x-pack-stack")  | 
 | 44 | +        .setting("xpack.security.enabled", "false")  | 
 | 45 | +        .setting("xpack.license.self_generated.type", "trial")  | 
 | 46 | +        .setting("cluster.logsdb.enabled", "true")  | 
 | 47 | +        .build();  | 
 | 48 | + | 
 | 49 | +    public static ElasticsearchCluster localCluster = ElasticsearchCluster.local()  | 
 | 50 | +        .name("local_cluster")  | 
 | 51 | +        .distribution(DistributionType.DEFAULT)  | 
 | 52 | +        .module("data-streams")  | 
 | 53 | +        .module("x-pack-stack")  | 
 | 54 | +        .setting("xpack.security.enabled", "false")  | 
 | 55 | +        .setting("xpack.license.self_generated.type", "trial")  | 
 | 56 | +        .setting("cluster.logsdb.enabled", "true")  | 
 | 57 | +        .setting("node.roles", "[data,ingest,master,ml]") // remote_cluster_client not included  | 
 | 58 | +        .setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"")  | 
 | 59 | +        .setting("cluster.remote.connections_per_cluster", "1")  | 
 | 60 | +        .setting("cluster.remote.remote_cluster.skip_unavailable", "false")  | 
 | 61 | +        .build();  | 
 | 62 | + | 
 | 63 | +    @ClassRule  | 
 | 64 | +    public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);  | 
 | 65 | + | 
 | 66 | +    private RestClient localClusterClient() throws IOException {  | 
 | 67 | +        var clusterHosts = parseClusterHosts(localCluster.getHttpAddresses());  | 
 | 68 | +        return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));  | 
 | 69 | +    }  | 
 | 70 | + | 
 | 71 | +    private RestClient remoteClusterClient() throws IOException {  | 
 | 72 | +        var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());  | 
 | 73 | +        return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));  | 
 | 74 | +    }  | 
 | 75 | + | 
 | 76 | +    public void testSource() throws IOException {  | 
 | 77 | +        String localIndex = "local_index";  | 
 | 78 | +        String remoteIndex = "remote_index";  | 
 | 79 | +        String mapping = """  | 
 | 80 | +                {  | 
 | 81 | +                    "properties": {  | 
 | 82 | +                        "timestamp": {  | 
 | 83 | +                            "type": "date"  | 
 | 84 | +                        },  | 
 | 85 | +                        "bytes": {  | 
 | 86 | +                            "type": "integer"  | 
 | 87 | +                        }  | 
 | 88 | +                    }  | 
 | 89 | +                }  | 
 | 90 | +            """;  | 
 | 91 | +        try (RestClient localClient = localClusterClient(); RestClient remoteClient = remoteClusterClient()) {  | 
 | 92 | +            createIndex(  | 
 | 93 | +                remoteClient,  | 
 | 94 | +                remoteIndex,  | 
 | 95 | +                Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(5, 20)).build(),  | 
 | 96 | +                mapping  | 
 | 97 | +            );  | 
 | 98 | + | 
 | 99 | +            Request request = new Request("PUT", "_ml/anomaly_detectors/test_anomaly_detector");  | 
 | 100 | +            request.setJsonEntity("""  | 
 | 101 | +                {  | 
 | 102 | +                  "analysis_config": {  | 
 | 103 | +                    "bucket_span": "15m",  | 
 | 104 | +                    "detectors": [  | 
 | 105 | +                      {  | 
 | 106 | +                        "detector_description": "Sum of bytes",  | 
 | 107 | +                        "function": "sum",  | 
 | 108 | +                        "field_name": "bytes"  | 
 | 109 | +                      }  | 
 | 110 | +                    ]  | 
 | 111 | +                  },  | 
 | 112 | +                  "data_description": {  | 
 | 113 | +                    "time_field": "timestamp",  | 
 | 114 | +                    "time_format": "epoch_ms"  | 
 | 115 | +                  },  | 
 | 116 | +                  "analysis_limits": {  | 
 | 117 | +                    "model_memory_limit": "11MB"  | 
 | 118 | +                  },  | 
 | 119 | +                  "model_plot_config": {  | 
 | 120 | +                    "enabled": true,  | 
 | 121 | +                    "annotations_enabled": true  | 
 | 122 | +                  },  | 
 | 123 | +                  "results_index_name": "test_datafeed_out",  | 
 | 124 | +                  "datafeed_config": {  | 
 | 125 | +                    "indices": [  | 
 | 126 | +                      "remote_cluster:remote_index"  | 
 | 127 | +                    ],  | 
 | 128 | +                    "query": {  | 
 | 129 | +                      "bool": {  | 
 | 130 | +                        "must": [  | 
 | 131 | +                          {  | 
 | 132 | +                            "match_all": {}  | 
 | 133 | +                          }  | 
 | 134 | +                        ]  | 
 | 135 | +                      }  | 
 | 136 | +                    },  | 
 | 137 | +                    "runtime_mappings": {  | 
 | 138 | +                      "hour_of_day": {  | 
 | 139 | +                        "type": "long",  | 
 | 140 | +                        "script": {  | 
 | 141 | +                          "source": "emit(doc['timestamp'].value.getHour());"  | 
 | 142 | +                        }  | 
 | 143 | +                      }  | 
 | 144 | +                    },  | 
 | 145 | +                    "datafeed_id": "test_datafeed"  | 
 | 146 | +                  }  | 
 | 147 | +                }""");  | 
 | 148 | +            Response response = localClient.performRequest(request);  | 
 | 149 | +            logger.info("Anomaly Detection Response:", response.getStatusLine());  | 
 | 150 | + | 
 | 151 | +            request = new Request("GET", "_ml/anomaly_detectors/test_anomaly_detector");  | 
 | 152 | +            response = localClient.performRequest(request);  | 
 | 153 | +            logger.info("Anomaly detection get:", response.getEntity());  | 
 | 154 | + | 
 | 155 | +            request = new Request("POST", "_ml/anomaly_detectors/test_anomaly_detector/_open");  | 
 | 156 | +            response = localClient.performRequest(request);  | 
 | 157 | + | 
 | 158 | +            final Request startRequest = new Request("POST", "_ml/datafeeds/test_datafeed/_start");  | 
 | 159 | +            request.setJsonEntity("""  | 
 | 160 | +                {  | 
 | 161 | +                    "start": "2019-04-07T18:22:16Z"  | 
 | 162 | +                }  | 
 | 163 | +                """);  | 
 | 164 | +            ResponseException e = assertThrows(ResponseException.class, () -> localClient.performRequest(startRequest));  | 
 | 165 | +            assertThat(e.getMessage(), containsString("""  | 
 | 166 | +                Datafeed [test_datafeed] is configured with a remote index pattern(s) \  | 
 | 167 | +                [remote_cluster:remote_index] but the current node [local_cluster-0] \  | 
 | 168 | +                is not allowed to connect to remote clusters. Please enable node.remote_cluster_client \  | 
 | 169 | +                for all machine learning nodes and master-eligible nodes"""));  | 
 | 170 | +        }  | 
 | 171 | +    }  | 
 | 172 | + | 
 | 173 | +    @Override  | 
 | 174 | +    protected String getTestRestCluster() {  | 
 | 175 | +        return localCluster.getHttpAddresses();  | 
 | 176 | +    }  | 
 | 177 | +}  | 
0 commit comments