-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Use LinkedProjectConfig in RemoteClusterService
#133266
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
JeremyDahlgren
merged 32 commits into
elastic:main
from
JeremyDahlgren:es-12656-linked-proj-cfg
Aug 27, 2025
Merged
Changes from 29 commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
7cd7ae3
Use LinkedProjectConfig record in RemoteClusterService
JeremyDahlgren 26c177a
[CI] Auto commit changes from spotless
76f6a09
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren 9d1e8ae
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren f720699
Fix merged settings, check linked project alias
JeremyDahlgren a38fbfe
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren 62b42a1
[CI] Auto commit changes from spotless
92adad2
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren 46caac4
Split config into sniff and proxy types
JeremyDahlgren e287fc8
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren 7c23847
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren c2d3b15
Add more validations and tests
JeremyDahlgren aa17734
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren 7db5da7
[CI] Auto commit changes from spotless
3bb6cc0
Rename buildConnectionStrategy to buildRemoteConnectionStrategy
JeremyDahlgren 607a247
LinkedProjectConfig can be sealed
JeremyDahlgren 73dda5c
Do not use builder in RemoteConnectionEnabled.isConnectionEnabled()
JeremyDahlgren 98b1fb2
Rename LinkedProjectConfig Sniff and Proxy record types
JeremyDahlgren 5666b68
Restore package-private access level for ConnectionStrategy enum
JeremyDahlgren db1a1af
Use -randomNonNegativeInt() in assertChecksGreaterThanZero()
JeremyDahlgren 9cdedbd
Reduce visibility of some methods to private
JeremyDahlgren 21a3685
add maxNumConnections and proxyAddress to LinkedProjectConfig interface
JeremyDahlgren 0725cb3
Reduce changes in RemoteConnectionStrategyTests
JeremyDahlgren fb0e431
Refactor LinkedProjectConfig.Builder into concrete subclasses
JeremyDahlgren a545f12
Refactor default values
JeremyDahlgren 7c079ad
link ticket for eliminating leniency in builders
JeremyDahlgren 18d2917
[CI] Auto commit changes from spotless
939cde6
Fix failing TimeValue equalTo()
JeremyDahlgren d1fc308
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren dd81943
Remove unnecessary builder base class param
JeremyDahlgren bbd8aef
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren c513311
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
331 changes: 331 additions & 0 deletions
331
server/src/main/java/org/elasticsearch/transport/LinkedProjectConfig.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,331 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.transport; | ||
|
|
||
| import org.elasticsearch.cluster.metadata.ProjectId; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.common.Strings; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.core.TimeValue; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.function.Predicate; | ||
|
|
||
| import static org.elasticsearch.transport.RemoteConnectionStrategy.ConnectionStrategy; | ||
|
|
||
| /** | ||
| * <p>Configuration for initializing {@link RemoteClusterConnection}s to linked projects.</p> | ||
| * | ||
| * <p>The {@link ProxyLinkedProjectConfigBuilder} and {@link SniffLinkedProjectConfigBuilder} classes can be used to build concrete | ||
| * implementations of {@link LinkedProjectConfig}.</p> | ||
| * | ||
| * <p>The {@link RemoteClusterSettings#toConfig(String, Settings)} and | ||
| * {@link RemoteClusterSettings#toConfig(ProjectId, ProjectId, String, Settings)} methods | ||
| * can be used to read {@link RemoteClusterSettings} to build a concrete {@link LinkedProjectConfig} from {@link Settings}.</p> | ||
| */ | ||
| public sealed interface LinkedProjectConfig { | ||
| ProjectId originProjectId(); | ||
|
|
||
| ProjectId linkedProjectId(); | ||
|
|
||
| String linkedProjectAlias(); | ||
|
|
||
| TimeValue transportConnectTimeout(); | ||
|
|
||
| Compression.Enabled connectionCompression(); | ||
|
|
||
| Compression.Scheme connectionCompressionScheme(); | ||
|
|
||
| TimeValue clusterPingSchedule(); | ||
|
|
||
| TimeValue initialConnectionTimeout(); | ||
|
|
||
| boolean skipUnavailable(); | ||
|
|
||
| int maxPendingConnectionListeners(); | ||
|
|
||
| ConnectionStrategy connectionStrategy(); | ||
|
|
||
| int maxNumConnections(); | ||
|
|
||
| String proxyAddress(); | ||
|
|
||
| boolean isConnectionEnabled(); | ||
|
|
||
| RemoteConnectionStrategy buildRemoteConnectionStrategy(TransportService transportService, RemoteConnectionManager connectionManager); | ||
|
|
||
| /** | ||
| * Configuration for initializing {@link RemoteClusterConnection}s to linked projects using the {@link ProxyConnectionStrategy}. | ||
| */ | ||
| record ProxyLinkedProjectConfig( | ||
| ProjectId originProjectId, | ||
| ProjectId linkedProjectId, | ||
| String linkedProjectAlias, | ||
| TimeValue transportConnectTimeout, | ||
| Compression.Enabled connectionCompression, | ||
| Compression.Scheme connectionCompressionScheme, | ||
| TimeValue clusterPingSchedule, | ||
| TimeValue initialConnectionTimeout, | ||
| boolean skipUnavailable, | ||
| int maxPendingConnectionListeners, | ||
| int maxNumConnections, | ||
| String proxyAddress, | ||
| String serverName | ||
| ) implements LinkedProjectConfig { | ||
|
|
||
| @Override | ||
| public ConnectionStrategy connectionStrategy() { | ||
| return ConnectionStrategy.PROXY; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isConnectionEnabled() { | ||
| return Strings.isEmpty(proxyAddress) == false; | ||
| } | ||
|
|
||
| @Override | ||
| public RemoteConnectionStrategy buildRemoteConnectionStrategy( | ||
| TransportService transportService, | ||
| RemoteConnectionManager connectionManager | ||
| ) { | ||
| return new ProxyConnectionStrategy(this, transportService, connectionManager); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Configuration for initializing {@link RemoteClusterConnection}s to linked projects using the {@link SniffConnectionStrategy}. | ||
| */ | ||
| record SniffLinkedProjectConfig( | ||
| ProjectId originProjectId, | ||
| ProjectId linkedProjectId, | ||
| String linkedProjectAlias, | ||
| TimeValue transportConnectTimeout, | ||
| Compression.Enabled connectionCompression, | ||
| Compression.Scheme connectionCompressionScheme, | ||
| TimeValue clusterPingSchedule, | ||
| TimeValue initialConnectionTimeout, | ||
| boolean skipUnavailable, | ||
| int maxPendingConnectionListeners, | ||
| int maxNumConnections, | ||
| Predicate<DiscoveryNode> nodePredicate, | ||
| List<String> seedNodes, | ||
| String proxyAddress | ||
| ) implements LinkedProjectConfig { | ||
|
|
||
| @Override | ||
| public ConnectionStrategy connectionStrategy() { | ||
| return ConnectionStrategy.SNIFF; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isConnectionEnabled() { | ||
| return seedNodes.isEmpty() == false; | ||
| } | ||
|
|
||
| @Override | ||
| public RemoteConnectionStrategy buildRemoteConnectionStrategy( | ||
| TransportService transportService, | ||
| RemoteConnectionManager connectionManager | ||
| ) { | ||
| return new SniffConnectionStrategy(this, transportService, connectionManager); | ||
| } | ||
| } | ||
|
|
||
| abstract class Builder<C extends LinkedProjectConfig, B extends Builder<C, B>> { | ||
JeremyDahlgren marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| protected final ProjectId originProjectId; | ||
| protected final ProjectId linkedProjectId; | ||
| protected final String linkedProjectAlias; | ||
| protected final ConnectionStrategy connectionStrategy; | ||
| private final B concreteBuilder; | ||
| protected TimeValue transportConnectTimeout = TransportSettings.DEFAULT_CONNECT_TIMEOUT; | ||
| protected Compression.Enabled connectionCompression = TransportSettings.DEFAULT_TRANSPORT_COMPRESS; | ||
| protected Compression.Scheme connectionCompressionScheme = TransportSettings.DEFAULT_TRANSPORT_COMPRESSION_SCHEME; | ||
| protected TimeValue clusterPingSchedule = TransportSettings.DEFAULT_PING_SCHEDULE; | ||
| protected TimeValue initialConnectionTimeout = RemoteClusterSettings.DEFAULT_INITIAL_CONNECTION_TIMEOUT; | ||
| protected boolean skipUnavailable = RemoteClusterSettings.DEFAULT_SKIP_UNAVAILABLE; | ||
| protected String proxyAddress = ""; | ||
| protected int maxNumConnections; | ||
| protected int maxPendingConnectionListeners = RemoteClusterSettings.DEFAULT_MAX_PENDING_CONNECTION_LISTENERS; | ||
|
|
||
| private Builder( | ||
| ProjectId originProjectId, | ||
| ProjectId linkedProjectId, | ||
| String linkedProjectAlias, | ||
| ConnectionStrategy connectionStrategy | ||
| ) { | ||
| this.originProjectId = Objects.requireNonNull(originProjectId); | ||
| this.linkedProjectId = Objects.requireNonNull(linkedProjectId); | ||
| this.linkedProjectAlias = requireNonEmpty(linkedProjectAlias, "linkedProjectAlias"); | ||
| this.connectionStrategy = Objects.requireNonNull(connectionStrategy); | ||
| this.concreteBuilder = self(); | ||
| this.maxNumConnections = switch (connectionStrategy) { | ||
| case PROXY -> RemoteClusterSettings.ProxyConnectionStrategySettings.DEFAULT_REMOTE_SOCKET_CONNECTIONS; | ||
| case SNIFF -> RemoteClusterSettings.SniffConnectionStrategySettings.DEFAULT_REMOTE_CONNECTIONS_PER_CLUSTER; | ||
| }; | ||
| } | ||
|
|
||
| public B transportConnectTimeout(TimeValue transportConnectTimeout) { | ||
| this.transportConnectTimeout = Objects.requireNonNull(transportConnectTimeout); | ||
| return concreteBuilder; | ||
| } | ||
|
|
||
| public B connectionCompression(Compression.Enabled connectionCompression) { | ||
| this.connectionCompression = Objects.requireNonNull(connectionCompression); | ||
| return concreteBuilder; | ||
| } | ||
|
|
||
| public B connectionCompressionScheme(Compression.Scheme connectionCompressionScheme) { | ||
| this.connectionCompressionScheme = Objects.requireNonNull(connectionCompressionScheme); | ||
| return concreteBuilder; | ||
| } | ||
|
|
||
| public B clusterPingSchedule(TimeValue clusterPingSchedule) { | ||
| this.clusterPingSchedule = Objects.requireNonNull(clusterPingSchedule); | ||
| return concreteBuilder; | ||
| } | ||
|
|
||
| public B initialConnectionTimeout(TimeValue initialConnectionTimeout) { | ||
| this.initialConnectionTimeout = Objects.requireNonNull(initialConnectionTimeout); | ||
| return concreteBuilder; | ||
| } | ||
|
|
||
| public B skipUnavailable(boolean skipUnavailable) { | ||
| this.skipUnavailable = skipUnavailable; | ||
| return concreteBuilder; | ||
| } | ||
|
|
||
| public B proxyAddress(String proxyAddress) { | ||
| // TODO: Eliminate leniency here allowing an empty proxy address, ES-12737. | ||
| if (Strings.hasLength(proxyAddress)) { | ||
| RemoteConnectionStrategy.parsePort(proxyAddress); | ||
| } | ||
| this.proxyAddress = proxyAddress; | ||
| return concreteBuilder; | ||
| } | ||
|
|
||
| public B maxNumConnections(int maxNumConnections) { | ||
| this.maxNumConnections = requireGreaterThanZero(maxNumConnections, "maxNumConnections"); | ||
| return concreteBuilder; | ||
| } | ||
|
|
||
| public B maxPendingConnectionListeners(int maxPendingConnectionListeners) { | ||
| this.maxPendingConnectionListeners = requireGreaterThanZero(maxPendingConnectionListeners, "maxPendingConnectionListeners"); | ||
| return concreteBuilder; | ||
| } | ||
|
|
||
| public abstract C build(); | ||
|
|
||
| protected abstract B self(); | ||
|
|
||
| protected static int requireGreaterThanZero(int value, String name) { | ||
| if (value <= 0) { | ||
| throw new IllegalArgumentException("[" + name + "] must be greater than 0"); | ||
| } | ||
| return value; | ||
| } | ||
|
|
||
| protected static String requireNonEmpty(String value, String name) { | ||
| if (Objects.requireNonNull(value).isBlank()) { | ||
| throw new IllegalArgumentException("[" + name + "] cannot be empty"); | ||
| } | ||
| return value; | ||
| } | ||
| } | ||
|
|
||
| class ProxyLinkedProjectConfigBuilder extends Builder<ProxyLinkedProjectConfig, ProxyLinkedProjectConfigBuilder> { | ||
| private String serverName = ""; | ||
|
|
||
| public ProxyLinkedProjectConfigBuilder(String linkedProjectAlias) { | ||
| super(ProjectId.DEFAULT, ProjectId.DEFAULT, linkedProjectAlias, ConnectionStrategy.PROXY); | ||
| } | ||
|
|
||
| public ProxyLinkedProjectConfigBuilder(ProjectId originProjectId, ProjectId linkedProjectId, String linkedProjectAlias) { | ||
| super(originProjectId, linkedProjectId, linkedProjectAlias, ConnectionStrategy.PROXY); | ||
| } | ||
|
|
||
| public ProxyLinkedProjectConfigBuilder serverName(String serverName) { | ||
| this.serverName = serverName; | ||
| return this; | ||
| } | ||
|
|
||
| public ProxyLinkedProjectConfig build() { | ||
| return new ProxyLinkedProjectConfig( | ||
| originProjectId, | ||
| linkedProjectId, | ||
| linkedProjectAlias, | ||
| transportConnectTimeout, | ||
| connectionCompression, | ||
| connectionCompressionScheme, | ||
| clusterPingSchedule, | ||
| initialConnectionTimeout, | ||
| skipUnavailable, | ||
| maxPendingConnectionListeners, | ||
| maxNumConnections, | ||
| proxyAddress, | ||
| serverName | ||
| ); | ||
| } | ||
|
|
||
| @Override | ||
| protected ProxyLinkedProjectConfigBuilder self() { | ||
| return this; | ||
| } | ||
| } | ||
|
|
||
| class SniffLinkedProjectConfigBuilder extends Builder<SniffLinkedProjectConfig, SniffLinkedProjectConfigBuilder> { | ||
| private Predicate<DiscoveryNode> nodePredicate = RemoteClusterSettings.SniffConnectionStrategySettings.DEFAULT_NODE_PREDICATE; | ||
| private List<String> seedNodes = RemoteClusterSettings.SniffConnectionStrategySettings.DEFAULT_SEED_NODES; | ||
|
|
||
| public SniffLinkedProjectConfigBuilder(String linkedProjectAlias) { | ||
| super(ProjectId.DEFAULT, ProjectId.DEFAULT, linkedProjectAlias, ConnectionStrategy.SNIFF); | ||
| } | ||
|
|
||
| public SniffLinkedProjectConfigBuilder(ProjectId originProjectId, ProjectId linkedProjectId, String linkedProjectAlias) { | ||
| super(originProjectId, linkedProjectId, linkedProjectAlias, ConnectionStrategy.SNIFF); | ||
| } | ||
|
|
||
| public SniffLinkedProjectConfigBuilder nodePredicate(Predicate<DiscoveryNode> nodePredicate) { | ||
| this.nodePredicate = Objects.requireNonNull(nodePredicate); | ||
| return this; | ||
| } | ||
|
|
||
| public SniffLinkedProjectConfigBuilder seedNodes(List<String> seedNodes) { | ||
| // TODO: Eliminate leniency here allowing an empty set of seed nodes, ES-12737. | ||
| Objects.requireNonNull(seedNodes).forEach(RemoteConnectionStrategy::parsePort); | ||
| this.seedNodes = seedNodes; | ||
| return this; | ||
| } | ||
|
|
||
| public SniffLinkedProjectConfig build() { | ||
| return new SniffLinkedProjectConfig( | ||
| originProjectId, | ||
| linkedProjectId, | ||
| linkedProjectAlias, | ||
| transportConnectTimeout, | ||
| connectionCompression, | ||
| connectionCompressionScheme, | ||
| clusterPingSchedule, | ||
| initialConnectionTimeout, | ||
| skipUnavailable, | ||
| maxPendingConnectionListeners, | ||
| maxNumConnections, | ||
| nodePredicate, | ||
| seedNodes, | ||
| proxyAddress | ||
| ); | ||
| } | ||
|
|
||
| @Override | ||
| protected SniffLinkedProjectConfigBuilder self() { | ||
| return this; | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.