diff --git a/server/src/main/java/org/elasticsearch/transport/LinkedClusterConnectionConfig.java b/server/src/main/java/org/elasticsearch/transport/LinkedClusterConnectionConfig.java new file mode 100644 index 0000000000000..7b820d6e7f279 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/LinkedClusterConnectionConfig.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.settings.Settings; + +import java.util.List; + +import static org.elasticsearch.transport.RemoteConnectionStrategy.REMOTE_CONNECTION_MODE; + +public class LinkedClusterConnectionConfig { + private final String clusterAlias; + private final String proxyAddress; + // we would not actually store settings in the final version + private final Settings settings; + + public LinkedClusterConnectionConfig(String clusterAlias, Settings settings) { + this.clusterAlias = clusterAlias; + this.proxyAddress = ProxyConnectionStrategy.PROXY_ADDRESS.get(settings); + this.settings = settings; + } + + // this would be called by the CPS ProjectCustom based implementation + public LinkedClusterConnectionConfig(String clusterAlias, String proxyAddress) { + this.clusterAlias = clusterAlias; + this.proxyAddress = proxyAddress; + this.settings = null; + } + + public List getSeeds() { + return SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings); + } + + public String getProxyAddress() { + return proxyAddress; + } + + public RemoteConnectionStrategy.ConnectionStrategy getConnectionMode() { + return REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings); + } + + public Settings getSettings() { + return settings; + } + + public String getClusterAlias() { + return clusterAlias; + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/LinkedClusterConnectionConfigListener.java b/server/src/main/java/org/elasticsearch/transport/LinkedClusterConnectionConfigListener.java new file mode 100644 index 0000000000000..b9dcaed857e81 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/LinkedClusterConnectionConfigListener.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; + +import java.util.List; +import java.util.function.Consumer; + +public interface LinkedClusterConnectionConfigListener { + + void listenForConnectionConfigChanges(Consumer consumer); + + class ClusterSettingsListener implements LinkedClusterConnectionConfigListener { + private final ClusterSettings clusterSettings; + + public ClusterSettingsListener(ClusterSettings clusterSettings) { + this.clusterSettings = clusterSettings; + } + + @Override + public void listenForConnectionConfigChanges(Consumer consumer) { + List> remoteClusterSettings = List.of( + RemoteClusterService.REMOTE_CLUSTER_COMPRESS, + RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, + RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, + SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, + SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, + SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, + ProxyConnectionStrategy.PROXY_ADDRESS, + ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, + ProxyConnectionStrategy.SERVER_NAME + ); + clusterSettings.addAffixGroupUpdateConsumer( + remoteClusterSettings, + (alias, settings) -> consumer.accept(new LinkedClusterConnectionConfig(alias, settings)) + ); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 95e507f70d7a9..ba80094eed085 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -14,7 +14,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SelectorResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; import org.elasticsearch.core.Tuple; @@ -208,34 +207,27 @@ protected Map> groupClusterIndices(Set remoteCluste return perClusterIndices; } - void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) { - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + void validateAndUpdateRemoteCluster(LinkedClusterConnectionConfig config) { + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(config.getClusterAlias())) { throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); } - updateRemoteCluster(clusterAlias, settings); + updateRemoteCluster(config); } /** * Subclasses must implement this to receive information about updated cluster aliases. */ - protected abstract void updateRemoteCluster(String clusterAlias, Settings settings); + protected abstract void updateRemoteCluster(LinkedClusterConnectionConfig config); + + public void listenForUpdates(ClusterSettings clusterSettings) { + assert false : "this will be removed"; + } /** - * Registers this instance to listen to updates on the cluster settings. + * Registers this instance to listen to updates on linked cluster configuration. */ - public void listenForUpdates(ClusterSettings clusterSettings) { - List> remoteClusterSettings = List.of( - RemoteClusterService.REMOTE_CLUSTER_COMPRESS, - RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, - RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, - SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, - SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, - SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, - ProxyConnectionStrategy.PROXY_ADDRESS, - ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, - ProxyConnectionStrategy.SERVER_NAME - ); - clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster); + public void listenForUpdates(LinkedClusterConnectionConfigListener listener) { + listener.listenForConnectionConfigChanges(this::validateAndUpdateRemoteCluster); } public static String buildRemoteIndexName(String clusterAlias, String indexName) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 656d19373d7e7..57ffa229d29d5 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -398,7 +398,8 @@ public void onFailure(Exception e) { } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { + protected void updateRemoteCluster(LinkedClusterConnectionConfig config) { + String clusterAlias = config.getClusterAlias(); CountDownLatch latch = new CountDownLatch(1); updateRemoteCluster(clusterAlias, settings, ActionListener.runAfter(new ActionListener<>() { @Override diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index a715797b97977..88de930a32fef 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -189,6 +189,17 @@ static Set getRemoteClusters(Settings settings) { return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet()); } + public static boolean isConnectionEnabled(LinkedClusterConnectionConfig config) { + ConnectionStrategy mode = config.getConnectionMode(); + if (mode.equals(ConnectionStrategy.SNIFF)) { + List seeds = config.getSeeds(); + return seeds.isEmpty() == false; + } else { + String address = config.getProxyAddress(); + return Strings.isEmpty(address) == false; + } + } + public static boolean isConnectionEnabled(String clusterAlias, Settings settings) { ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings); if (mode.equals(ConnectionStrategy.SNIFF)) { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index a23a6f3367351..4595e2734a31c 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -284,7 +284,8 @@ public TransportService( clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude); clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude); if (remoteClusterClient) { - remoteClusterService.listenForUpdates(clusterSettings); + // we wouldn't actually instantiate LinkedClusterConnectionConfigListener here, instead it would be injected via SPI + remoteClusterService.listenForUpdates(new LinkedClusterConnectionConfigListener.ClusterSettingsListener(clusterSettings)); } clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold); } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 30699b9346300..a6a556bbf6c3c 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -497,7 +497,10 @@ public void testIncrementallyAddClusters() throws IOException { // connect before returning. new Thread(() -> { try { - service.validateAndUpdateRemoteCluster("cluster_1", cluster1Settings); + service.validateAndUpdateRemoteCluster( + "cluster_1", + new LinkedClusterConnectionConfig("cluster_1", cluster1Settings) + ); clusterAdded.onResponse(null); } catch (Exception e) { clusterAdded.onFailure(e); @@ -510,16 +513,22 @@ public void testIncrementallyAddClusters() throws IOException { "cluster_2", Collections.singletonList(cluster2Seed.getAddress().toString()) ); - service.validateAndUpdateRemoteCluster("cluster_2", cluster2Settings); + service.validateAndUpdateRemoteCluster("cluster_2", new LinkedClusterConnectionConfig("cluster_2", cluster2Settings)); assertTrue(hasRegisteredClusters(service)); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); assertTrue(isRemoteClusterRegistered(service, "cluster_2")); Settings cluster2SettingsDisabled = createSettings("cluster_2", Collections.emptyList()); - service.validateAndUpdateRemoteCluster("cluster_2", cluster2SettingsDisabled); + service.validateAndUpdateRemoteCluster( + "cluster_2", + new LinkedClusterConnectionConfig("cluster_2", cluster2SettingsDisabled) + ); assertFalse(isRemoteClusterRegistered(service, "cluster_2")); IllegalArgumentException iae = expectThrows( IllegalArgumentException.class, - () -> service.validateAndUpdateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY) + () -> service.validateAndUpdateRemoteCluster( + RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, + new LinkedClusterConnectionConfig(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY) + ) ); assertEquals("remote clusters must not have the empty string as its key", iae.getMessage()); } @@ -566,7 +575,10 @@ public void testDefaultPingSchedule() throws IOException { assertTrue(hasRegisteredClusters(service)); service.validateAndUpdateRemoteCluster( "cluster_1", - createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())) + new LinkedClusterConnectionConfig( + "cluster_1", + createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())) + ) ); assertTrue(hasRegisteredClusters(service)); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); @@ -677,7 +689,10 @@ public void testChangeSettings() throws Exception { settingsChange.put("cluster.remote.cluster_1.transport.compress", enabledChange); } settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - service.validateAndUpdateRemoteCluster("cluster_1", settingsChange.build()); + service.validateAndUpdateRemoteCluster( + "cluster_1", + new LinkedClusterConnectionConfig("cluster_1", settingsChange.build()) + ); assertBusy(remoteClusterConnection::isClosed); remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index ff39fd587dc3a..3c6e0e4aadac8 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -31,6 +31,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.transport.LinkedClusterConnectionConfig; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteConnectionStrategy; @@ -545,6 +546,8 @@ private static class RemoteClusterResolver extends RemoteClusterAware { private final CopyOnWriteArraySet clusters; + // This would also need to change: either the LinkedClusterConnectionConfigListener can return a list of cluster aliases, + // or we use a different class that returns this info private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) { super(settings); clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings)); @@ -552,11 +555,11 @@ private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings)) { - clusters.add(clusterAlias); + protected void updateRemoteCluster(LinkedClusterConnectionConfig config) { + if (RemoteConnectionStrategy.isConnectionEnabled(config)) { + clusters.add(config.getClusterAlias()); } else { - clusters.remove(clusterAlias); + clusters.remove(config.getClusterAlias()); } }