Skip to content

Commit 7cd7ae3

Browse files
Use LinkedProjectConfig record in RemoteClusterService
Moves all remaining settings and supporting methods into RemoteClusterSettings and introduces a configuration record type LinkedProjectConfig that can be built from the settings. Refactors RemoteClusterService and the related classes to use the LinkedProjectConfig, with settings extraction at higher levels in RemoteClusterService and RemoteClusterAware and its subclasses. This is another step towards supporting multiple origin projects with linked project configuration built from cluster state ProjectCustom updates. Resolves: ES-12656, ES-12569
1 parent 00fe9f5 commit 7cd7ae3

File tree

25 files changed

+893
-750
lines changed

25 files changed

+893
-750
lines changed

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,10 @@
132132
import org.elasticsearch.snapshots.SnapshotShutdownProgressTracker;
133133
import org.elasticsearch.snapshots.SnapshotsService;
134134
import org.elasticsearch.threadpool.ThreadPool;
135-
import org.elasticsearch.transport.ProxyConnectionStrategy;
136135
import org.elasticsearch.transport.RemoteClusterPortSettings;
137136
import org.elasticsearch.transport.RemoteClusterSettings;
138-
import org.elasticsearch.transport.RemoteConnectionStrategy;
139-
import org.elasticsearch.transport.SniffConnectionStrategy;
137+
import org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings;
138+
import org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings;
140139
import org.elasticsearch.transport.TransportService;
141140
import org.elasticsearch.transport.TransportSettings;
142141
import org.elasticsearch.watcher.ResourceWatcherService;
@@ -367,19 +366,19 @@ public void apply(Settings value, Settings current, Settings previous) {
367366
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
368367
TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE,
369368
RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
370-
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
371369
RemoteClusterSettings.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
372370
RemoteClusterSettings.REMOTE_NODE_ATTRIBUTE,
373371
RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE,
374372
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS,
375373
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESSION_SCHEME,
376-
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
377-
ProxyConnectionStrategy.PROXY_ADDRESS,
378-
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
379-
ProxyConnectionStrategy.SERVER_NAME,
380-
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
381-
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
382-
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
374+
RemoteClusterSettings.REMOTE_CONNECTION_MODE,
375+
ProxyConnectionStrategySettings.PROXY_ADDRESS,
376+
ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS,
377+
ProxyConnectionStrategySettings.SERVER_NAME,
378+
SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY,
379+
SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS,
380+
SniffConnectionStrategySettings.REMOTE_CONNECTIONS_PER_CLUSTER,
381+
SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS,
383382
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
384383
ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING,
385384
SnapshotShutdownProgressTracker.SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING,
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
import org.elasticsearch.cluster.node.DiscoveryNode;
14+
import org.elasticsearch.core.TimeValue;
15+
16+
import java.util.List;
17+
import java.util.function.Predicate;
18+
19+
import static org.elasticsearch.transport.RemoteConnectionStrategy.ConnectionStrategy;
20+
21+
public record LinkedProjectConfig(
22+
ProjectId originProjectId,
23+
ProjectId linkedProjectId,
24+
String linkedProjectAlias,
25+
TimeValue transportConnectTimeout,
26+
Compression.Enabled connectionCompression,
27+
Compression.Scheme connectionCompressionScheme,
28+
TimeValue clusterPingSchedule,
29+
TimeValue initialConnectionTimeout,
30+
boolean skipUnavailable,
31+
ConnectionStrategy connectionStrategy,
32+
int maxNumConnections,
33+
String proxyAddress,
34+
String proxyServerName,
35+
Predicate<DiscoveryNode> sniffNodePredicate,
36+
List<String> sniffSeedNodes,
37+
int maxPendingConnectionListeners // only used in tests
38+
) {
39+
40+
public static Builder builder() {
41+
return new Builder();
42+
}
43+
44+
public static class Builder {
45+
private ProjectId originProjectId;
46+
private ProjectId linkedProjectId;
47+
private String linkedProjectAlias;
48+
private TimeValue transportConnectTimeout;
49+
private Compression.Enabled connectionCompression;
50+
private Compression.Scheme connectionCompressionScheme;
51+
private TimeValue clusterPingSchedule;
52+
private TimeValue initialConnectionTimeout;
53+
private boolean skipUnavailable;
54+
private ConnectionStrategy connectionStrategy;
55+
private int maxNumConnections;
56+
private String proxyAddress;
57+
private String proxyServerName;
58+
private Predicate<DiscoveryNode> sniffNodePredicate;
59+
private List<String> sniffSeedNodes;
60+
private int maxPendingConnectionListeners = 1000;
61+
62+
private Builder() {}
63+
64+
public Builder originProjectId(ProjectId originProjectId) {
65+
this.originProjectId = originProjectId;
66+
return this;
67+
}
68+
69+
public Builder linkedProjectId(ProjectId linkedProjectId) {
70+
this.linkedProjectId = linkedProjectId;
71+
return this;
72+
}
73+
74+
public Builder linkedProjectAlias(String linkedProjectAlias) {
75+
this.linkedProjectAlias = linkedProjectAlias;
76+
return this;
77+
}
78+
79+
public Builder transportConnectTimeout(TimeValue transportConnectTimeout) {
80+
this.transportConnectTimeout = transportConnectTimeout;
81+
return this;
82+
}
83+
84+
public Builder connectionCompression(Compression.Enabled connectionCompression) {
85+
this.connectionCompression = connectionCompression;
86+
return this;
87+
}
88+
89+
public Builder connectionCompressionScheme(Compression.Scheme connectionCompressionScheme) {
90+
this.connectionCompressionScheme = connectionCompressionScheme;
91+
return this;
92+
}
93+
94+
public Builder clusterPingSchedule(TimeValue clusterPingSchedule) {
95+
this.clusterPingSchedule = clusterPingSchedule;
96+
return this;
97+
}
98+
99+
public Builder initialConnectionTimeout(TimeValue initialConnectionTimeout) {
100+
this.initialConnectionTimeout = initialConnectionTimeout;
101+
return this;
102+
}
103+
104+
public Builder skipUnavailable(boolean skipUnavailable) {
105+
this.skipUnavailable = skipUnavailable;
106+
return this;
107+
}
108+
109+
public Builder connectionStrategy(ConnectionStrategy connectionStrategy) {
110+
this.connectionStrategy = connectionStrategy;
111+
return this;
112+
}
113+
114+
public Builder maxNumConnections(int maxNumConnections) {
115+
this.maxNumConnections = maxNumConnections;
116+
return this;
117+
}
118+
119+
public Builder proxyAddress(String proxyAddress) {
120+
this.proxyAddress = proxyAddress;
121+
return this;
122+
}
123+
124+
public Builder proxyServerName(String proxyServerName) {
125+
this.proxyServerName = proxyServerName;
126+
return this;
127+
}
128+
129+
public Builder sniffNodePredicate(Predicate<DiscoveryNode> sniffNodePredicate) {
130+
this.sniffNodePredicate = sniffNodePredicate;
131+
return this;
132+
}
133+
134+
public Builder sniffSeedNodes(List<String> sniffSeedNodes) {
135+
this.sniffSeedNodes = sniffSeedNodes;
136+
return this;
137+
}
138+
139+
public Builder maxPendingConnectionListeners(int maxPendingConnectionListeners) {
140+
this.maxPendingConnectionListeners = maxPendingConnectionListeners;
141+
return this;
142+
}
143+
144+
public LinkedProjectConfig build() {
145+
return new LinkedProjectConfig(
146+
originProjectId,
147+
linkedProjectId,
148+
linkedProjectAlias,
149+
transportConnectTimeout,
150+
connectionCompression,
151+
connectionCompressionScheme,
152+
clusterPingSchedule,
153+
initialConnectionTimeout,
154+
skipUnavailable,
155+
connectionStrategy,
156+
maxNumConnections,
157+
proxyAddress,
158+
proxyServerName,
159+
sniffNodePredicate,
160+
sniffSeedNodes,
161+
maxPendingConnectionListeners
162+
);
163+
}
164+
}
165+
}

server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java

Lines changed: 13 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import org.elasticsearch.common.io.stream.StreamInput;
2020
import org.elasticsearch.common.io.stream.StreamOutput;
2121
import org.elasticsearch.common.io.stream.Writeable;
22-
import org.elasticsearch.common.settings.Setting;
23-
import org.elasticsearch.common.settings.Settings;
2422
import org.elasticsearch.common.transport.TransportAddress;
2523
import org.elasticsearch.common.util.concurrent.CountDown;
2624
import org.elasticsearch.core.Predicates;
@@ -38,56 +36,11 @@
3836
import java.util.concurrent.atomic.AtomicInteger;
3937
import java.util.concurrent.atomic.AtomicReference;
4038
import java.util.function.Supplier;
41-
import java.util.stream.Stream;
4239

43-
import static org.elasticsearch.common.settings.Setting.intSetting;
4440
import static org.elasticsearch.core.Strings.format;
4541

4642
public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
4743

48-
/**
49-
* The remote address for the proxy. The connections will be opened to the configured address.
50-
*/
51-
public static final Setting.AffixSetting<String> PROXY_ADDRESS = Setting.affixKeySetting(
52-
"cluster.remote.",
53-
"proxy_address",
54-
(ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY, s -> {
55-
if (Strings.hasLength(s)) {
56-
parsePort(s);
57-
}
58-
}), Setting.Property.Dynamic, Setting.Property.NodeScope)
59-
);
60-
61-
/**
62-
* The maximum number of socket connections that will be established to a remote cluster. The default is 18.
63-
*/
64-
public static final Setting.AffixSetting<Integer> REMOTE_SOCKET_CONNECTIONS = Setting.affixKeySetting(
65-
"cluster.remote.",
66-
"proxy_socket_connections",
67-
(ns, key) -> intSetting(
68-
key,
69-
18,
70-
1,
71-
new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY),
72-
Setting.Property.Dynamic,
73-
Setting.Property.NodeScope
74-
)
75-
);
76-
77-
/**
78-
* A configurable server_name attribute
79-
*/
80-
public static final Setting.AffixSetting<String> SERVER_NAME = Setting.affixKeySetting(
81-
"cluster.remote.",
82-
"server_name",
83-
(ns, key) -> Setting.simpleString(
84-
key,
85-
new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY),
86-
Setting.Property.Dynamic,
87-
Setting.Property.NodeScope
88-
)
89-
);
90-
9144
static final int CHANNELS_PER_CONNECTION = 1;
9245

