Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7cd7ae3
Use LinkedProjectConfig record in RemoteClusterService
JeremyDahlgren Aug 20, 2025
26c177a
[CI] Auto commit changes from spotless
Aug 20, 2025
76f6a09
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren Aug 20, 2025
9d1e8ae
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren Aug 20, 2025
f720699
Fix merged settings, check linked project alias
JeremyDahlgren Aug 21, 2025
a38fbfe
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren Aug 21, 2025
62b42a1
[CI] Auto commit changes from spotless
Aug 21, 2025
92adad2
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren Aug 21, 2025
46caac4
Split config into sniff and proxy types
JeremyDahlgren Aug 21, 2025
e287fc8
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren Aug 21, 2025
7c23847
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren Aug 21, 2025
c2d3b15
Add more validations and tests
JeremyDahlgren Aug 22, 2025
aa17734
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren Aug 22, 2025
7db5da7
[CI] Auto commit changes from spotless
Aug 22, 2025
3bb6cc0
Rename buildConnectionStrategy to buildRemoteConnectionStrategy
JeremyDahlgren Aug 26, 2025
607a247
LinkedProjectConfig can be sealed
JeremyDahlgren Aug 26, 2025
73dda5c
Do not use builder in RemoteConnectionEnabled.isConnectionEnabled()
JeremyDahlgren Aug 26, 2025
98b1fb2
Rename LinkedProjectConfig Sniff and Proxy record types
JeremyDahlgren Aug 26, 2025
5666b68
Restore package-private access level for ConnectionStrategy enum
JeremyDahlgren Aug 26, 2025
db1a1af
Use -randomNonNegativeInt() in assertChecksGreaterThanZero()
JeremyDahlgren Aug 26, 2025
9cdedbd
Reduce visibility of some methods to private
JeremyDahlgren Aug 26, 2025
21a3685
add maxNumConnections and proxyAddress to LinkedProjectConfig interface
JeremyDahlgren Aug 26, 2025
0725cb3
Reduce changes in RemoteConnectionStrategyTests
JeremyDahlgren Aug 26, 2025
fb0e431
Refactor LinkedProjectConfig.Builder into concrete subclasses
JeremyDahlgren Aug 26, 2025
a545f12
Refactor default values
JeremyDahlgren Aug 26, 2025
7c079ad
link ticket for eliminating leniency in builders
JeremyDahlgren Aug 26, 2025
18d2917
[CI] Auto commit changes from spotless
Aug 26, 2025
939cde6
Fix failing TimeValue equalTo()
JeremyDahlgren Aug 26, 2025
d1fc308
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren Aug 26, 2025
dd81943
Remove unnecessary builder base class param
JeremyDahlgren Aug 27, 2025
bbd8aef
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren Aug 27, 2025
c513311
Merge branch 'main' into es-12656-linked-proj-cfg
JeremyDahlgren Aug 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,10 @@
import org.elasticsearch.snapshots.SnapshotShutdownProgressTracker;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ProxyConnectionStrategy;
import org.elasticsearch.transport.RemoteClusterPortSettings;
import org.elasticsearch.transport.RemoteClusterSettings;
import org.elasticsearch.transport.RemoteConnectionStrategy;
import org.elasticsearch.transport.SniffConnectionStrategy;
import org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings;
import org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.watcher.ResourceWatcherService;
Expand Down Expand Up @@ -367,19 +366,19 @@ public void apply(Settings value, Settings current, Settings previous) {
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE,
RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterSettings.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterSettings.REMOTE_NODE_ATTRIBUTE,
RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS,
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESSION_SCHEME,
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
ProxyConnectionStrategy.PROXY_ADDRESS,
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategy.SERVER_NAME,
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
RemoteClusterSettings.REMOTE_CONNECTION_MODE,
ProxyConnectionStrategySettings.PROXY_ADDRESS,
ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategySettings.SERVER_NAME,
SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategySettings.REMOTE_CONNECTIONS_PER_CLUSTER,
SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING,
SnapshotShutdownProgressTracker.SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,345 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;

import static org.elasticsearch.transport.RemoteConnectionStrategy.ConnectionStrategy;

/**
* <p>Configuration for initializing {@link RemoteClusterConnection}s to linked projects.</p>
*
* <p>A {@link LinkedProjectConfig.Builder} instance is used to build up the configuration,
* with a concrete configuration type generated via {@link LinkedProjectConfig.Builder#build}
* based on the {@link RemoteConnectionStrategy.ConnectionStrategy} that was specified,
* or by constructing a specific configuration type via
* {@link LinkedProjectConfig.Builder#buildProxyConnectionStrategyConfig()} or
* {@link LinkedProjectConfig.Builder#buildSniffConnectionStrategyConfig()}.</p>
*
* <p>The {@link RemoteClusterSettings#toConfig(String, Settings)} and {@link RemoteClusterSettings#toConfig(ProjectId, String, Settings)}
* methods can be used to read {@link RemoteClusterSettings} to build a concrete {@link LinkedProjectConfig} from {@link Settings}.</p>
*/
public interface LinkedProjectConfig {
ProjectId originProjectId();

ProjectId linkedProjectId();

String linkedProjectAlias();

TimeValue transportConnectTimeout();

Compression.Enabled connectionCompression();

Compression.Scheme connectionCompressionScheme();

TimeValue clusterPingSchedule();

TimeValue initialConnectionTimeout();

boolean skipUnavailable();

int maxPendingConnectionListeners();

ConnectionStrategy connectionStrategy();

boolean isConnectionEnabled();

RemoteConnectionStrategy buildConnectionStrategy(TransportService transportService, RemoteConnectionManager connectionManager);

/**
* Configuration for initializing {@link RemoteClusterConnection}s to linked projects using the {@link ProxyConnectionStrategy}.
*/
record ProxyConnectionStrategyConfig(
ProjectId originProjectId,
ProjectId linkedProjectId,
String linkedProjectAlias,
TimeValue transportConnectTimeout,
Compression.Enabled connectionCompression,
Compression.Scheme connectionCompressionScheme,
TimeValue clusterPingSchedule,
TimeValue initialConnectionTimeout,
boolean skipUnavailable,
int maxPendingConnectionListeners,
int maxNumConnections,
String proxyAddress,
String serverName
) implements LinkedProjectConfig {

@Override
public ConnectionStrategy connectionStrategy() {
return ConnectionStrategy.PROXY;
}

@Override
public boolean isConnectionEnabled() {
return Strings.isEmpty(proxyAddress) == false;
}

@Override
public RemoteConnectionStrategy buildConnectionStrategy(
TransportService transportService,
RemoteConnectionManager connectionManager
) {
return new ProxyConnectionStrategy(this, transportService, connectionManager);
}
}

/**
* Configuration for initializing {@link RemoteClusterConnection}s to linked projects using the {@link SniffConnectionStrategy}.
*/
record SniffConnectionStrategyConfig(
ProjectId originProjectId,
ProjectId linkedProjectId,
String linkedProjectAlias,
TimeValue transportConnectTimeout,
Compression.Enabled connectionCompression,
Compression.Scheme connectionCompressionScheme,
TimeValue clusterPingSchedule,
TimeValue initialConnectionTimeout,
boolean skipUnavailable,
int maxPendingConnectionListeners,
int maxNumConnections,
Predicate<DiscoveryNode> nodePredicate,
List<String> seedNodes,
String proxyAddress
) implements LinkedProjectConfig {

@Override
public ConnectionStrategy connectionStrategy() {
return ConnectionStrategy.SNIFF;
}

@Override
public boolean isConnectionEnabled() {
return seedNodes.isEmpty() == false;
}

@Override
public RemoteConnectionStrategy buildConnectionStrategy(
TransportService transportService,
RemoteConnectionManager connectionManager
) {
return new SniffConnectionStrategy(this, transportService, connectionManager);
}
}

TimeValue DEFAULT_TRANSPORT_CONNECT_TIMEOUT = TimeValue.timeValueSeconds(30);
Compression.Enabled DEFAULT_CONNECTION_COMPRESSION = Compression.Enabled.INDEXING_DATA;
Compression.Scheme DEFAULT_CONNECTION_COMPRESSION_SCHEME = Compression.Scheme.LZ4;
TimeValue DEFAULT_CLUSTER_PING_SCHEDULE = TimeValue.MINUS_ONE;
TimeValue DEFAULT_INITIAL_CONNECTION_TIMEOUT = TimeValue.timeValueSeconds(30);
boolean DEFAULT_SKIP_UNAVAILABLE = true;
int DEFAULT_REMOTE_MAX_PENDING_CONNECTION_LISTENERS = 1000;
int DEFAULT_PROXY_NUM_SOCKET_CONNECTIONS = 18;
int DEFAULT_SNIFF_MAX_NUM_CONNECTIONS = 3;
List<String> DEFAULT_SNIFF_SEED_NODES = Collections.emptyList();
Predicate<DiscoveryNode> DEFAULT_SNIFF_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
&& (node.isMasterNode() == false || node.canContainData() || node.isIngestNode());

static Builder buildForAlias(String linkedProjectAlias) {
return buildForLinkedProject(ProjectId.DEFAULT, ProjectId.DEFAULT, linkedProjectAlias);
}

static Builder buildForLinkedProject(ProjectId originProjectId, ProjectId linkedProjectId, String linkedProjectAlias) {
return new Builder(originProjectId, linkedProjectId, linkedProjectAlias);
}

class Builder {
private ProjectId originProjectId;
private ProjectId linkedProjectId;
private String linkedProjectAlias;
private TimeValue transportConnectTimeout = DEFAULT_TRANSPORT_CONNECT_TIMEOUT;
private Compression.Enabled connectionCompression = DEFAULT_CONNECTION_COMPRESSION;
private Compression.Scheme connectionCompressionScheme = DEFAULT_CONNECTION_COMPRESSION_SCHEME;
private TimeValue clusterPingSchedule = DEFAULT_CLUSTER_PING_SCHEDULE;
private TimeValue initialConnectionTimeout = DEFAULT_INITIAL_CONNECTION_TIMEOUT;
private boolean skipUnavailable = DEFAULT_SKIP_UNAVAILABLE;
private ConnectionStrategy connectionStrategy;
private int proxyNumSocketConnections = DEFAULT_PROXY_NUM_SOCKET_CONNECTIONS;
private String proxyAddress = "";
private String proxyServerName = "";
private int sniffMaxNumConnections = DEFAULT_SNIFF_MAX_NUM_CONNECTIONS;
private Predicate<DiscoveryNode> sniffNodePredicate = DEFAULT_SNIFF_NODE_PREDICATE;
private List<String> sniffSeedNodes = DEFAULT_SNIFF_SEED_NODES;
private int maxPendingConnectionListeners = DEFAULT_REMOTE_MAX_PENDING_CONNECTION_LISTENERS;

private Builder(ProjectId originProjectId, ProjectId linkedProjectId, String linkedProjectAlias) {
originProjectId(originProjectId);
linkedProjectId(linkedProjectId);
linkedProjectAlias(linkedProjectAlias);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am really torn on how this builder should work. I am not a big fan of having a single builder for both config classes so that my intuition is to go with a base builder and subclasses. But that means using generic which adds complexity and may not worth it since we have only two variants. So I think we can stick with this current approach. But can we make it a bit stricter?

  1. Can connectionStrategy, i.e. mode, also be decided upfront and passed as a constructor argument?
  2. If we do the above, we should be able to configure the default maxNumConnections conditionally based on the connectionStrategy so that there is no need to have separate sniffMaxNumConnections and proxyNumSocketConnections fields (and methods)?
  3. The value of connectionStrategy can be used to validate whether builder method is should be called, e.g. if the strategy is proxy, the sniffSeedNodes should throw exception.
  4. Can we make all field passed in constructor final? It does not seem reasonable to change these fields, e.g. originalProjectId or connectionStrategy in following build methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't like how a single build() function ended up requiring casts, so I refactored into builder subclasses. Let me know what you think of how it turned out.


public Builder originProjectId(ProjectId originProjectId) {
this.originProjectId = Objects.requireNonNull(originProjectId);
return this;
}

public Builder linkedProjectId(ProjectId linkedProjectId) {
this.linkedProjectId = Objects.requireNonNull(linkedProjectId);
return this;
}

public Builder linkedProjectAlias(String linkedProjectAlias) {
this.linkedProjectAlias = requireNonEmpty(linkedProjectAlias, "linkedProjectAlias");
return this;
}

public Builder transportConnectTimeout(TimeValue transportConnectTimeout) {
this.transportConnectTimeout = Objects.requireNonNull(transportConnectTimeout);
return this;
}

public Builder connectionCompression(Compression.Enabled connectionCompression) {
this.connectionCompression = Objects.requireNonNull(connectionCompression);
return this;
}

public Builder connectionCompressionScheme(Compression.Scheme connectionCompressionScheme) {
this.connectionCompressionScheme = Objects.requireNonNull(connectionCompressionScheme);
return this;
}

public Builder clusterPingSchedule(TimeValue clusterPingSchedule) {
this.clusterPingSchedule = Objects.requireNonNull(clusterPingSchedule);
return this;
}

public Builder initialConnectionTimeout(TimeValue initialConnectionTimeout) {
this.initialConnectionTimeout = Objects.requireNonNull(initialConnectionTimeout);
return this;
}

public Builder skipUnavailable(boolean skipUnavailable) {
this.skipUnavailable = skipUnavailable;
return this;
}

public Builder connectionStrategy(ConnectionStrategy connectionStrategy) {
this.connectionStrategy = Objects.requireNonNull(connectionStrategy);
return this;
}

public Builder proxyNumSocketConnections(int proxyNumSocketConnections) {
this.proxyNumSocketConnections = requireGreaterThanZero(proxyNumSocketConnections, "proxyNumSocketConnections");
return this;
}

public Builder proxyAddress(String proxyAddress) {
if (Strings.hasLength(proxyAddress)) {
RemoteConnectionStrategy.parsePort(proxyAddress);
}
this.proxyAddress = proxyAddress;
return this;
}

public Builder proxyServerName(String proxyServerName) {
this.proxyServerName = proxyServerName;
return this;
}

public Builder sniffMaxNumConnections(int sniffMaxNumConnections) {
this.sniffMaxNumConnections = requireGreaterThanZero(sniffMaxNumConnections, "sniffMaxNumConnections");
return this;
}

public Builder sniffNodePredicate(Predicate<DiscoveryNode> sniffNodePredicate) {
this.sniffNodePredicate = Objects.requireNonNull(sniffNodePredicate);
return this;
}

public Builder sniffSeedNodes(List<String> sniffSeedNodes) {
Objects.requireNonNull(sniffSeedNodes).forEach(RemoteConnectionStrategy::parsePort);
this.sniffSeedNodes = sniffSeedNodes;
return this;
}

public Builder maxPendingConnectionListeners(int maxPendingConnectionListeners) {
this.maxPendingConnectionListeners = requireGreaterThanZero(maxPendingConnectionListeners, "maxPendingConnectionListeners");
return this;
}

public LinkedProjectConfig build() {
if (connectionStrategy == null) {
throw new IllegalArgumentException("[connectionStrategy] must be set before calling build()");
}
return switch (connectionStrategy) {
case PROXY -> buildProxyConnectionStrategyConfig();
case SNIFF -> buildSniffConnectionStrategyConfig();
};
}

public ProxyConnectionStrategyConfig buildProxyConnectionStrategyConfig() {
if (connectionStrategy != null && ConnectionStrategy.PROXY.equals(connectionStrategy) == false) {
throw new IllegalArgumentException("ConnectionStrategy must be PROXY");
}
return new ProxyConnectionStrategyConfig(
originProjectId,
linkedProjectId,
linkedProjectAlias,
transportConnectTimeout,
connectionCompression,
connectionCompressionScheme,
clusterPingSchedule,
initialConnectionTimeout,
skipUnavailable,
maxPendingConnectionListeners,
proxyNumSocketConnections,
proxyAddress,
proxyServerName
);
}

public SniffConnectionStrategyConfig buildSniffConnectionStrategyConfig() {
if (connectionStrategy != null && ConnectionStrategy.SNIFF.equals(connectionStrategy) == false) {
throw new IllegalArgumentException("ConnectionStrategy must be SNIFF");
}
return new SniffConnectionStrategyConfig(
originProjectId,
linkedProjectId,
linkedProjectAlias,
transportConnectTimeout,
connectionCompression,
connectionCompressionScheme,
clusterPingSchedule,
initialConnectionTimeout,
skipUnavailable,
maxPendingConnectionListeners,
sniffMaxNumConnections,
sniffNodePredicate,
sniffSeedNodes,
proxyAddress
);
}

private static int requireGreaterThanZero(int value, String name) {
if (value <= 0) {
throw new IllegalArgumentException("[" + name + "] must be greater than 0");
}
return value;
}

private String requireNonEmpty(String value, String name) {
if (Objects.requireNonNull(value).isBlank()) {
throw new IllegalArgumentException("[" + name + "] cannot be empty");
}
return value;
}
}
}
Loading