Skip to content

Commit 5383ebe

Browse files
Use LinkedProjectConfig in RemoteClusterService (#133266)
Moves all remaining settings and supporting methods into RemoteClusterSettings and introduces a configuration type LinkedProjectConfig with concrete record types that can be built from the settings. Refactors RemoteClusterService and the related classes to use the LinkedProjectConfig, with settings extraction at higher levels in RemoteClusterService and RemoteClusterAware and its subclasses. This is another step towards supporting multiple origin projects with linked project configuration built from cluster state ProjectCustom updates. Resolves: ES-12656, ES-12569
1 parent 4661d06 commit 5383ebe

File tree

26 files changed

+1214
-756
lines changed

26 files changed

+1214
-756
lines changed

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,10 @@
132132
import org.elasticsearch.snapshots.SnapshotShutdownProgressTracker;
133133
import org.elasticsearch.snapshots.SnapshotsService;
134134
import org.elasticsearch.threadpool.ThreadPool;
135-
import org.elasticsearch.transport.ProxyConnectionStrategy;
136135
import org.elasticsearch.transport.RemoteClusterPortSettings;
137136
import org.elasticsearch.transport.RemoteClusterSettings;
138-
import org.elasticsearch.transport.RemoteConnectionStrategy;
139-
import org.elasticsearch.transport.SniffConnectionStrategy;
137+
import org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings;
138+
import org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings;
140139
import org.elasticsearch.transport.TransportService;
141140
import org.elasticsearch.transport.TransportSettings;
142141
import org.elasticsearch.watcher.ResourceWatcherService;
@@ -367,19 +366,19 @@ public void apply(Settings value, Settings current, Settings previous) {
367366
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
368367
TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE,
369368
RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
370-
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
371369
RemoteClusterSettings.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
372370
RemoteClusterSettings.REMOTE_NODE_ATTRIBUTE,
373371
RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE,
374372
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS,
375373
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESSION_SCHEME,
376-
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
377-
ProxyConnectionStrategy.PROXY_ADDRESS,
378-
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
379-
ProxyConnectionStrategy.SERVER_NAME,
380-
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
381-
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
382-
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
374+
RemoteClusterSettings.REMOTE_CONNECTION_MODE,
375+
ProxyConnectionStrategySettings.PROXY_ADDRESS,
376+
ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS,
377+
ProxyConnectionStrategySettings.SERVER_NAME,
378+
SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY,
379+
SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS,
380+
SniffConnectionStrategySettings.REMOTE_CONNECTIONS_PER_CLUSTER,
381+
SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS,
383382
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
384383
ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING,
385384
SnapshotShutdownProgressTracker.SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING,
Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
import org.elasticsearch.cluster.node.DiscoveryNode;
14+
import org.elasticsearch.common.Strings;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.core.TimeValue;
17+
18+
import java.util.List;
19+
import java.util.Objects;
20+
import java.util.function.Predicate;
21+
22+
import static org.elasticsearch.transport.RemoteConnectionStrategy.ConnectionStrategy;
23+
24+
/**
25+
* <p>Configuration for initializing {@link RemoteClusterConnection}s to linked projects.</p>
26+
*
27+
* <p>The {@link ProxyLinkedProjectConfigBuilder} and {@link SniffLinkedProjectConfigBuilder} classes can be used to build concrete
28+
* implementations of {@link LinkedProjectConfig}.</p>
29+
*
30+
* <p>The {@link RemoteClusterSettings#toConfig(String, Settings)} and
31+
* {@link RemoteClusterSettings#toConfig(ProjectId, ProjectId, String, Settings)} methods
32+
* can be used to read {@link RemoteClusterSettings} to build a concrete {@link LinkedProjectConfig} from {@link Settings}.</p>
33+
*/
34+
public sealed interface LinkedProjectConfig {
35+
ProjectId originProjectId();
36+
37+
ProjectId linkedProjectId();
38+
39+
String linkedProjectAlias();
40+
41+
TimeValue transportConnectTimeout();
42+
43+
Compression.Enabled connectionCompression();
44+
45+
Compression.Scheme connectionCompressionScheme();
46+
47+
TimeValue clusterPingSchedule();
48+
49+
TimeValue initialConnectionTimeout();
50+
51+
boolean skipUnavailable();
52+
53+
int maxPendingConnectionListeners();
54+
55+
ConnectionStrategy connectionStrategy();
56+
57+
int maxNumConnections();
58+
59+
String proxyAddress();
60+
61+
boolean isConnectionEnabled();
62+
63+
RemoteConnectionStrategy buildRemoteConnectionStrategy(TransportService transportService, RemoteConnectionManager connectionManager);
64+
65+
/**
66+
* Configuration for initializing {@link RemoteClusterConnection}s to linked projects using the {@link ProxyConnectionStrategy}.
67+
*/
68+
record ProxyLinkedProjectConfig(
69+
ProjectId originProjectId,
70+
ProjectId linkedProjectId,
71+
String linkedProjectAlias,
72+
TimeValue transportConnectTimeout,
73+
Compression.Enabled connectionCompression,
74+
Compression.Scheme connectionCompressionScheme,
75+
TimeValue clusterPingSchedule,
76+
TimeValue initialConnectionTimeout,
77+
boolean skipUnavailable,
78+
int maxPendingConnectionListeners,
79+
int maxNumConnections,
80+
String proxyAddress,
81+
String serverName
82+
) implements LinkedProjectConfig {
83+
84+
@Override
85+
public ConnectionStrategy connectionStrategy() {
86+
return ConnectionStrategy.PROXY;
87+
}
88+
89+
@Override
90+
public boolean isConnectionEnabled() {
91+
return Strings.isEmpty(proxyAddress) == false;
92+
}
93+
94+
@Override
95+
public RemoteConnectionStrategy buildRemoteConnectionStrategy(
96+
TransportService transportService,
97+
RemoteConnectionManager connectionManager
98+
) {
99+
return new ProxyConnectionStrategy(this, transportService, connectionManager);
100+
}
101+
}
102+
103+
/**
104+
* Configuration for initializing {@link RemoteClusterConnection}s to linked projects using the {@link SniffConnectionStrategy}.
105+
*/
106+
record SniffLinkedProjectConfig(
107+
ProjectId originProjectId,
108+
ProjectId linkedProjectId,
109+
String linkedProjectAlias,
110+
TimeValue transportConnectTimeout,
111+
Compression.Enabled connectionCompression,
112+
Compression.Scheme connectionCompressionScheme,
113+
TimeValue clusterPingSchedule,
114+
TimeValue initialConnectionTimeout,
115+
boolean skipUnavailable,
116+
int maxPendingConnectionListeners,
117+
int maxNumConnections,
118+
Predicate<DiscoveryNode> nodePredicate,
119+
List<String> seedNodes,
120+
String proxyAddress
121+
) implements LinkedProjectConfig {
122+
123+
@Override
124+
public ConnectionStrategy connectionStrategy() {
125+
return ConnectionStrategy.SNIFF;
126+
}
127+
128+
@Override
129+
public boolean isConnectionEnabled() {
130+
return seedNodes.isEmpty() == false;
131+
}
132+
133+
@Override
134+
public RemoteConnectionStrategy buildRemoteConnectionStrategy(
135+
TransportService transportService,
136+
RemoteConnectionManager connectionManager
137+
) {
138+
return new SniffConnectionStrategy(this, transportService, connectionManager);
139+
}
140+
}
141+
142+
abstract class Builder<B extends Builder<B>> {
143+
protected final ProjectId originProjectId;
144+
protected final ProjectId linkedProjectId;
145+
protected final String linkedProjectAlias;
146+
protected final ConnectionStrategy connectionStrategy;
147+
private final B concreteBuilder;
148+
protected TimeValue transportConnectTimeout = TransportSettings.DEFAULT_CONNECT_TIMEOUT;
149+
protected Compression.Enabled connectionCompression = TransportSettings.DEFAULT_TRANSPORT_COMPRESS;
150+
protected Compression.Scheme connectionCompressionScheme = TransportSettings.DEFAULT_TRANSPORT_COMPRESSION_SCHEME;
151+
protected TimeValue clusterPingSchedule = TransportSettings.DEFAULT_PING_SCHEDULE;
152+
protected TimeValue initialConnectionTimeout = RemoteClusterSettings.DEFAULT_INITIAL_CONNECTION_TIMEOUT;
153+
protected boolean skipUnavailable = RemoteClusterSettings.DEFAULT_SKIP_UNAVAILABLE;
154+
protected String proxyAddress = "";
155+
protected int maxNumConnections;
156+
protected int maxPendingConnectionListeners = RemoteClusterSettings.DEFAULT_MAX_PENDING_CONNECTION_LISTENERS;
157+
158+
private Builder(
159+
ProjectId originProjectId,
160+
ProjectId linkedProjectId,
161+
String linkedProjectAlias,
162+
ConnectionStrategy connectionStrategy
163+
) {
164+
this.originProjectId = Objects.requireNonNull(originProjectId);
165+
this.linkedProjectId = Objects.requireNonNull(linkedProjectId);
166+
this.linkedProjectAlias = requireNonEmpty(linkedProjectAlias, "linkedProjectAlias");
167+
this.connectionStrategy = Objects.requireNonNull(connectionStrategy);
168+
this.concreteBuilder = self();
169+
this.maxNumConnections = switch (connectionStrategy) {
170+
case PROXY -> RemoteClusterSettings.ProxyConnectionStrategySettings.DEFAULT_REMOTE_SOCKET_CONNECTIONS;
171+
case SNIFF -> RemoteClusterSettings.SniffConnectionStrategySettings.DEFAULT_REMOTE_CONNECTIONS_PER_CLUSTER;
172+
};
173+
}
174+
175+
public B transportConnectTimeout(TimeValue transportConnectTimeout) {
176+
this.transportConnectTimeout = Objects.requireNonNull(transportConnectTimeout);
177+
return concreteBuilder;
178+
}
179+
180+
public B connectionCompression(Compression.Enabled connectionCompression) {
181+
this.connectionCompression = Objects.requireNonNull(connectionCompression);
182+
return concreteBuilder;
183+
}
184+
185+
public B connectionCompressionScheme(Compression.Scheme connectionCompressionScheme) {
186+
this.connectionCompressionScheme = Objects.requireNonNull(connectionCompressionScheme);
187+
return concreteBuilder;
188+
}
189+
190+
public B clusterPingSchedule(TimeValue clusterPingSchedule) {
191+
this.clusterPingSchedule = Objects.requireNonNull(clusterPingSchedule);
192+
return concreteBuilder;
193+
}
194+
195+
public B initialConnectionTimeout(TimeValue initialConnectionTimeout) {
196+
this.initialConnectionTimeout = Objects.requireNonNull(initialConnectionTimeout);
197+
return concreteBuilder;
198+
}
199+
200+
public B skipUnavailable(boolean skipUnavailable) {
201+
this.skipUnavailable = skipUnavailable;
202+
return concreteBuilder;
203+
}
204+
205+
public B proxyAddress(String proxyAddress) {
206+
// TODO: Eliminate leniency here allowing an empty proxy address, ES-12737.
207+
if (Strings.hasLength(proxyAddress)) {
208+
RemoteConnectionStrategy.parsePort(proxyAddress);
209+
}
210+
this.proxyAddress = proxyAddress;
211+
return concreteBuilder;
212+
}
213+
214+
public B maxNumConnections(int maxNumConnections) {
215+
this.maxNumConnections = requireGreaterThanZero(maxNumConnections, "maxNumConnections");
216+
return concreteBuilder;
217+
}
218+
219+
public B maxPendingConnectionListeners(int maxPendingConnectionListeners) {
220+
this.maxPendingConnectionListeners = requireGreaterThanZero(maxPendingConnectionListeners, "maxPendingConnectionListeners");
221+
return concreteBuilder;
222+
}
223+
224+
public abstract LinkedProjectConfig build();
225+
226+
protected abstract B self();
227+
228+
protected static int requireGreaterThanZero(int value, String name) {
229+
if (value <= 0) {
230+
throw new IllegalArgumentException("[" + name + "] must be greater than 0");
231+
}
232+
return value;
233+
}
234+
235+
protected static String requireNonEmpty(String value, String name) {
236+
if (Objects.requireNonNull(value).isBlank()) {
237+
throw new IllegalArgumentException("[" + name + "] cannot be empty");
238+
}
239+
return value;
240+
}
241+
}
242+
243+
class ProxyLinkedProjectConfigBuilder extends Builder<ProxyLinkedProjectConfigBuilder> {
244+
private String serverName = "";
245+
246+
public ProxyLinkedProjectConfigBuilder(String linkedProjectAlias) {
247+
super(ProjectId.DEFAULT, ProjectId.DEFAULT, linkedProjectAlias, ConnectionStrategy.PROXY);
248+
}
249+
250+
public ProxyLinkedProjectConfigBuilder(ProjectId originProjectId, ProjectId linkedProjectId, String linkedProjectAlias) {
251+
super(originProjectId, linkedProjectId, linkedProjectAlias, ConnectionStrategy.PROXY);
252+
}
253+
254+
public ProxyLinkedProjectConfigBuilder serverName(String serverName) {
255+
this.serverName = serverName;
256+
return this;
257+
}
258+
259+
@Override
260+
public ProxyLinkedProjectConfig build() {
261+
return new ProxyLinkedProjectConfig(
262+
originProjectId,
263+
linkedProjectId,
264+
linkedProjectAlias,
265+
transportConnectTimeout,
266+
connectionCompression,
267+
connectionCompressionScheme,
268+
clusterPingSchedule,
269+
initialConnectionTimeout,
270+
skipUnavailable,
271+
maxPendingConnectionListeners,
272+
maxNumConnections,
273+
proxyAddress,
274+
serverName
275+
);
276+
}
277+
278+
@Override
279+
protected ProxyLinkedProjectConfigBuilder self() {
280+
return this;
281+
}
282+
}
283+
284+
class SniffLinkedProjectConfigBuilder extends Builder<SniffLinkedProjectConfigBuilder> {
285+
private Predicate<DiscoveryNode> nodePredicate = RemoteClusterSettings.SniffConnectionStrategySettings.DEFAULT_NODE_PREDICATE;
286+
private List<String> seedNodes = RemoteClusterSettings.SniffConnectionStrategySettings.DEFAULT_SEED_NODES;
287+
288+
public SniffLinkedProjectConfigBuilder(String linkedProjectAlias) {
289+
super(ProjectId.DEFAULT, ProjectId.DEFAULT, linkedProjectAlias, ConnectionStrategy.SNIFF);
290+
}
291+
292+
public SniffLinkedProjectConfigBuilder(ProjectId originProjectId, ProjectId linkedProjectId, String linkedProjectAlias) {
293+
super(originProjectId, linkedProjectId, linkedProjectAlias, ConnectionStrategy.SNIFF);
294+
}
295+
296+
public SniffLinkedProjectConfigBuilder nodePredicate(Predicate<DiscoveryNode> nodePredicate) {
297+
this.nodePredicate = Objects.requireNonNull(nodePredicate);
298+
return this;
299+
}
300+
301+
public SniffLinkedProjectConfigBuilder seedNodes(List<String> seedNodes) {
302+
// TODO: Eliminate leniency here allowing an empty set of seed nodes, ES-12737.
303+
Objects.requireNonNull(seedNodes).forEach(RemoteConnectionStrategy::parsePort);
304+
this.seedNodes = seedNodes;
305+
return this;
306+
}
307+
308+
@Override
309+
public SniffLinkedProjectConfig build() {
310+
return new SniffLinkedProjectConfig(
311+
originProjectId,
312+
linkedProjectId,
313+
linkedProjectAlias,
314+
transportConnectTimeout,
315+
connectionCompression,
316+
connectionCompressionScheme,
317+
clusterPingSchedule,
318+
initialConnectionTimeout,
319+
skipUnavailable,
320+
maxPendingConnectionListeners,
321+
maxNumConnections,
322+
nodePredicate,
323+
seedNodes,
324+
proxyAddress
325+
);
326+
}
327+
328+
@Override
329+
protected SniffLinkedProjectConfigBuilder self() {
330+
return this;
331+
}
332+
}
333+
}

0 commit comments

Comments
 (0)