Skip to content

Commit 46caac4

Browse files
Split config into sniff and proxy types
1 parent 92adad2 commit 46caac4

File tree

15 files changed

+292
-139
lines changed

15 files changed

+292
-139
lines changed

server/src/main/java/org/elasticsearch/transport/LinkedProjectConfig.java

Lines changed: 156 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -11,38 +11,126 @@
1111

1212
import org.elasticsearch.cluster.metadata.ProjectId;
1313
import org.elasticsearch.cluster.node.DiscoveryNode;
14+
import org.elasticsearch.common.Strings;
1415
import org.elasticsearch.core.TimeValue;
1516

1617
import java.util.List;
18+
import java.util.Objects;
1719
import java.util.function.Predicate;
1820

1921
import static org.elasticsearch.transport.RemoteConnectionStrategy.ConnectionStrategy;
2022

21-
// FIXME: Refactor into a class heirarchy with subclasses for each connection strategy
22-
public record LinkedProjectConfig(
23-
ProjectId originProjectId,
24-
ProjectId linkedProjectId,
25-
String linkedProjectAlias,
26-
TimeValue transportConnectTimeout,
27-
Compression.Enabled connectionCompression,
28-
Compression.Scheme connectionCompressionScheme,
29-
TimeValue clusterPingSchedule,
30-
TimeValue initialConnectionTimeout,
31-
boolean skipUnavailable,
32-
ConnectionStrategy connectionStrategy,
33-
int maxNumConnections,
34-
String proxyAddress,
35-
String proxyServerName,
36-
Predicate<DiscoveryNode> sniffNodePredicate,
37-
List<String> sniffSeedNodes,
38-
int maxPendingConnectionListeners // only used in tests
39-
) {
40-
41-
public static Builder builder() {
23+
public interface LinkedProjectConfig {
24+
ProjectId originProjectId();
25+
26+
ProjectId linkedProjectId();
27+
28+
String linkedProjectAlias();
29+
30+
TimeValue transportConnectTimeout();
31+
32+
Compression.Enabled connectionCompression();
33+
34+
Compression.Scheme connectionCompressionScheme();
35+
36+
TimeValue clusterPingSchedule();
37+
38+
TimeValue initialConnectionTimeout();
39+
40+
boolean skipUnavailable();
41+
42+
int maxPendingConnectionListeners();
43+
44+
ConnectionStrategy connectionStrategy();
45+
46+
boolean isConnectionEnabled();
47+
48+
RemoteConnectionStrategy buildConnectionStrategy(TransportService transportService, RemoteConnectionManager connectionManager);
49+
50+
record ProxyConnectionStrategyConfig(
51+
ProjectId originProjectId,
52+
ProjectId linkedProjectId,
53+
String linkedProjectAlias,
54+
TimeValue transportConnectTimeout,
55+
Compression.Enabled connectionCompression,
56+
Compression.Scheme connectionCompressionScheme,
57+
TimeValue clusterPingSchedule,
58+
TimeValue initialConnectionTimeout,
59+
boolean skipUnavailable,
60+
int maxPendingConnectionListeners,
61+
int maxNumConnections,
62+
String proxyAddress,
63+
String serverName
64+
) implements LinkedProjectConfig {
65+
66+
@Override
67+
public ConnectionStrategy connectionStrategy() {
68+
return ConnectionStrategy.PROXY;
69+
}
70+
71+
@Override
72+
public boolean isConnectionEnabled() {
73+
return Strings.isEmpty(proxyAddress) == false;
74+
}
75+
76+
@Override
77+
public RemoteConnectionStrategy buildConnectionStrategy(
78+
TransportService transportService,
79+
RemoteConnectionManager connectionManager
80+
) {
81+
return new ProxyConnectionStrategy(this, transportService, connectionManager);
82+
}
83+
}
84+
85+
record SniffConnectionStrategyConfig(
86+
ProjectId originProjectId,
87+
ProjectId linkedProjectId,
88+
String linkedProjectAlias,
89+
TimeValue transportConnectTimeout,
90+
Compression.Enabled connectionCompression,
91+
Compression.Scheme connectionCompressionScheme,
92+
TimeValue clusterPingSchedule,
93+
TimeValue initialConnectionTimeout,
94+
boolean skipUnavailable,
95+
int maxPendingConnectionListeners,
96+
int maxNumConnections,
97+
Predicate<DiscoveryNode> nodePredicate,
98+
List<String> seedNodes,
99+
String proxyAddress
100+
) implements LinkedProjectConfig {
101+
102+
@Override
103+
public ConnectionStrategy connectionStrategy() {
104+
return ConnectionStrategy.SNIFF;
105+
}
106+
107+
@Override
108+
public boolean isConnectionEnabled() {
109+
return seedNodes.isEmpty() == false;
110+
}
111+
112+
@Override
113+
public RemoteConnectionStrategy buildConnectionStrategy(
114+
TransportService transportService,
115+
RemoteConnectionManager connectionManager
116+
) {
117+
return new SniffConnectionStrategy(this, transportService, connectionManager);
118+
}
119+
}
120+
121+
static Builder builder() {
42122
return new Builder();
43123
}
44124

45-
public static class Builder {
125+
static Builder buildForAlias(String linkedProjectAlias) {
126+
return new Builder().linkedProjectAlias(linkedProjectAlias);
127+
}
128+
129+
static Builder buildForLinkedProject(ProjectId originProjectId, ProjectId linkedProjectId, String linkedProjectAlias) {
130+
return new Builder(originProjectId, linkedProjectId, linkedProjectAlias);
131+
}
132+
133+
class Builder {
46134
private ProjectId originProjectId;
47135
private ProjectId linkedProjectId;
48136
private String linkedProjectAlias;
@@ -62,13 +150,19 @@ public static class Builder {
62150

63151
private Builder() {}
64152

153+
private Builder(ProjectId originProjectId, ProjectId linkedProjectId, String linkedProjectAlias) {
154+
originProjectId(originProjectId);
155+
linkedProjectId(linkedProjectId);
156+
linkedProjectAlias(linkedProjectAlias);
157+
}
158+
65159
public Builder originProjectId(ProjectId originProjectId) {
66-
this.originProjectId = originProjectId;
160+
this.originProjectId = Objects.requireNonNull(originProjectId);
67161
return this;
68162
}
69163

70164
public Builder linkedProjectId(ProjectId linkedProjectId) {
71-
this.linkedProjectId = linkedProjectId;
165+
this.linkedProjectId = Objects.requireNonNull(linkedProjectId);
72166
return this;
73167
}
74168

@@ -81,27 +175,27 @@ public Builder linkedProjectAlias(String linkedProjectAlias) {
81175
}
82176

83177
public Builder transportConnectTimeout(TimeValue transportConnectTimeout) {
84-
this.transportConnectTimeout = transportConnectTimeout;
178+
this.transportConnectTimeout = Objects.requireNonNull(transportConnectTimeout);
85179
return this;
86180
}
87181

88182
public Builder connectionCompression(Compression.Enabled connectionCompression) {
89-
this.connectionCompression = connectionCompression;
183+
this.connectionCompression = Objects.requireNonNull(connectionCompression);
90184
return this;
91185
}
92186

93187
public Builder connectionCompressionScheme(Compression.Scheme connectionCompressionScheme) {
94-
this.connectionCompressionScheme = connectionCompressionScheme;
188+
this.connectionCompressionScheme = Objects.requireNonNull(connectionCompressionScheme);
95189
return this;
96190
}
97191

98192
public Builder clusterPingSchedule(TimeValue clusterPingSchedule) {
99-
this.clusterPingSchedule = clusterPingSchedule;
193+
this.clusterPingSchedule = Objects.requireNonNull(clusterPingSchedule);
100194
return this;
101195
}
102196

103197
public Builder initialConnectionTimeout(TimeValue initialConnectionTimeout) {
104-
this.initialConnectionTimeout = initialConnectionTimeout;
198+
this.initialConnectionTimeout = Objects.requireNonNull(initialConnectionTimeout);
105199
return this;
106200
}
107201

@@ -111,7 +205,7 @@ public Builder skipUnavailable(boolean skipUnavailable) {
111205
}
112206

113207
public Builder connectionStrategy(ConnectionStrategy connectionStrategy) {
114-
this.connectionStrategy = connectionStrategy;
208+
this.connectionStrategy = Objects.requireNonNull(connectionStrategy);
115209
return this;
116210
}
117211

@@ -131,12 +225,12 @@ public Builder proxyServerName(String proxyServerName) {
131225
}
132226

133227
public Builder sniffNodePredicate(Predicate<DiscoveryNode> sniffNodePredicate) {
134-
this.sniffNodePredicate = sniffNodePredicate;
228+
this.sniffNodePredicate = Objects.requireNonNull(sniffNodePredicate);
135229
return this;
136230
}
137231

138232
public Builder sniffSeedNodes(List<String> sniffSeedNodes) {
139-
this.sniffSeedNodes = sniffSeedNodes;
233+
this.sniffSeedNodes = Objects.requireNonNull(sniffSeedNodes);
140234
return this;
141235
}
142236

@@ -146,7 +240,16 @@ public Builder maxPendingConnectionListeners(int maxPendingConnectionListeners)
146240
}
147241

148242
public LinkedProjectConfig build() {
149-
return new LinkedProjectConfig(
243+
assert connectionStrategy != null : "ConnectionStrategy must be set";
244+
return switch (connectionStrategy) {
245+
case PROXY -> buildProxyConnectionStrategyConfig();
246+
case SNIFF -> buildSniffConnectionStrategyConfig();
247+
};
248+
}
249+
250+
public ProxyConnectionStrategyConfig buildProxyConnectionStrategyConfig() {
251+
assert ConnectionStrategy.PROXY.equals(connectionStrategy) : "ConnectionStrategy must be PROXY";
252+
return new ProxyConnectionStrategyConfig(
150253
originProjectId,
151254
linkedProjectId,
152255
linkedProjectAlias,
@@ -156,13 +259,30 @@ public LinkedProjectConfig build() {
156259
clusterPingSchedule,
157260
initialConnectionTimeout,
158261
skipUnavailable,
159-
connectionStrategy,
262+
maxPendingConnectionListeners,
160263
maxNumConnections,
161264
proxyAddress,
162-
proxyServerName,
265+
proxyServerName
266+
);
267+
}
268+
269+
public SniffConnectionStrategyConfig buildSniffConnectionStrategyConfig() {
270+
assert ConnectionStrategy.SNIFF.equals(connectionStrategy) : "ConnectionStrategy must be SNIFF";
271+
return new SniffConnectionStrategyConfig(
272+
originProjectId,
273+
linkedProjectId,
274+
linkedProjectAlias,
275+
transportConnectTimeout,
276+
connectionCompression,
277+
connectionCompressionScheme,
278+
clusterPingSchedule,
279+
initialConnectionTimeout,
280+
skipUnavailable,
281+
maxPendingConnectionListeners,
282+
maxNumConnections,
163283
sniffNodePredicate,
164284
sniffSeedNodes,
165-
maxPendingConnectionListeners
285+
proxyAddress
166286
);
167287
}
168288
}

server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.function.Supplier;
3939

4040
import static org.elasticsearch.core.Strings.format;
41+
import static org.elasticsearch.transport.LinkedProjectConfig.ProxyConnectionStrategyConfig;
4142

4243
public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
4344

@@ -52,20 +53,24 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
5253
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
5354
private final ConnectionManager.ConnectionValidator clusterNameValidator;
5455

55-
ProxyConnectionStrategy(LinkedProjectConfig config, TransportService transportService, RemoteConnectionManager connectionManager) {
56+
ProxyConnectionStrategy(
57+
ProxyConnectionStrategyConfig config,
58+
TransportService transportService,
59+
RemoteConnectionManager connectionManager
60+
) {
5661
this(config, () -> resolveAddress(config.proxyAddress()), transportService, connectionManager);
5762
}
5863

5964
ProxyConnectionStrategy(
60-
LinkedProjectConfig config,
65+
ProxyConnectionStrategyConfig config,
6166
Supplier<TransportAddress> address,
6267
TransportService transportService,
6368
RemoteConnectionManager connectionManager
6469
) {
6570
super(config, transportService, connectionManager);
6671
this.maxNumConnections = config.maxNumConnections();
6772
this.configuredAddress = config.proxyAddress();
68-
this.configuredServerName = config.proxyServerName();
73+
this.configuredServerName = config.serverName();
6974
assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses";
7075
this.address = address;
7176
this.clusterNameValidator = (newConnection, actualProfile, listener) -> {
@@ -106,9 +111,11 @@ protected boolean shouldOpenMoreConnections() {
106111

107112
@Override
108113
protected boolean strategyMustBeRebuilt(LinkedProjectConfig config) {
109-
return config.maxNumConnections() != maxNumConnections
110-
|| configuredAddress.equals(config.proxyAddress()) == false
111-
|| Objects.equals(config.proxyServerName(), configuredServerName) == false;
114+
assert config instanceof ProxyConnectionStrategyConfig : "expected config to be of type " + ProxyConnectionStrategy.class;
115+
final var proxyConfig = (ProxyConnectionStrategyConfig) config;
116+
return proxyConfig.maxNumConnections() != maxNumConnections
117+
|| configuredAddress.equals(proxyConfig.proxyAddress()) == false
118+
|| Objects.equals(proxyConfig.serverName(), configuredServerName) == false;
112119
}
113120

114121
@Override

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public final class RemoteClusterConnection implements Closeable {
7777
credentialsManager,
7878
createConnectionManager(profile, transportService)
7979
);
80-
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(config, transportService, remoteConnectionManager);
80+
this.connectionStrategy = config.buildConnectionStrategy(transportService, remoteConnectionManager);
8181
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
8282
this.remoteConnectionManager.addListener(transportService);
8383
this.skipUnavailable = config.skipUnavailable();

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -435,10 +435,7 @@ private synchronized void updateRemoteCluster(
435435
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
436436
}
437437
final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build();
438-
final var linkedProjectConfig = RemoteClusterSettings.toConfigBuilder(clusterAlias, mergedSettings)
439-
.originProjectId(projectId)
440-
.linkedProjectAlias(clusterAlias)
441-
.build();
438+
final var linkedProjectConfig = RemoteClusterSettings.toConfig(projectId, clusterAlias, mergedSettings);
442439
updateRemoteCluster(linkedProjectConfig, forceRebuild, listener);
443440
}
444441

@@ -451,7 +448,7 @@ private synchronized void updateRemoteCluster(
451448
final var clusterAlias = config.linkedProjectAlias();
452449
final var connectionMap = getConnectionsMapForProject(projectId);
453450
RemoteClusterConnection remote = connectionMap.get(clusterAlias);
454-
if (RemoteConnectionStrategy.isConnectionEnabled(config) == false) {
451+
if (config.isConnectionEnabled() == false) {
455452
try {
456453
IOUtils.close(remote);
457454
} catch (IOException e) {
@@ -508,8 +505,8 @@ void initializeRemoteClusters() {
508505

509506
CountDownActionListener listener = new CountDownActionListener(enabledClusters.size(), future);
510507
for (String clusterAlias : enabledClusters) {
511-
final var config = RemoteClusterSettings.toConfigBuilder(clusterAlias, settings).originProjectId(projectId).build();
512-
updateRemoteCluster(config, false, listener.map(ignored -> null));
508+
final var linkedProjectConfig = RemoteClusterSettings.toConfig(projectId, clusterAlias, settings);
509+
updateRemoteCluster(linkedProjectConfig, false, listener.map(ignored -> null));
513510
}
514511

515512
if (enabledClusters.isEmpty()) {

0 commit comments

Comments
 (0)