9346
private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3;
@@ -99,78 +52,20 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
9952
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
10053
private final ConnectionManager.ConnectionValidator clusterNameValidator;
10154

102-
ProxyConnectionStrategy(
103-
String clusterAlias,
104-
TransportService transportService,
105-
RemoteConnectionManager connectionManager,
106-
Settings settings
107-
) {
108-
this(
109-
clusterAlias,
110-
transportService,
111-
connectionManager,
112-
settings,
113-
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
114-
PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(settings),
115-
SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings)
116-
);
117-
}
118-
119-
ProxyConnectionStrategy(
120-
String clusterAlias,
121-
TransportService transportService,
122-
RemoteConnectionManager connectionManager,
123-
Settings settings,
124-
int maxNumConnections,
125-
String configuredAddress
126-
) {
127-
this(
128-
clusterAlias,
129-
transportService,
130-
connectionManager,
131-
settings,
132-
maxNumConnections,
133-
configuredAddress,
134-
() -> resolveAddress(configuredAddress),
135-
null
136-
);
137-
}
138-
139-
ProxyConnectionStrategy(
140-
String clusterAlias,
141-
TransportService transportService,
142-
RemoteConnectionManager connectionManager,
143-
Settings settings,
144-
int maxNumConnections,
145-
String configuredAddress,
146-
String configuredServerName
147-
) {
148-
this(
149-
clusterAlias,
150-
transportService,
151-
connectionManager,
152-
settings,
153-
maxNumConnections,
154-
configuredAddress,
155-
() -> resolveAddress(configuredAddress),
156-
configuredServerName
157-
);
55+
ProxyConnectionStrategy(LinkedProjectConfig config, TransportService transportService, RemoteConnectionManager connectionManager) {
56+
this(config, () -> resolveAddress(config.proxyAddress()), transportService, connectionManager);
15857
}
15958

16059
ProxyConnectionStrategy(
161-
String clusterAlias,
162-
TransportService transportService,
163-
RemoteConnectionManager connectionManager,
164-
Settings settings,
165-
int maxNumConnections,
166-
String configuredAddress,
60+
LinkedProjectConfig config,
16761
Supplier<TransportAddress> address,
168-
String configuredServerName
62+
TransportService transportService,
63+
RemoteConnectionManager connectionManager
16964
) {
170-
super(clusterAlias, transportService, connectionManager, settings);
171-
this.maxNumConnections = maxNumConnections;
172-
this.configuredAddress = configuredAddress;
173-
this.configuredServerName = configuredServerName;
65+
super(config, transportService, connectionManager);
66+
this.maxNumConnections = config.maxNumConnections();
67+
this.configuredAddress = config.proxyAddress();
68+
this.configuredServerName = config.proxyServerName();
17469
assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses";
17570
this.address = address;
17671
this.clusterNameValidator = (newConnection, actualProfile, listener) -> {
@@ -200,10 +95,6 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
20095
};
20196
}
20297

203-
static Stream<Setting.AffixSetting<?>> enablementSettings() {
204-
return Stream.of(ProxyConnectionStrategy.PROXY_ADDRESS);
205-
}
206-
20798
static Writeable.Reader<RemoteConnectionInfo.ModeInfo> infoReader() {
20899
return ProxyModeInfo::new;
209100
}
@@ -214,13 +105,10 @@ protected boolean shouldOpenMoreConnections() {
214105
}
215106

216107
@Override
217-
protected boolean strategyMustBeRebuilt(Settings newSettings) {
218-
String address = PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
219-
int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
220-
String serverName = SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
221-
return numOfSockets != maxNumConnections
222-
|| configuredAddress.equals(address) == false
223-
|| Objects.equals(serverName, configuredServerName) == false;
108+
protected boolean strategyMustBeRebuilt(LinkedProjectConfig config) {
109+
return config.maxNumConnections() != maxNumConnections
110+
|| configuredAddress.equals(config.proxyAddress()) == false
111+
|| Objects.equals(config.proxyServerName(), configuredServerName) == false;
224112
}
225113

226114
@Override

0 commit comments

Comments
 (0)