Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.common.settings.Settings;

import java.util.List;

import static org.elasticsearch.transport.RemoteConnectionStrategy.REMOTE_CONNECTION_MODE;

public class LinkedClusterConnectionConfig {
private final String clusterAlias;
private final String proxyAddress;
// we would not actually store settings in the final version
private final Settings settings;

public LinkedClusterConnectionConfig(String clusterAlias, Settings settings) {
this.clusterAlias = clusterAlias;
this.proxyAddress = ProxyConnectionStrategy.PROXY_ADDRESS.get(settings);
this.settings = settings;
}

// this would be called by the CPS ProjectCustom based implementation
public LinkedClusterConnectionConfig(String clusterAlias, String proxyAddress) {
this.clusterAlias = clusterAlias;
this.proxyAddress = proxyAddress;
this.settings = null;
}

public List<String> getSeeds() {
return SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
}

public String getProxyAddress() {
return proxyAddress;
}

public RemoteConnectionStrategy.ConnectionStrategy getConnectionMode() {
return REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
}

public Settings getSettings() {
return settings;
}

public String getClusterAlias() {
return clusterAlias;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;

import java.util.List;
import java.util.function.Consumer;

public interface LinkedClusterConnectionConfigListener {

void listenForConnectionConfigChanges(Consumer<LinkedClusterConnectionConfig> consumer);

class ClusterSettingsListener implements LinkedClusterConnectionConfigListener {
private final ClusterSettings clusterSettings;

public ClusterSettingsListener(ClusterSettings clusterSettings) {
this.clusterSettings = clusterSettings;
}

@Override
public void listenForConnectionConfigChanges(Consumer<LinkedClusterConnectionConfig> consumer) {
List<Setting.AffixSetting<?>> remoteClusterSettings = List.of(
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
ProxyConnectionStrategy.PROXY_ADDRESS,
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategy.SERVER_NAME
);
clusterSettings.addAffixGroupUpdateConsumer(
remoteClusterSettings,
(alias, settings) -> consumer.accept(new LinkedClusterConnectionConfig(alias, settings))
);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SelectorResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.Tuple;
Expand Down Expand Up @@ -208,34 +207,27 @@ protected Map<String, List<String>> groupClusterIndices(Set<String> remoteCluste
return perClusterIndices;
}

void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) {
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
void validateAndUpdateRemoteCluster(LinkedClusterConnectionConfig config) {
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(config.getClusterAlias())) {
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}
updateRemoteCluster(clusterAlias, settings);
updateRemoteCluster(config);
}

/**
* Subclasses must implement this to receive information about updated cluster aliases.
*/
protected abstract void updateRemoteCluster(String clusterAlias, Settings settings);
protected abstract void updateRemoteCluster(LinkedClusterConnectionConfig config);

public void listenForUpdates(ClusterSettings clusterSettings) {
assert false : "this will be removed";
}

/**
* Registers this instance to listen to updates on the cluster settings.
* Registers this instance to listen to updates on linked cluster configuration.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
List<Setting.AffixSetting<?>> remoteClusterSettings = List.of(
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
ProxyConnectionStrategy.PROXY_ADDRESS,
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategy.SERVER_NAME
);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
public void listenForUpdates(LinkedClusterConnectionConfigListener listener) {
listener.listenForConnectionConfigChanges(this::validateAndUpdateRemoteCluster);
}

public static String buildRemoteIndexName(String clusterAlias, String indexName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ public void onFailure(Exception e) {
}

@Override
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
protected void updateRemoteCluster(LinkedClusterConnectionConfig config) {
String clusterAlias = config.getClusterAlias();
CountDownLatch latch = new CountDownLatch(1);
updateRemoteCluster(clusterAlias, settings, ActionListener.runAfter(new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,17 @@ static Set<String> getRemoteClusters(Settings settings) {
return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet());
}

public static boolean isConnectionEnabled(LinkedClusterConnectionConfig config) {
ConnectionStrategy mode = config.getConnectionMode();
if (mode.equals(ConnectionStrategy.SNIFF)) {
List<String> seeds = config.getSeeds();
return seeds.isEmpty() == false;
} else {
String address = config.getProxyAddress();
return Strings.isEmpty(address) == false;
}
}

public static boolean isConnectionEnabled(String clusterAlias, Settings settings) {
ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
if (mode.equals(ConnectionStrategy.SNIFF)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ public TransportService(
clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
if (remoteClusterClient) {
remoteClusterService.listenForUpdates(clusterSettings);
// we wouldn't actually instantiate LinkedClusterConnectionConfigListener here, instead it would be injected via SPI
remoteClusterService.listenForUpdates(new LinkedClusterConnectionConfigListener.ClusterSettingsListener(clusterSettings));
}
clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,10 @@ public void testIncrementallyAddClusters() throws IOException {
// connect before returning.
new Thread(() -> {
try {
service.validateAndUpdateRemoteCluster("cluster_1", cluster1Settings);
service.validateAndUpdateRemoteCluster(
"cluster_1",
new LinkedClusterConnectionConfig("cluster_1", cluster1Settings)
);
clusterAdded.onResponse(null);
} catch (Exception e) {
clusterAdded.onFailure(e);
Expand All @@ -510,16 +513,22 @@ public void testIncrementallyAddClusters() throws IOException {
"cluster_2",
Collections.singletonList(cluster2Seed.getAddress().toString())
);
service.validateAndUpdateRemoteCluster("cluster_2", cluster2Settings);
service.validateAndUpdateRemoteCluster("cluster_2", new LinkedClusterConnectionConfig("cluster_2", cluster2Settings));
assertTrue(hasRegisteredClusters(service));
assertTrue(isRemoteClusterRegistered(service, "cluster_1"));
assertTrue(isRemoteClusterRegistered(service, "cluster_2"));
Settings cluster2SettingsDisabled = createSettings("cluster_2", Collections.emptyList());
service.validateAndUpdateRemoteCluster("cluster_2", cluster2SettingsDisabled);
service.validateAndUpdateRemoteCluster(
"cluster_2",
new LinkedClusterConnectionConfig("cluster_2", cluster2SettingsDisabled)
);
assertFalse(isRemoteClusterRegistered(service, "cluster_2"));
IllegalArgumentException iae = expectThrows(
IllegalArgumentException.class,
() -> service.validateAndUpdateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY)
() -> service.validateAndUpdateRemoteCluster(
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
new LinkedClusterConnectionConfig(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY)
)
);
assertEquals("remote clusters must not have the empty string as its key", iae.getMessage());
}
Expand Down Expand Up @@ -566,7 +575,10 @@ public void testDefaultPingSchedule() throws IOException {
assertTrue(hasRegisteredClusters(service));
service.validateAndUpdateRemoteCluster(
"cluster_1",
createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString()))
new LinkedClusterConnectionConfig(
"cluster_1",
createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString()))
)
);
assertTrue(hasRegisteredClusters(service));
assertTrue(isRemoteClusterRegistered(service, "cluster_1"));
Expand Down Expand Up @@ -677,7 +689,10 @@ public void testChangeSettings() throws Exception {
settingsChange.put("cluster.remote.cluster_1.transport.compress", enabledChange);
}
settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
service.validateAndUpdateRemoteCluster("cluster_1", settingsChange.build());
service.validateAndUpdateRemoteCluster(
"cluster_1",
new LinkedClusterConnectionConfig("cluster_1", settingsChange.build())
);
assertBusy(remoteClusterConnection::isClosed);

remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.transport.LinkedClusterConnectionConfig;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteConnectionStrategy;
Expand Down Expand Up @@ -545,18 +546,20 @@ private static class RemoteClusterResolver extends RemoteClusterAware {

private final CopyOnWriteArraySet<String> clusters;

// This would also need to change: either the LinkedClusterConnectionConfigListener can return a list of cluster aliases,
// or we use a different class that returns this info
private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) {
super(settings);
clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings));
listenForUpdates(clusterSettings);
}

@Override
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings)) {
clusters.add(clusterAlias);
protected void updateRemoteCluster(LinkedClusterConnectionConfig config) {
if (RemoteConnectionStrategy.isConnectionEnabled(config)) {
clusters.add(config.getClusterAlias());
} else {
clusters.remove(clusterAlias);
clusters.remove(config.getClusterAlias());
}
}

Expand Down