Skip to content

Commit 2575e50

Browse files
committed
POC linked cluster connection config
1 parent 892d88b commit 2575e50

File tree

7 files changed

+154
-13
lines changed

7 files changed

+154
-13
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.elasticsearch.common.settings.Settings;
13+
14+
import java.util.List;
15+
16+
import static org.elasticsearch.transport.RemoteConnectionStrategy.REMOTE_CONNECTION_MODE;
17+
18+
public class LinkedClusterConnectionConfig {
19+
private final String clusterAlias;
20+
private final String proxyAddress;
21+
// we would not actually store settings in the final version
22+
private final Settings settings;
23+
24+
public LinkedClusterConnectionConfig(String clusterAlias, Settings settings) {
25+
this.clusterAlias = clusterAlias;
26+
this.proxyAddress = ProxyConnectionStrategy.PROXY_ADDRESS.get(settings);
27+
this.settings = settings;
28+
}
29+
30+
// this would be called by the CPS ProjectCustom based implementation
31+
public LinkedClusterConnectionConfig(String clusterAlias, String proxyAddress) {
32+
this.clusterAlias = clusterAlias;
33+
this.proxyAddress = proxyAddress;
34+
this.settings = null;
35+
}
36+
37+
public List<String> getSeeds() {
38+
return SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
39+
}
40+
41+
public String getProxyAddress() {
42+
return proxyAddress;
43+
}
44+
45+
public RemoteConnectionStrategy.ConnectionStrategy getConnectionMode() {
46+
return REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
47+
}
48+
49+
public Settings getSettings() {
50+
return settings;
51+
}
52+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.elasticsearch.common.settings.ClusterSettings;
13+
import org.elasticsearch.common.settings.Setting;
14+
15+
import java.util.List;
16+
import java.util.function.BiConsumer;
17+
18+
public interface LinkedClusterConnectionConfigListener {
19+
20+
void listen(BiConsumer<String, LinkedClusterConnectionConfig> consumer);
21+
22+
class ClusterSettingsListener implements LinkedClusterConnectionConfigListener {
23+
private final ClusterSettings clusterSettings;
24+
25+
public ClusterSettingsListener(ClusterSettings clusterSettings) {
26+
this.clusterSettings = clusterSettings;
27+
}
28+
29+
@Override
30+
public void listen(BiConsumer<String, LinkedClusterConnectionConfig> consumer) {
31+
List<Setting.AffixSetting<?>> remoteClusterSettings = List.of(
32+
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
33+
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
34+
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
35+
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
36+
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
37+
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
38+
ProxyConnectionStrategy.PROXY_ADDRESS,
39+
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
40+
ProxyConnectionStrategy.SERVER_NAME
41+
);
42+
clusterSettings.addAffixGroupUpdateConsumer(
43+
remoteClusterSettings,
44+
(alias, settings) -> consumer.accept(alias, new LinkedClusterConnectionConfig(alias, settings))
45+
);
46+
}
47+
}
48+
49+
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -208,17 +208,17 @@ protected Map<String, List<String>> groupClusterIndices(Set<String> remoteCluste
208208
return perClusterIndices;
209209
}
210210

211-
void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) {
211+
void validateAndUpdateRemoteCluster(String clusterAlias, LinkedClusterConnectionConfig config) {
212212
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
213213
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
214214
}
215-
updateRemoteCluster(clusterAlias, settings);
215+
updateRemoteCluster(clusterAlias, config);
216216
}
217217

218218
/**
219219
* Subclasses must implement this to receive information about updated cluster aliases.
220220
*/
221-
protected abstract void updateRemoteCluster(String clusterAlias, Settings settings);
221+
protected abstract void updateRemoteCluster(String clusterAlias, LinkedClusterConnectionConfig config);
222222

223223
/**
224224
* Registers this instance to listen to updates on the cluster settings.
@@ -235,7 +235,17 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
235235
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
236236
ProxyConnectionStrategy.SERVER_NAME
237237
);
238-
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
238+
clusterSettings.addAffixGroupUpdateConsumer(
239+
remoteClusterSettings,
240+
(clusterAlias, settings) -> validateAndUpdateRemoteCluster(
241+
clusterAlias,
242+
new LinkedClusterConnectionConfig(clusterAlias, settings)
243+
)
244+
);
245+
}
246+
247+
public void listenForUpdates(LinkedClusterConnectionConfigListener listener) {
248+
listener.listen(this::validateAndUpdateRemoteCluster);
239249
}
240250

241251
public static String buildRemoteIndexName(String clusterAlias, String indexName) {

server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,17 @@ static Set<String> getRemoteClusters(Settings settings) {
189189
return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet());
190190
}
191191

192+
public static boolean isConnectionEnabled(LinkedClusterConnectionConfig config) {
193+
ConnectionStrategy mode = config.getConnectionMode();
194+
if (mode.equals(ConnectionStrategy.SNIFF)) {
195+
List<String> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
196+
return seeds.isEmpty() == false;
197+
} else {
198+
String address = config.getProxyAddress();
199+
return Strings.isEmpty(address) == false;
200+
}
201+
}
202+
192203
public static boolean isConnectionEnabled(String clusterAlias, Settings settings) {
193204
ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
194205
if (mode.equals(ConnectionStrategy.SNIFF)) {

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,8 @@ public TransportService(
284284
clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
285285
clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
286286
if (remoteClusterClient) {
287-
remoteClusterService.listenForUpdates(clusterSettings);
287+
// we wouldn't actually instantiate LinkedClusterConnectionConfigListener here, instead it would be injected via SPI
288+
remoteClusterService.listenForUpdates(new LinkedClusterConnectionConfigListener.ClusterSettingsListener(clusterSettings));
288289
}
289290
clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold);
290291
}

server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,10 @@ public void testIncrementallyAddClusters() throws IOException {
497497
// connect before returning.
498498
new Thread(() -> {
499499
try {
500-
service.validateAndUpdateRemoteCluster("cluster_1", cluster1Settings);
500+
service.validateAndUpdateRemoteCluster(
501+
"cluster_1",
502+
new LinkedClusterConnectionConfig("cluster_1", cluster1Settings)
503+
);
501504
clusterAdded.onResponse(null);
502505
} catch (Exception e) {
503506
clusterAdded.onFailure(e);
@@ -510,16 +513,22 @@ public void testIncrementallyAddClusters() throws IOException {
510513
"cluster_2",
511514
Collections.singletonList(cluster2Seed.getAddress().toString())
512515
);
513-
service.validateAndUpdateRemoteCluster("cluster_2", cluster2Settings);
516+
service.validateAndUpdateRemoteCluster("cluster_2", new LinkedClusterConnectionConfig("cluster_2", cluster2Settings));
514517
assertTrue(hasRegisteredClusters(service));
515518
assertTrue(isRemoteClusterRegistered(service, "cluster_1"));
516519
assertTrue(isRemoteClusterRegistered(service, "cluster_2"));
517520
Settings cluster2SettingsDisabled = createSettings("cluster_2", Collections.emptyList());
518-
service.validateAndUpdateRemoteCluster("cluster_2", cluster2SettingsDisabled);
521+
service.validateAndUpdateRemoteCluster(
522+
"cluster_2",
523+
new LinkedClusterConnectionConfig("cluster_2", cluster2SettingsDisabled)
524+
);
519525
assertFalse(isRemoteClusterRegistered(service, "cluster_2"));
520526
IllegalArgumentException iae = expectThrows(
521527
IllegalArgumentException.class,
522-
() -> service.validateAndUpdateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY)
528+
() -> service.validateAndUpdateRemoteCluster(
529+
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
530+
new LinkedClusterConnectionConfig(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY)
531+
)
523532
);
524533
assertEquals("remote clusters must not have the empty string as its key", iae.getMessage());
525534
}
@@ -566,7 +575,10 @@ public void testDefaultPingSchedule() throws IOException {
566575
assertTrue(hasRegisteredClusters(service));
567576
service.validateAndUpdateRemoteCluster(
568577
"cluster_1",
569-
createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString()))
578+
new LinkedClusterConnectionConfig(
579+
"cluster_1",
580+
createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString()))
581+
)
570582
);
571583
assertTrue(hasRegisteredClusters(service));
572584
assertTrue(isRemoteClusterRegistered(service, "cluster_1"));
@@ -677,7 +689,10 @@ public void testChangeSettings() throws Exception {
677689
settingsChange.put("cluster.remote.cluster_1.transport.compress", enabledChange);
678690
}
679691
settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
680-
service.validateAndUpdateRemoteCluster("cluster_1", settingsChange.build());
692+
service.validateAndUpdateRemoteCluster(
693+
"cluster_1",
694+
new LinkedClusterConnectionConfig("cluster_1", settingsChange.build())
695+
);
681696
assertBusy(remoteClusterConnection::isClosed);
682697

683698
remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.core.Tuple;
3232
import org.elasticsearch.index.Index;
3333
import org.elasticsearch.index.IndexNotFoundException;
34+
import org.elasticsearch.transport.LinkedClusterConnectionConfig;
3435
import org.elasticsearch.transport.NoSuchRemoteClusterException;
3536
import org.elasticsearch.transport.RemoteClusterAware;
3637
import org.elasticsearch.transport.RemoteConnectionStrategy;
@@ -545,15 +546,17 @@ private static class RemoteClusterResolver extends RemoteClusterAware {
545546

546547
private final CopyOnWriteArraySet<String> clusters;
547548

549+
// This would also need to change: either the LinkedClusterConnectionConfigListener can return a list of cluster aliases,
550+
// or we use a different class that returns this info
548551
private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) {
549552
super(settings);
550553
clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings));
551554
listenForUpdates(clusterSettings);
552555
}
553556

554557
@Override
555-
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
556-
if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings)) {
558+
protected void updateRemoteCluster(String clusterAlias, LinkedClusterConnectionConfig config) {
559+
if (RemoteConnectionStrategy.isConnectionEnabled(config)) {
557560
clusters.add(clusterAlias);
558561
} else {
559562
clusters.remove(clusterAlias);

0 commit comments

Comments
 (0)