From 616359b061322e05e0304e96ad29264d73212adc Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Fri, 29 Aug 2025 12:43:58 -0400 Subject: [PATCH 01/11] Add LinkedProjectConfigService with ClusterSettings implementation Adds a new interface LinkedProjectConfigService for registering LinkedProjectConfigListener instances to receive linked project configuration updates. This change introduces an abstract base class and a ClusterSettings based concrete class that provides the same functionality that was previously implemented in RemoteClusterAware and RemoteClusterService. Subclasses of RemoteClusterAware have been adjusted to receive LinkedProjectConfig updates instead of Settings updates. These changes allow for other linked project config implementations to be used in the system, freeing consumers of the updates from the details of the update mechanism used. Resolves: ES-12730 --- .../elasticsearch/node/NodeConstruction.java | 11 ++- .../node/NodeServiceProvider.java | 3 + .../node/PluginServiceInstances.java | 4 +- .../org/elasticsearch/plugins/Plugin.java | 6 ++ .../AbstractLinkedProjectConfigService.java | 39 ++++++++++ ...terSettingsLinkedProjectConfigService.java | 65 ++++++++++++++++ .../transport/LinkedProjectConfigService.java | 43 ++++++++++ .../transport/RemoteClusterAware.java | 47 +++-------- .../transport/RemoteClusterService.java | 78 ++++++++----------- .../transport/RemoteClusterSettings.java | 3 + .../transport/TransportService.java | 10 ++- .../transport/RemoteClusterAwareTests.java | 6 +- .../transport/RemoteClusterServiceTests.java | 42 ++++++---- .../java/org/elasticsearch/node/MockNode.java | 3 + .../java/org/elasticsearch/xpack/ccr/Ccr.java | 2 +- .../xpack/ccr/CcrRepositoryManager.java | 27 +++---- .../xpack/eql/plugin/EqlPlugin.java | 20 +++-- .../xpack/ql/index/RemoteClusterResolver.java | 20 ++--- .../xpack/security/Security.java | 4 + .../security/authz/AuthorizationService.java | 4 +- .../authz/IndicesAndAliasesResolver.java | 31 ++++---- .../xpack/security/SecurityTests.java | 2 + .../authz/AuthorizationServiceTests.java | 9 +++ .../authz/IndicesAndAliasesResolverTests.java | 10 ++- .../xpack/sql/plugin/SqlPlugin.java | 12 +-- .../xpack/sql/plugin/SqlPluginTests.java | 12 +-- .../TransformCheckpointServiceNodeTests.java | 10 +-- .../xpack/transform/Transform.java | 2 +- .../checkpoint/RemoteClusterResolver.java | 20 ++--- .../TransformCheckpointService.java | 6 +- .../DefaultCheckpointProviderTests.java | 4 +- .../TimeBasedCheckpointProviderTests.java | 4 +- ...TransformPersistentTasksExecutorTests.java | 8 +- .../transforms/TransformTaskTests.java | 9 +-- 34 files changed, 372 insertions(+), 204 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java create mode 100644 server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java create mode 100644 server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index c909c9a25e5b8..d202dbd69527f 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -223,6 +223,8 @@ import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.internal.BuiltInExecutorBuilders; +import org.elasticsearch.transport.ClusterSettingsLinkedProjectConfigService; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.usage.UsageService; @@ -969,6 +971,11 @@ public Map queryFields() { final IndexingPressure indexingLimits = new IndexingPressure(settings); + final var linkedProjectConfigService = pluginsService.loadSingletonServiceProvider( + LinkedProjectConfigService.class, + () -> new ClusterSettingsLinkedProjectConfigService(settings, clusterService.getClusterSettings(), projectResolver) + ); + PluginServiceInstances pluginServices = new PluginServiceInstances( client, clusterService, @@ -992,7 +999,8 @@ public Map queryFields() { taskManager, projectResolver, slowLogFieldProvider, - indexingLimits + indexingLimits, + linkedProjectConfigService ); Collection pluginComponents = pluginsService.flatMap(plugin -> { @@ -1122,6 +1130,7 @@ public Map queryFields() { taskManager, telemetryProvider.getTracer(), nodeEnvironment.nodeId(), + linkedProjectConfigService, projectResolver ); final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry()); diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 5f1dc11ea16e1..0b8c59b3c1957 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -43,6 +43,7 @@ import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ClusterConnectionManager; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportService; @@ -121,6 +122,7 @@ TransportService newTransportService( TaskManager taskManager, Tracer tracer, String nodeId, + LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver ) { return new TransportService( @@ -132,6 +134,7 @@ TransportService newTransportService( clusterSettings, new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()), taskManager, + linkedProjectConfigService, projectResolver ); } diff --git a/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java b/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java index ec58555b70505..601b768b01277 100644 --- a/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java +++ b/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java @@ -31,6 +31,7 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -57,5 +58,6 @@ public record PluginServiceInstances( TaskManager taskManager, ProjectResolver projectResolver, SlowLogFieldProvider slowLogFieldProvider, - IndexingPressure indexingPressure + IndexingPressure indexingPressure, + LinkedProjectConfigService linkedProjectConfigService ) implements Plugin.PluginServices {} diff --git a/server/src/main/java/org/elasticsearch/plugins/Plugin.java b/server/src/main/java/org/elasticsearch/plugins/Plugin.java index 955acbe55768c..c14f7f4914983 100644 --- a/server/src/main/java/org/elasticsearch/plugins/Plugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/Plugin.java @@ -40,6 +40,7 @@ import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.XContentParser; @@ -192,6 +193,11 @@ public interface PluginServices { * Provider for indexing pressure */ IndexingPressure indexingPressure(); + + /** + * A service for registering for linked project configuration updates. + */ + LinkedProjectConfigService linkedProjectConfigService(); } /** diff --git a/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java new file mode 100644 index 0000000000000..d67a352c56768 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.metadata.ProjectId; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public abstract class AbstractLinkedProjectConfigService implements LinkedProjectConfigService { + private final List listeners = new CopyOnWriteArrayList<>(); + + @Override + public void register(LinkedProjectConfigListener listener) { + listeners.add(listener); + } + + protected void handleUpdate(LinkedProjectConfig config) { + listeners.forEach(listener -> listener.updateLinkedProject(config)); + } + + protected void handleSkipUnavailableChanged( + ProjectId originProjectId, + ProjectId linkedProjectId, + String linkedProjectAlias, + boolean skipUnavailable + ) { + listeners.forEach( + listener -> listener.skipUnavailableChanged(originProjectId, linkedProjectId, linkedProjectAlias, skipUnavailable) + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java new file mode 100644 index 0000000000000..5ab1229829c6d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; + +import java.util.Collection; +import java.util.List; + +public class ClusterSettingsLinkedProjectConfigService extends AbstractLinkedProjectConfigService { + private final Settings settings; + private final ProjectResolver projectResolver; + + @SuppressWarnings("this-escape") + public ClusterSettingsLinkedProjectConfigService(Settings settings, ClusterSettings clusterSettings, ProjectResolver projectResolver) { + this.settings = settings; + this.projectResolver = projectResolver; + List> remoteClusterSettings = List.of( + RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS, + RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE, + RemoteClusterSettings.REMOTE_CONNECTION_MODE, + RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY, + RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS, + RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS, + RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS, + RemoteClusterSettings.ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS, + RemoteClusterSettings.ProxyConnectionStrategySettings.SERVER_NAME + ); + clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::settingsChangedCallback); + clusterSettings.addAffixUpdateConsumer( + RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE, + this::skipUnavailableChangedCallback, + (alias, value) -> {} + ); + } + + @Override + public Collection loadAllLinkedProjectConfigs() { + return RemoteClusterSettings.getRemoteClusters(settings) + .stream() + .map(alias -> RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, alias, settings)) + .toList(); + } + + private void settingsChangedCallback(String clusterAlias, Settings newSettings) { + final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); + final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, mergedSettings); + handleUpdate(config); + } + + private void skipUnavailableChangedCallback(String clusterAlias, Boolean skipUnavailable) { + handleSkipUnavailableChanged(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, skipUnavailable); + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java new file mode 100644 index 0000000000000..0cb31be1893fb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.metadata.ProjectId; + +import java.util.Collection; +import java.util.Collections; + +public interface LinkedProjectConfigService { + + interface LinkedProjectConfigListener { + void updateLinkedProject(LinkedProjectConfig config); + + default void skipUnavailableChanged( + ProjectId originProjectId, + ProjectId linkedProjectId, + String linkedProjectAlias, + boolean skipUnavailable + ) {} + } + + void register(LinkedProjectConfigListener listener); + + Collection loadAllLinkedProjectConfigs(); + + LinkedProjectConfigService NOOP = new LinkedProjectConfigService() { + @Override + public void register(LinkedProjectConfigListener listener) {} + + @Override + public Collection loadAllLinkedProjectConfigs() { + return Collections.emptyList(); + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 1d6cd8ea353ef..5158d255e3acc 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -13,14 +13,13 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SelectorResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; import org.elasticsearch.core.Tuple; import org.elasticsearch.node.Node; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -28,17 +27,15 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings; -import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings; - /** * Base class for all services and components that need up-to-date information about the registered remote clusters */ -public abstract class RemoteClusterAware { +public abstract class RemoteClusterAware implements LinkedProjectConfigService.LinkedProjectConfigListener { public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':'; public static final String LOCAL_CLUSTER_GROUP_KEY = ""; protected final Settings settings; + private final LinkedProjectConfigService linkedProjectConfigService; private final String nodeName; private final boolean isRemoteClusterClientEnabled; @@ -46,8 +43,9 @@ public abstract class RemoteClusterAware { * Creates a new {@link RemoteClusterAware} instance * @param settings the nodes level settings */ - protected RemoteClusterAware(Settings settings) { + protected RemoteClusterAware(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { this.settings = settings; + this.linkedProjectConfigService = linkedProjectConfigService; this.nodeName = Node.NODE_NAME_SETTING.get(settings); this.isRemoteClusterClientEnabled = DiscoveryNode.isRemoteClusterClient(settings); } @@ -57,10 +55,10 @@ protected String getNodeName() { } /** - * Returns remote clusters that are enabled in these settings + * Returns all known {@link LinkedProjectConfig}s. */ - protected static Set getEnabledRemoteClusters(final Settings settings) { - return RemoteClusterSettings.getRemoteClusters(settings); + protected Collection loadAllLinkedProjectConfigs() { + return linkedProjectConfigService.loadAllLinkedProjectConfigs(); } /** @@ -215,34 +213,11 @@ protected Map> groupClusterIndices(Set remoteCluste return perClusterIndices; } - void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) { - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { - throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); - } - updateRemoteCluster(clusterAlias, settings); - } - - /** - * Subclasses must implement this to receive information about updated cluster aliases. - */ - protected abstract void updateRemoteCluster(String clusterAlias, Settings settings); - /** - * Registers this instance to listen to updates on the cluster settings. + * Registers this instance to listen for linked project updates. */ - public void listenForUpdates(ClusterSettings clusterSettings) { - List> remoteClusterSettings = List.of( - RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS, - RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE, - RemoteClusterSettings.REMOTE_CONNECTION_MODE, - SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY, - SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS, - SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS, - ProxyConnectionStrategySettings.PROXY_ADDRESS, - ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS, - ProxyConnectionStrategySettings.SERVER_NAME - ); - clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster); + public void listenForUpdates() { + linkedProjectConfigService.register(this); } public static String buildRemoteIndexName(String clusterAlias, String indexName) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index ac3c8a74af37c..d3b44a7b59746 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -84,8 +83,13 @@ public boolean isRemoteClusterServerEnabled() { private final ProjectResolver projectResolver; private final boolean canUseSkipUnavailable; - RemoteClusterService(Settings settings, TransportService transportService, ProjectResolver projectResolver) { - super(settings); + RemoteClusterService( + Settings settings, + LinkedProjectConfigService linkedProjectConfigService, + TransportService transportService, + ProjectResolver projectResolver + ) { + super(settings, linkedProjectConfigService); this.isRemoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); this.isSearchNode = DiscoveryNode.hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE); this.isStateless = DiscoveryNode.isStateless(settings); @@ -286,17 +290,13 @@ public RemoteClusterConnection getRemoteClusterConnection(String cluster) { } @Override - public void listenForUpdates(ClusterSettings clusterSettings) { - super.listenForUpdates(clusterSettings); - clusterSettings.addAffixUpdateConsumer( - RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE, - this::updateSkipUnavailable, - (alias, value) -> {} - ); - } - - private synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { - RemoteClusterConnection remote = getConnectionsMapForCurrentProject().get(clusterAlias); + public void skipUnavailableChanged( + ProjectId originProjectId, + ProjectId linkedProjectId, + String linkedProjectAlias, + boolean skipUnavailable + ) { + final var remote = getConnectionsMapForProject(originProjectId).get(linkedProjectAlias); if (remote != null) { remote.setSkipUnavailable(skipUnavailable); } @@ -329,7 +329,7 @@ public synchronized void updateRemoteClusterCredentials(Supplier setti private void maybeRebuildConnectionOnCredentialsChange( ProjectId projectId, String clusterAlias, - Settings settings, + Settings newSettings, RefCountingRunnable connectionRefs ) { final var connectionsMap = getConnectionsMapForProject(projectId); @@ -344,7 +344,9 @@ private void maybeRebuildConnectionOnCredentialsChange( return; } - updateRemoteCluster(projectId, clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() { + final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); + final var config = RemoteClusterSettings.toConfig(projectId, ProjectId.DEFAULT, clusterAlias, mergedSettings); + updateRemoteCluster(config, true, ActionListener.releaseAfter(new ActionListener<>() { @Override public void onResponse(RemoteClusterConnectionStatus status) { logger.info( @@ -374,11 +376,14 @@ public void onFailure(Exception e) { } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - @FixForMultiProject(description = "ES-12270: Refactor as needed to support project specific changes to linked remotes.") - final var projectId = projectResolver.getProjectId(); + public void updateLinkedProject(LinkedProjectConfig config) { + if (LOCAL_CLUSTER_GROUP_KEY.equals(config.linkedProjectAlias())) { + throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); + } + final var projectId = config.originProjectId(); + final var clusterAlias = config.linkedProjectAlias(); CountDownLatch latch = new CountDownLatch(1); - updateRemoteCluster(projectId, clusterAlias, settings, false, ActionListener.runAfter(new ActionListener<>() { + updateRemoteCluster(config, false, ActionListener.runAfter(new ActionListener<>() { @Override public void onResponse(RemoteClusterConnectionStatus status) { logger.info("project [{}] remote cluster connection [{}] updated: {}", projectId, clusterAlias, status); @@ -410,34 +415,19 @@ public void onFailure(Exception e) { // Package-access for testing. @FixForMultiProject(description = "Refactor to supply the project ID associated with the alias and settings, or eliminate this method.") void updateRemoteCluster(String clusterAlias, Settings newSettings, ActionListener listener) { - updateRemoteCluster(projectResolver.getProjectId(), clusterAlias, newSettings, false, listener); + final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); + @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") + final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, mergedSettings); + updateRemoteCluster(config, false, listener); } /** * Adds, rebuilds, or closes and removes the connection for the specified remote cluster. * - * @param projectId The project the remote cluster is associated with. - * @param clusterAlias The alias used for the remote cluster being connected. - * @param newSettings The updated settings for the remote connection. + * @param config The linked project configuration. * @param forceRebuild Forces an existing connection to be closed and reconnected even if the connection strategy does not require it. * @param listener The listener invoked once the configured cluster has been connected. */ - private synchronized void updateRemoteCluster( - ProjectId projectId, - String clusterAlias, - Settings newSettings, - boolean forceRebuild, - ActionListener listener - ) { - if (LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { - throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); - } - final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); - @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") - final var linkedProjectConfig = RemoteClusterSettings.toConfig(projectId, ProjectId.DEFAULT, clusterAlias, mergedSettings); - updateRemoteCluster(linkedProjectConfig, forceRebuild, listener); - } - private synchronized void updateRemoteCluster( LinkedProjectConfig config, boolean forceRebuild, @@ -496,17 +486,15 @@ void initializeRemoteClusters() { final var projectId = projectResolver.getProjectId(); final TimeValue timeValue = RemoteClusterSettings.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); final PlainActionFuture future = new PlainActionFuture<>(); - Set enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings); + final var enabledClusters = loadAllLinkedProjectConfigs(); if (enabledClusters.isEmpty()) { return; } CountDownActionListener listener = new CountDownActionListener(enabledClusters.size(), future); - for (String clusterAlias : enabledClusters) { - @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") - final var linkedProjectConfig = RemoteClusterSettings.toConfig(projectId, ProjectId.DEFAULT, clusterAlias, settings); - updateRemoteCluster(linkedProjectConfig, false, listener.map(ignored -> null)); + for (LinkedProjectConfig config : enabledClusters) { + updateRemoteCluster(config, false, listener.map(ignored -> null)); } if (enabledClusters.isEmpty()) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterSettings.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterSettings.java index f5b72bf6e40b8..3c2b02e8533ce 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterSettings.java @@ -337,6 +337,9 @@ public static LinkedProjectConfig toConfig( String linkedProjectAlias, Settings settings ) { + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(linkedProjectAlias)) { + throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); + } final var strategy = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(linkedProjectAlias).get(settings); final var builder = switch (strategy) { case SNIFF -> SniffConnectionStrategySettings.readSettings( diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 086bef3093dc6..c5f6c7b9e28dc 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -275,6 +275,11 @@ public TransportService( clusterSettings, connectionManager, taskManger, + new ClusterSettingsLinkedProjectConfigService( + settings, + clusterSettings != null ? clusterSettings : ClusterSettings.createBuiltInClusterSettings(), + DefaultProjectResolver.INSTANCE + ), DefaultProjectResolver.INSTANCE ); } @@ -289,6 +294,7 @@ public TransportService( @Nullable ClusterSettings clusterSettings, ConnectionManager connectionManager, TaskManager taskManger, + LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver ) { this.transport = transport; @@ -304,13 +310,13 @@ public TransportService( this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); this.enableStackOverflowAvoidance = ENABLE_STACK_OVERFLOW_AVOIDANCE.get(settings); - remoteClusterService = new RemoteClusterService(settings, this, projectResolver); + remoteClusterService = new RemoteClusterService(settings, linkedProjectConfigService, this, projectResolver); responseHandlers = transport.getResponseHandlers(); if (clusterSettings != null) { clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude); clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude); if (remoteClusterClient) { - remoteClusterService.listenForUpdates(clusterSettings); + remoteClusterService.listenForUpdates(); } clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold); } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java index 2394e0b07cc57..a8f9a62cee38c 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java @@ -160,13 +160,11 @@ public void testGroupClusterIndicesFail() { private static class RemoteClusterAwareTest extends RemoteClusterAware { RemoteClusterAwareTest() { - super(Settings.EMPTY); + super(Settings.EMPTY, LinkedProjectConfigService.NOOP); } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - - } + public void updateLinkedProject(LinkedProjectConfig config) {} @Override public Map> groupClusterIndices(Set remoteClusterNames, String[] requestIndices) { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index c4fd4b20ac48c..60080bc4eb1ca 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -58,6 +58,7 @@ import static org.elasticsearch.test.NodeRoles.removeRoles; import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings; import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings; +import static org.elasticsearch.transport.RemoteClusterSettings.toConfig; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; @@ -73,8 +74,21 @@ public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } - private RemoteClusterService createRemoteClusterService(final Settings settings, final MockTransportService transportService) { - return new RemoteClusterService(settings, transportService, DefaultProjectResolver.INSTANCE); + private RemoteClusterService createRemoteClusterService(Settings settings, MockTransportService transportService) { + return createRemoteClusterService(settings, ClusterSettings.createBuiltInClusterSettings(), transportService); + } + + private RemoteClusterService createRemoteClusterService( + Settings settings, + ClusterSettings clusterSettings, + MockTransportService transportService + ) { + return new RemoteClusterService( + settings, + new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE), + transportService, + DefaultProjectResolver.INSTANCE + ); } private MockTransportService startTransport( @@ -505,7 +519,7 @@ public void testIncrementallyAddClusters() throws IOException { // connect before returning. new Thread(() -> { try { - service.validateAndUpdateRemoteCluster("cluster_1", cluster1Settings); + service.updateLinkedProject(toConfig("cluster_1", cluster1Settings)); clusterAdded.onResponse(null); } catch (Exception e) { clusterAdded.onFailure(e); @@ -518,16 +532,16 @@ public void testIncrementallyAddClusters() throws IOException { "cluster_2", Collections.singletonList(cluster2Seed.getAddress().toString()) ); - service.validateAndUpdateRemoteCluster("cluster_2", cluster2Settings); + service.updateLinkedProject(toConfig("cluster_2", cluster2Settings)); assertTrue(hasRegisteredClusters(service)); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); assertTrue(isRemoteClusterRegistered(service, "cluster_2")); Settings cluster2SettingsDisabled = createSettings("cluster_2", Collections.emptyList()); - service.validateAndUpdateRemoteCluster("cluster_2", cluster2SettingsDisabled); + service.updateLinkedProject(toConfig("cluster_2", cluster2SettingsDisabled)); assertFalse(isRemoteClusterRegistered(service, "cluster_2")); IllegalArgumentException iae = expectThrows( IllegalArgumentException.class, - () -> service.validateAndUpdateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY) + () -> service.updateLinkedProject(toConfig(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY)) ); assertEquals("remote clusters must not have the empty string as its key", iae.getMessage()); } @@ -572,10 +586,9 @@ public void testDefaultPingSchedule() throws IOException { assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); assertTrue(hasRegisteredClusters(service)); - service.validateAndUpdateRemoteCluster( - "cluster_1", - createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())) - ); + final var newSettings = createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())); + final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); + service.updateLinkedProject(toConfig("cluster_1", mergedSettings)); assertTrue(hasRegisteredClusters(service)); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); @@ -685,7 +698,7 @@ public void testChangeSettings() throws Exception { settingsChange.put("cluster.remote.cluster_1.transport.compress", enabledChange); } settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - service.validateAndUpdateRemoteCluster("cluster_1", settingsChange.build()); + service.updateLinkedProject(toConfig("cluster_1", settingsChange.build())); assertBusy(remoteClusterConnection::isClosed); remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); @@ -1794,14 +1807,13 @@ private Settings buildRemoteClusterSettings(String clusterAlias, String address) } public void testLogsConnectionResult() throws IOException { - + final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(); try ( var remote = startTransport("remote", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); var local = startTransport("local", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); - var remoteClusterService = createRemoteClusterService(Settings.EMPTY, local) + var remoteClusterService = createRemoteClusterService(Settings.EMPTY, clusterSettings, local) ) { - var clusterSettings = ClusterSettings.createBuiltInClusterSettings(); - remoteClusterService.listenForUpdates(clusterSettings); + remoteClusterService.listenForUpdates(); assertThatLogger( () -> clusterSettings.applySettings( diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index a6ab3699eb206..e8fc4d4a539b5 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -50,6 +50,7 @@ import org.elasticsearch.test.MockHttpTransport; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportService; @@ -174,6 +175,7 @@ protected TransportService newTransportService( TaskManager taskManager, Tracer tracer, String nodeId, + LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver ) { @@ -193,6 +195,7 @@ protected TransportService newTransportService( taskManager, tracer, nodeId, + linkedProjectConfigService, projectResolver ); } else { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index bfc85f3efb0fe..48c7b2e92e3a0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -183,7 +183,7 @@ public Collection createComponents(PluginServices services) { return List.of( ccrLicenseChecker, restoreSourceService, - new CcrRepositoryManager(settings, services.clusterService(), client), + new CcrRepositoryManager(settings, services.linkedProjectConfigService(), client), new ShardFollowTaskCleaner(services.clusterService(), services.threadPool(), client), new AutoFollowCoordinator( settings, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java index a37f75daaa6e5..b81307d7b19f5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -11,28 +11,26 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.LinkedProjectConfig; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteClusterSettings; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryRequest; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryRequest; import org.elasticsearch.xpack.ccr.repository.CcrRepository; -import java.util.Set; - class CcrRepositoryManager extends AbstractLifecycleComponent { private final Client client; private final RemoteSettingsUpdateListener updateListener; - CcrRepositoryManager(Settings settings, ClusterService clusterService, Client client) { + CcrRepositoryManager(Settings settings, LinkedProjectConfigService linkedProjectConfigService, Client client) { this.client = client; - updateListener = new RemoteSettingsUpdateListener(settings); - updateListener.listenForUpdates(clusterService.getClusterSettings()); + updateListener = new RemoteSettingsUpdateListener(settings, linkedProjectConfigService); + updateListener.listenForUpdates(); } @Override @@ -62,21 +60,20 @@ private void deleteRepository(String repositoryName) { private class RemoteSettingsUpdateListener extends RemoteClusterAware { - private RemoteSettingsUpdateListener(Settings settings) { - super(settings); + private RemoteSettingsUpdateListener(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { + super(settings, linkedProjectConfigService); } void init() { - Set clusterAliases = getEnabledRemoteClusters(settings); - for (String clusterAlias : clusterAliases) { - putRepository(CcrRepository.NAME_PREFIX + clusterAlias); + for (var config : loadAllLinkedProjectConfigs()) { + putRepository(CcrRepository.NAME_PREFIX + config.linkedProjectAlias()); } } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias; - if (RemoteClusterSettings.isConnectionEnabled(clusterAlias, settings)) { + public void updateLinkedProject(LinkedProjectConfig config) { + String repositoryName = CcrRepository.NAME_PREFIX + config.linkedProjectAlias(); + if (config.isConnectionEnabled()) { putRepository(repositoryName); } else { deleteRepository(repositoryName); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java index 4ef003fb84381..1b0f71debddf0 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java @@ -10,7 +10,6 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; @@ -27,6 +26,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; @@ -76,14 +76,24 @@ public EqlPlugin() {} @Override public Collection createComponents(PluginServices services) { - return createComponents(services.client(), services.environment().settings(), services.clusterService()); + return createComponents( + services.client(), + services.environment().settings(), + services.clusterService().getClusterName().value(), + services.linkedProjectConfigService() + ); } - private Collection createComponents(Client client, Settings settings, ClusterService clusterService) { - RemoteClusterResolver remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings()); + private Collection createComponents( + Client client, + Settings settings, + String clusterName, + LinkedProjectConfigService linkedProjectConfigService + ) { + RemoteClusterResolver remoteClusterResolver = new RemoteClusterResolver(settings, linkedProjectConfigService); IndexResolver indexResolver = new IndexResolver( client, - clusterService.getClusterName().value(), + clusterName, DefaultDataTypeRegistry.INSTANCE, remoteClusterResolver::remoteClusters ); diff --git a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java index 7a815bc09cf9b..2271bae6e27ae 100644 --- a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java +++ b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java @@ -7,10 +7,10 @@ package org.elasticsearch.xpack.ql.index; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.LinkedProjectConfig; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteClusterSettings; import java.util.Set; import java.util.TreeSet; @@ -19,18 +19,18 @@ public final class RemoteClusterResolver extends RemoteClusterAware { private final CopyOnWriteArraySet clusters; - public RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) { - super(settings); - clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings)); - listenForUpdates(clusterSettings); + public RemoteClusterResolver(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { + super(settings, linkedProjectConfigService); + clusters = new CopyOnWriteArraySet<>(loadAllLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList()); + listenForUpdates(); } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - if (RemoteClusterSettings.isConnectionEnabled(clusterAlias, settings)) { - clusters.add(clusterAlias); + public void updateLinkedProject(LinkedProjectConfig config) { + if (config.isConnectionEnabled()) { + clusters.add(config.linkedProjectAlias()); } else { - clusters.remove(clusterAlias); + clusters.remove(config.linkedProjectAlias()); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 389622118832b..05501f948654a 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -107,6 +107,7 @@ import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.RemoteClusterSettings; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; @@ -747,6 +748,7 @@ public Collection createComponents(PluginServices services) { services.indexNameExpressionResolver(), services.telemetryProvider(), new PersistentTasksService(services.clusterService(), services.threadPool(), services.client()), + services.linkedProjectConfigService(), services.projectResolver() ); } catch (final Exception e) { @@ -767,6 +769,7 @@ Collection createComponents( IndexNameExpressionResolver expressionResolver, TelemetryProvider telemetryProvider, PersistentTasksService persistentTasksService, + LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver ) throws Exception { logger.info("Security is {}", enabled ? "enabled" : "disabled"); @@ -1142,6 +1145,7 @@ Collection createComponents( operatorPrivilegesService.get(), restrictedIndices, authorizationDenialMessages.get(), + linkedProjectConfigService, projectResolver ); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index f7f0f48f1c0fe..15fc7d6c8defc 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -49,6 +49,7 @@ import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xpack.core.security.SecurityContext; @@ -164,12 +165,13 @@ public AuthorizationService( OperatorPrivilegesService operatorPrivilegesService, RestrictedIndices restrictedIndices, AuthorizationDenialMessages authorizationDenialMessages, + LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver ) { this.clusterService = clusterService; this.auditTrailService = auditTrailService; this.restrictedIndices = restrictedIndices; - this.indicesAndAliasesResolver = new IndicesAndAliasesResolver(settings, clusterService, resolver); + this.indicesAndAliasesResolver = new IndicesAndAliasesResolver(settings, linkedProjectConfigService, resolver); this.authcFailureHandler = authcFailureHandler; this.threadContext = threadPool.getThreadContext(); this.securityContext = new SecurityContext(settings, this.threadContext); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index fd5e82a4edb24..af1cf52703ed1 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -22,18 +22,17 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.transport.LinkedProjectConfig; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteClusterSettings; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine; import org.elasticsearch.xpack.core.security.authz.IndicesAndAliasesResolverField; @@ -59,10 +58,14 @@ class IndicesAndAliasesResolver { private final IndexAbstractionResolver indexAbstractionResolver; private final RemoteClusterResolver remoteClusterResolver; - IndicesAndAliasesResolver(Settings settings, ClusterService clusterService, IndexNameExpressionResolver resolver) { + IndicesAndAliasesResolver( + Settings settings, + LinkedProjectConfigService linkedProjectConfigService, + IndexNameExpressionResolver resolver + ) { this.nameExpressionResolver = resolver; this.indexAbstractionResolver = new IndexAbstractionResolver(resolver); - this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings()); + this.remoteClusterResolver = new RemoteClusterResolver(settings, linkedProjectConfigService); } /** @@ -545,18 +548,20 @@ private static class RemoteClusterResolver extends RemoteClusterAware { private final CopyOnWriteArraySet clusters; - private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) { - super(settings); - clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings)); - listenForUpdates(clusterSettings); + private RemoteClusterResolver(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { + super(settings, linkedProjectConfigService); + clusters = new CopyOnWriteArraySet<>( + loadAllLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() + ); + listenForUpdates(); } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - if (RemoteClusterSettings.isConnectionEnabled(clusterAlias, settings)) { - clusters.add(clusterAlias); + public void updateLinkedProject(LinkedProjectConfig config) { + if (config.isConnectionEnabled()) { + clusters.add(config.linkedProjectAlias()); } else { - clusters.remove(clusterAlias); + clusters.remove(config.linkedProjectAlias()); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index bf82846a81da4..caf131edbe3b6 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -73,6 +73,7 @@ import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.usage.UsageService; import org.elasticsearch.watcher.ResourceWatcherService; @@ -262,6 +263,7 @@ private Collection createComponentsUtil(Settings settings) throws Except TestIndexNameExpressionResolver.newInstance(threadContext), TelemetryProvider.NOOP, mock(PersistentTasksService.class), + LinkedProjectConfigService.NOOP, TestProjectResolvers.alwaysThrow() ); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index e4bb33c66d983..0e9393d418d8b 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -133,7 +133,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.AbstractTransportRequest; +import org.elasticsearch.transport.ClusterSettingsLinkedProjectConfigService; import org.elasticsearch.transport.EmptyRequest; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xcontent.XContentBuilder; @@ -267,6 +269,7 @@ public class AuthorizationServiceTests extends ESTestCase { private SecurityContext securityContext; private ProjectResolver projectResolver; private IndexNameExpressionResolver indexNameExpressionResolver; + private LinkedProjectConfigService linkedProjectConfigService; @SuppressWarnings("unchecked") @Before @@ -320,6 +323,7 @@ public void setup() { operatorPrivilegesService = mock(OperatorPrivileges.OperatorPrivilegesService.class); projectResolver = TestProjectResolvers.singleProject(projectId); indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(projectResolver); + linkedProjectConfigService = new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, projectResolver); authorizationService = new AuthorizationService( settings, rolesStore, @@ -336,6 +340,7 @@ public void setup() { operatorPrivilegesService, RESTRICTED_INDICES, new AuthorizationDenialMessages.Default(), + linkedProjectConfigService, projectResolver ); } @@ -1769,6 +1774,7 @@ public void testDenialForAnonymousUser() { operatorPrivilegesService, RESTRICTED_INDICES, new AuthorizationDenialMessages.Default(), + linkedProjectConfigService, projectResolver ); @@ -1819,6 +1825,7 @@ public void testDenialForAnonymousUserAuthorizationExceptionDisabled() { operatorPrivilegesService, RESTRICTED_INDICES, new AuthorizationDenialMessages.Default(), + linkedProjectConfigService, projectResolver ); @@ -3357,6 +3364,7 @@ public void testAuthorizationEngineSelectionForCheckPrivileges() throws Exceptio operatorPrivilegesService, RESTRICTED_INDICES, new AuthorizationDenialMessages.Default(), + linkedProjectConfigService, projectResolver ); @@ -3513,6 +3521,7 @@ public void getUserPrivileges(AuthorizationInfo authorizationInfo, ActionListene operatorPrivilegesService, RESTRICTED_INDICES, new AuthorizationDenialMessages.Default(), + linkedProjectConfigService, projectResolver ); Authentication authentication; diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java index 530b012cb55c3..0462679a5ff18 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java @@ -61,6 +61,7 @@ import org.elasticsearch.protocol.xpack.graph.GraphExploreRequest; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.ClusterSettingsLinkedProjectConfigService; import org.elasticsearch.transport.EmptyRequest; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.TransportRequest; @@ -242,6 +243,7 @@ public void setup() { user = new User("user", "role"); userDashIndices = new User("dash", "dash"); userNoIndices = new User("test", "test"); + final var projectResolver = TestProjectResolvers.DEFAULT_PROJECT_ONLY; final FieldPermissionsCache fieldPermissionsCache = new FieldPermissionsCache(Settings.EMPTY); rolesStore = Mockito.spy( new CompositeRolesStore( @@ -254,7 +256,7 @@ public void setup() { fieldPermissionsCache, mock(ApiKeyService.class), mock(ServiceAccountService.class), - TestProjectResolvers.DEFAULT_PROJECT_ONLY, + projectResolver, new DocumentSubsetBitsetCache(Settings.EMPTY), RESTRICTED_INDICES, EsExecutors.DIRECT_EXECUTOR_SERVICE, @@ -417,7 +419,11 @@ public void setup() { ClusterService clusterService = mock(ClusterService.class); when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - defaultIndicesResolver = new IndicesAndAliasesResolver(settings, clusterService, indexNameExpressionResolver); + defaultIndicesResolver = new IndicesAndAliasesResolver( + settings, + new ClusterSettingsLinkedProjectConfigService(settings, clusterService.getClusterSettings(), projectResolver), + indexNameExpressionResolver + ); } public void testDashIndicesAreAllowedInShardLevelRequests() { diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java index 0af3a5a36cad8..dcdfde0def2e4 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java @@ -9,7 +9,6 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -24,6 +23,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; @@ -82,7 +82,8 @@ public Collection createComponents(PluginServices services) { return createComponents( services.client(), services.environment().settings(), - services.clusterService(), + services.clusterService().getClusterName().value(), + services.linkedProjectConfigService(), services.namedWriteableRegistry() ); } @@ -93,13 +94,14 @@ public Collection createComponents(PluginServices services) { Collection createComponents( Client client, Settings settings, - ClusterService clusterService, + String clusterName, + LinkedProjectConfigService linkedProjectConfigService, NamedWriteableRegistry namedWriteableRegistry ) { - RemoteClusterResolver remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings()); + RemoteClusterResolver remoteClusterResolver = new RemoteClusterResolver(settings, linkedProjectConfigService); IndexResolver indexResolver = new IndexResolver( client, - clusterService.getClusterName().value(), + clusterName, SqlDataTypeRegistry.INSTANCE, remoteClusterResolver::remoteClusters ); diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlPluginTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlPluginTests.java index 47a76b4a5291d..e1e8e825ef22e 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlPluginTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlPluginTests.java @@ -7,10 +7,8 @@ package org.elasticsearch.xpack.sql.plugin; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -18,29 +16,25 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.rest.RestController; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.sql.session.Cursors; import java.util.Collections; import static org.hamcrest.Matchers.hasSize; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class SqlPluginTests extends ESTestCase { public void testSqlDisabledIsNoOp() { Settings settings = Settings.builder().put("xpack.sql.enabled", false).build(); SqlPlugin plugin = new SqlPlugin(settings); - ClusterService clusterService = mock(ClusterService.class); - when(clusterService.getClusterName()).thenReturn(new ClusterName(randomAlphaOfLength(10))); - when(clusterService.getClusterSettings()).thenReturn( - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) - ); assertThat( plugin.createComponents( mock(Client.class), Settings.EMPTY, - clusterService, + randomAlphaOfLength(10), + LinkedProjectConfigService.NOOP, new NamedWriteableRegistry(Cursors.getNamedWriteables()) ), hasSize(3) diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java index 745ccae86816c..343d17c5c64e6 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; @@ -46,11 +45,11 @@ import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats; @@ -177,12 +176,7 @@ public void createComponents() { transformCheckpointService = new TransformCheckpointService( Clock.systemUTC(), Settings.EMPTY, - new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool, - mock(TaskManager.class) - ), + LinkedProjectConfigService.NOOP, transformsConfigManager, mockAuditor ); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index ef59f057c1319..6085d6bbe38af 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -303,7 +303,7 @@ public Collection createComponents(PluginServices services) { TransformCheckpointService checkpointService = new TransformCheckpointService( clock, settings, - clusterService, + services.linkedProjectConfigService(), configManager, auditor ); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java index ebf98c57c0b9e..87cb0c09de25e 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java @@ -7,10 +7,10 @@ package org.elasticsearch.xpack.transform.checkpoint; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.LinkedProjectConfig; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteClusterSettings; import java.util.Collections; import java.util.List; @@ -46,18 +46,18 @@ int numClusters() { } } - RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) { - super(settings); - clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings)); - listenForUpdates(clusterSettings); + RemoteClusterResolver(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { + super(settings, linkedProjectConfigService); + clusters = new CopyOnWriteArraySet<>(loadAllLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList()); + listenForUpdates(); } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - if (RemoteClusterSettings.isConnectionEnabled(clusterAlias, settings)) { - clusters.add(clusterAlias); + public void updateLinkedProject(LinkedProjectConfig config) { + if (config.isConnectionEnabled()) { + clusters.add(config.linkedProjectAlias()); } else { - clusters.remove(clusterAlias); + clusters.remove(config.linkedProjectAlias()); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java index 0826aaff9deab..9598633605674 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java @@ -11,9 +11,9 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.internal.ParentTaskAssigningClient; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; @@ -47,14 +47,14 @@ public class TransformCheckpointService { public TransformCheckpointService( final Clock clock, final Settings settings, - final ClusterService clusterService, + LinkedProjectConfigService linkedProjectConfigService, final TransformConfigManager transformConfigManager, TransformAuditor transformAuditor ) { this.clock = clock; this.transformConfigManager = transformConfigManager; this.transformAuditor = transformAuditor; - this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings()); + this.remoteClusterResolver = new RemoteClusterResolver(settings, linkedProjectConfigService); } public CheckpointProvider getCheckpointProvider(final ParentTaskAssigningClient client, final TransformConfig transformConfig) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java index 3df47fb3bc066..38280f68fa5a1 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.client.internal.RemoteClusterClient; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.set.Sets; @@ -32,6 +31,7 @@ import org.elasticsearch.test.MockLog.LoggingExpectation; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.Transport; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; @@ -467,7 +467,7 @@ private DefaultCheckpointProvider newCheckpointProvider(TransformConfig transfor return new DefaultCheckpointProvider( clock, parentTaskClient, - new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + new RemoteClusterResolver(Settings.EMPTY, LinkedProjectConfigService.NOOP), transformConfigManager, transformAuditor, transformConfig diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java index 15a681d093739..41da13754ac44 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.ParentTaskAssigningClient; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.TimeValue; @@ -29,6 +28,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; @@ -287,7 +287,7 @@ private TimeBasedCheckpointProvider newCheckpointProvider(TransformConfig transf return new TimeBasedCheckpointProvider( clock, parentTaskClient, - new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + new RemoteClusterResolver(Settings.EMPTY, LinkedProjectConfigService.NOOP), transformConfigManager, transformAuditor, transformConfig diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index 5fd3db63c70e6..a806c5400193a 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; @@ -660,12 +661,7 @@ private TransformServices transformServices(TransformConfigManager configManager var transformCheckpointService = new TransformCheckpointService( Clock.systemUTC(), Settings.EMPTY, - new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool, - null - ), + LinkedProjectConfigService.NOOP, configManager, mockAuditor ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 535484ed3a196..ac193e410550d 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -17,7 +17,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.TimeValue; @@ -33,6 +32,7 @@ import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; @@ -196,12 +196,7 @@ private TransformServices transformServices(Clock clock, TransformAuditor audito var transformsCheckpointService = new TransformCheckpointService( clock, Settings.EMPTY, - new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool, - null - ), + LinkedProjectConfigService.NOOP, transformsConfigManager, auditor ); From 033fbbb5be905975e141b0345ec0c3c1a0a9b781 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Wed, 3 Sep 2025 17:23:22 -0400 Subject: [PATCH 02/11] Add test for ClusterSettings impl, add javadoc --- .../AbstractLinkedProjectConfigService.java | 4 + ...terSettingsLinkedProjectConfigService.java | 4 + .../transport/LinkedProjectConfigService.java | 35 +++++ ...ttingsLinkedProjectConfigServiceTests.java | 139 ++++++++++++++++++ 4 files changed, 182 insertions(+) create mode 100644 server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java diff --git a/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java index d67a352c56768..3cb06f7da007e 100644 --- a/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java +++ b/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java @@ -14,6 +14,10 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +/** + * Abstract base class for {@link LinkedProjectConfigService} implementations. + * Provides common functionality for managing a list of registered listeners and notifying them of updates. + */ public abstract class AbstractLinkedProjectConfigService implements LinkedProjectConfigService { private final List listeners = new CopyOnWriteArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java index 5ab1229829c6d..562b169fe14dd 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java @@ -18,6 +18,10 @@ import java.util.Collection; import java.util.List; +/** + * A {@link LinkedProjectConfigService} implementation that listens for {@link ClusterSettings} changes, + * creating {@link LinkedProjectConfig}s from the relevant settings and notifying registered listeners of updates. + */ public class ClusterSettingsLinkedProjectConfigService extends AbstractLinkedProjectConfigService { private final Settings settings; private final ProjectResolver projectResolver; diff --git a/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java index 0cb31be1893fb..22f7d828b0843 100644 --- a/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java +++ b/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java @@ -14,11 +14,32 @@ import java.util.Collection; import java.util.Collections; +/** + * Service for registering {@link LinkedProjectConfigListener}s to be notified of changes to linked project configurations. + */ public interface LinkedProjectConfigService { + /** + * Listener interface for receiving updates about linked project configurations. + * Implementations must not throw from any of the interface methods. + */ interface LinkedProjectConfigListener { + /** + * Called when a linked project configuration has been added or updated. + * + * @param config The updated {@link LinkedProjectConfig}. + */ void updateLinkedProject(LinkedProjectConfig config); + /** + * Called when the boolean skip_unavailable setting has changed for a linked project configuration. + * Note that skip_unavailable may not be supported in all contexts where linked projects are used. + * + * @param originProjectId The {@link ProjectId} of the owning project that has the linked project configuration. + * @param linkedProjectId The {@link ProjectId} of the linked project. + * @param linkedProjectAlias The alias used for the linked project. + * @param skipUnavailable The new value of the skip_unavailable setting. + */ default void skipUnavailableChanged( ProjectId originProjectId, ProjectId linkedProjectId, @@ -27,10 +48,24 @@ default void skipUnavailableChanged( ) {} } + /** + * Registers a {@link LinkedProjectConfigListener} to receive updates about linked project configurations. + * + * @param listener The listener to register. + */ void register(LinkedProjectConfigListener listener); + /** + * Loads all existing linked project configurations for all origin projects. + * + * @return A collection of all existing {@link LinkedProjectConfig}s. + */ Collection loadAllLinkedProjectConfigs(); + /** + * A no-op stub implementation of {@link LinkedProjectConfigService} intended for use in test scenarios where linked project + * configuration updates are not needed. + */ LinkedProjectConfigService NOOP = new LinkedProjectConfigService() { @Override public void register(LinkedProjectConfigListener listener) {} diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java new file mode 100644 index 0000000000000..82feb8e18b18d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.DefaultProjectResolver; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.transport.LinkedProjectConfigService.LinkedProjectConfigListener; +import static org.elasticsearch.transport.LinkedProjectConfig.ProxyLinkedProjectConfigBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class ClusterSettingsLinkedProjectConfigServiceTests extends ESTestCase { + + private record SkipUnavailableUpdate( + ProjectId originProjectId, + ProjectId linkedProjectId, + String linkedProjectAlias, + boolean skipUnavailable + ) {} + + private static class StubLinkedProjectConfigListener implements LinkedProjectConfigListener { + LinkedProjectConfig updatedConfig; + SkipUnavailableUpdate skipUnavailableUpdate; + + @Override + public void updateLinkedProject(LinkedProjectConfig config) { + updatedConfig = config; + } + + @Override + public void skipUnavailableChanged( + ProjectId originProjectId, + ProjectId linkedProjectId, + String linkedProjectAlias, + boolean skipUnavailable + ) { + skipUnavailableUpdate = new SkipUnavailableUpdate(originProjectId, linkedProjectId, linkedProjectAlias, skipUnavailable); + } + + void reset() { + updatedConfig = null; + skipUnavailableUpdate = null; + } + } + + /** + * A simple test to exercise the callback registration and notification mechanism. + * Note that {@link RemoteClusterServiceTests} uses {@link ClusterSettingsLinkedProjectConfigService} + * and contains more thorough tests of all the settings being monitored. + */ + public void testListenersReceiveUpdates() { + final var alias = randomAlphaOfLength(10); + + final var initialProxyAddress = "localhost:9400"; + final var initialSettings = Settings.builder() + .put("cluster.remote." + alias + ".mode", "proxy") + .put("cluster.remote." + alias + ".proxy_address", initialProxyAddress) + .build(); + final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(initialSettings); + final var service = new ClusterSettingsLinkedProjectConfigService( + initialSettings, + clusterSettings, + DefaultProjectResolver.INSTANCE + ); + final var config = new ProxyLinkedProjectConfigBuilder(alias).proxyAddress(initialProxyAddress).build(); + + // Verify we can get the linked projects on startup. + assertThat(service.loadAllLinkedProjectConfigs(), equalTo(List.of(config))); + + final int numListeners = randomIntBetween(1, 10); + final var listeners = new ArrayList(numListeners); + for (int i = 0; i < numListeners; ++i) { + listeners.add(new StubLinkedProjectConfigListener()); + service.register(listeners.getLast()); + } + + // Expect no updates when applying the same settings. + clusterSettings.applySettings(initialSettings); + for (int i = 0; i < numListeners; ++i) { + assertThat(listeners.get(i).updatedConfig, sameInstance(null)); + assertThat(listeners.get(i).skipUnavailableUpdate, sameInstance(null)); + listeners.get(i).reset(); + } + + // Change the skip_unavailable, leave the other settings alone, we should get the skip_unavailable update only. + var expectedSkipUnavailableUpdate = new SkipUnavailableUpdate( + config.originProjectId(), + config.linkedProjectId(), + config.linkedProjectAlias(), + config.skipUnavailable() == false + ); + clusterSettings.applySettings( + Settings.builder() + .put(initialSettings) + .put("cluster.remote." + alias + ".skip_unavailable", expectedSkipUnavailableUpdate.skipUnavailable) + .build() + ); + for (int i = 0; i < numListeners; ++i) { + assertThat(listeners.get(i).updatedConfig, sameInstance(null)); + assertThat(listeners.get(i).skipUnavailableUpdate, equalTo(expectedSkipUnavailableUpdate)); + listeners.get(i).reset(); + } + + // Change the proxy address, and set skip_unavailable back to original value, we should get both updates. + expectedSkipUnavailableUpdate = new SkipUnavailableUpdate( + config.originProjectId(), + config.linkedProjectId(), + config.linkedProjectAlias(), + config.skipUnavailable() + ); + final var newProxyAddress = "localhost:9401"; + clusterSettings.applySettings( + Settings.builder() + .put(initialSettings) + .put("cluster.remote." + alias + ".proxy_address", newProxyAddress) + .put("cluster.remote." + alias + ".skip_unavailable", expectedSkipUnavailableUpdate.skipUnavailable) + .build() + ); + for (int i = 0; i < numListeners; ++i) { + assertNotNull("expected non-null updatedConfig for listener " + i, listeners.get(i).updatedConfig); + assertThat(listeners.get(i).updatedConfig.proxyAddress(), equalTo(newProxyAddress)); + assertThat(listeners.get(i).skipUnavailableUpdate, equalTo(expectedSkipUnavailableUpdate)); + } + } +} From 26615d9767f8c0e68c8a346dfdc33cd8e6af5cbf Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 3 Sep 2025 21:31:39 +0000 Subject: [PATCH 03/11] [CI] Auto commit changes from spotless --- .../ClusterSettingsLinkedProjectConfigServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java index 82feb8e18b18d..5ed3c9467813b 100644 --- a/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java @@ -18,8 +18,8 @@ import java.util.ArrayList; import java.util.List; -import static org.elasticsearch.transport.LinkedProjectConfigService.LinkedProjectConfigListener; import static org.elasticsearch.transport.LinkedProjectConfig.ProxyLinkedProjectConfigBuilder; +import static org.elasticsearch.transport.LinkedProjectConfigService.LinkedProjectConfigListener; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; From b24de49fd946debd325e4c4858adff808b016e3f Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Fri, 5 Sep 2025 15:39:27 -0400 Subject: [PATCH 04/11] Eliminate using LinkedProjectConfigService in RemoteClusterAware --- .../transport/RemoteClusterAware.java | 23 ++------- .../transport/RemoteClusterService.java | 33 +++++-------- .../transport/TransportService.java | 8 +-- .../transport/RemoteClusterAwareTests.java | 5 +- .../transport/RemoteClusterServiceTests.java | 49 +++++++++++-------- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 2 +- .../xpack/ccr/CcrRepositoryManager.java | 45 ++++++----------- .../xpack/ql/index/RemoteClusterResolver.java | 23 +++++---- .../authz/IndicesAndAliasesResolver.java | 21 ++++---- .../checkpoint/RemoteClusterResolver.java | 23 +++++---- 10 files changed, 95 insertions(+), 137 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 5158d255e3acc..f0499a7d50780 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -19,7 +19,6 @@ import org.elasticsearch.node.Node; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -28,14 +27,13 @@ import java.util.stream.Collectors; /** - * Base class for all services and components that need up-to-date information about the registered remote clusters + * Base class for services and components that utilize linked projects. */ -public abstract class RemoteClusterAware implements LinkedProjectConfigService.LinkedProjectConfigListener { +public abstract class RemoteClusterAware { public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':'; public static final String LOCAL_CLUSTER_GROUP_KEY = ""; protected final Settings settings; - private final LinkedProjectConfigService linkedProjectConfigService; private final String nodeName; private final boolean isRemoteClusterClientEnabled; @@ -43,9 +41,8 @@ public abstract class RemoteClusterAware implements LinkedProjectConfigService.L * Creates a new {@link RemoteClusterAware} instance * @param settings the nodes level settings */ - protected RemoteClusterAware(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { + protected RemoteClusterAware(Settings settings) { this.settings = settings; - this.linkedProjectConfigService = linkedProjectConfigService; this.nodeName = Node.NODE_NAME_SETTING.get(settings); this.isRemoteClusterClientEnabled = DiscoveryNode.isRemoteClusterClient(settings); } @@ -54,13 +51,6 @@ protected String getNodeName() { return nodeName; } - /** - * Returns all known {@link LinkedProjectConfig}s. - */ - protected Collection loadAllLinkedProjectConfigs() { - return linkedProjectConfigService.loadAllLinkedProjectConfigs(); - } - /** * Check whether the index expression represents remote index or not. * The index name is assumed to be individual index (no commas) but can contain `-`, wildcards, @@ -213,13 +203,6 @@ protected Map> groupClusterIndices(Set remoteCluste return perClusterIndices; } - /** - * Registers this instance to listen for linked project updates. - */ - public void listenForUpdates() { - linkedProjectConfigService.register(this); - } - public static String buildRemoteIndexName(String clusterAlias, String indexName) { return clusterAlias == null || LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) ? indexName diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index d3b44a7b59746..2cba4bae9c05f 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -38,6 +38,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,7 +63,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Closeable, ReportingService, - IndicesExpressionGrouper { + IndicesExpressionGrouper, + LinkedProjectConfigService.LinkedProjectConfigListener { private static final Logger logger = LogManager.getLogger(RemoteClusterService.class); @@ -83,13 +85,8 @@ public boolean isRemoteClusterServerEnabled() { private final ProjectResolver projectResolver; private final boolean canUseSkipUnavailable; - RemoteClusterService( - Settings settings, - LinkedProjectConfigService linkedProjectConfigService, - TransportService transportService, - ProjectResolver projectResolver - ) { - super(settings, linkedProjectConfigService); + RemoteClusterService(Settings settings, TransportService transportService, ProjectResolver projectResolver) { + super(settings); this.isRemoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); this.isSearchNode = DiscoveryNode.hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE); this.isStateless = DiscoveryNode.isStateless(settings); @@ -481,26 +478,20 @@ enum RemoteClusterConnectionStatus { * Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection * to all configured seed nodes. */ - void initializeRemoteClusters() { + void initializeRemoteClusters(Collection configs) { + if (configs.isEmpty()) { + return; + } + @FixForMultiProject(description = "Refactor for initializing connections to linked projects for each origin project supported.") final var projectId = projectResolver.getProjectId(); final TimeValue timeValue = RemoteClusterSettings.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); final PlainActionFuture future = new PlainActionFuture<>(); - final var enabledClusters = loadAllLinkedProjectConfigs(); - - if (enabledClusters.isEmpty()) { - return; - } - - CountDownActionListener listener = new CountDownActionListener(enabledClusters.size(), future); - for (LinkedProjectConfig config : enabledClusters) { + CountDownActionListener listener = new CountDownActionListener(configs.size(), future); + for (LinkedProjectConfig config : configs) { updateRemoteCluster(config, false, listener.map(ignored -> null)); } - if (enabledClusters.isEmpty()) { - future.onResponse(null); - } - try { future.get(timeValue.millis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index c5f6c7b9e28dc..0995eed7da181 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -139,6 +139,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { volatile String[] tracerLogInclude; volatile String[] tracerLogExclude; + private final LinkedProjectConfigService linkedProjectConfigService; private final RemoteClusterService remoteClusterService; /** @@ -310,13 +311,14 @@ public TransportService( this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); this.enableStackOverflowAvoidance = ENABLE_STACK_OVERFLOW_AVOIDANCE.get(settings); - remoteClusterService = new RemoteClusterService(settings, linkedProjectConfigService, this, projectResolver); + this.linkedProjectConfigService = linkedProjectConfigService; + remoteClusterService = new RemoteClusterService(settings, this, projectResolver); responseHandlers = transport.getResponseHandlers(); if (clusterSettings != null) { clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude); clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude); if (remoteClusterClient) { - remoteClusterService.listenForUpdates(); + linkedProjectConfigService.register(remoteClusterService); } clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold); } @@ -371,7 +373,7 @@ protected void doStart() { if (remoteClusterClient) { // here we start to connect to the remote clusters - remoteClusterService.initializeRemoteClusters(); + remoteClusterService.initializeRemoteClusters(linkedProjectConfigService.loadAllLinkedProjectConfigs()); } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java index a8f9a62cee38c..641bc499e8f37 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java @@ -160,12 +160,9 @@ public void testGroupClusterIndicesFail() { private static class RemoteClusterAwareTest extends RemoteClusterAware { RemoteClusterAwareTest() { - super(Settings.EMPTY, LinkedProjectConfigService.NOOP); + super(Settings.EMPTY); } - @Override - public void updateLinkedProject(LinkedProjectConfig config) {} - @Override public Map> groupClusterIndices(Set remoteClusterNames, String[] requestIndices) { return super.groupClusterIndices(remoteClusterNames, requestIndices); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 60080bc4eb1ca..f7d1735c174b9 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -67,6 +67,7 @@ public class RemoteClusterServiceTests extends ESTestCase { private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private LinkedProjectConfigService linkedProjectConfigService = null; @Override public void tearDown() throws Exception { @@ -83,12 +84,14 @@ private RemoteClusterService createRemoteClusterService( ClusterSettings clusterSettings, MockTransportService transportService ) { - return new RemoteClusterService( - settings, - new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE), - transportService, - DefaultProjectResolver.INSTANCE - ); + if (linkedProjectConfigService == null) { + linkedProjectConfigService = new ClusterSettingsLinkedProjectConfigService( + settings, + clusterSettings, + DefaultProjectResolver.INSTANCE + ); + } + return new RemoteClusterService(settings, transportService, DefaultProjectResolver.INSTANCE); } private MockTransportService startTransport( @@ -187,7 +190,7 @@ public void testGroupClusterIndices() throws IOException { builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(hasRegisteredClusters(service)); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); assertTrue(isRemoteClusterRegistered(service, "cluster_2")); @@ -401,7 +404,7 @@ public void testGroupIndices() throws IOException { builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(hasRegisteredClusters(service)); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); assertTrue(isRemoteClusterRegistered(service, "cluster_2")); @@ -508,7 +511,7 @@ public void testIncrementallyAddClusters() throws IOException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(hasRegisteredClusters(service)); Settings cluster1Settings = createSettings( "cluster_1", @@ -584,7 +587,7 @@ public void testDefaultPingSchedule() throws IOException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(hasRegisteredClusters(service)); final var newSettings = createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())); final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); @@ -646,7 +649,7 @@ public void testCustomPingSchedule() throws IOException { TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); @@ -685,7 +688,7 @@ public void testChangeSettings() throws Exception { Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); Settings.Builder settingsChange = Settings.builder(); TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8)); @@ -771,7 +774,7 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(hasRegisteredClusters(service)); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -861,7 +864,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(hasRegisteredClusters(service)); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -956,7 +959,7 @@ public void testCollectNodes() throws InterruptedException, IOException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(hasRegisteredClusters(service)); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -1109,7 +1112,7 @@ public void testCollectNodesConcurrentWithSettingsChanges() throws IOException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(createSettings("cluster_1", seedList), transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(hasRegisteredClusters(service)); final var numTasks = between(3, 5); final var taskLatch = new CountDownLatch(numTasks); @@ -1297,7 +1300,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(hasRegisteredClusters(service)); final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); @@ -1563,7 +1566,7 @@ public void testUseDifferentTransportProfileForCredentialsProtectedRemoteCluster transportService.start(); transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); final CountDownLatch firstLatch = new CountDownLatch(1); final Settings.Builder firstRemoteClusterSettingsBuilder = Settings.builder(); @@ -1637,7 +1640,7 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); final Settings clusterSettings = buildRemoteClusterSettings("cluster_1", discoNode.getAddress().toString()); final CountDownLatch latch = new CountDownLatch(1); @@ -1726,7 +1729,7 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite () -> randomAlphaOfLength(10) ); try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); final Settings cluster1Settings = buildRemoteClusterSettings(goodCluster, c1DiscoNode.getAddress().toString()); final var latch = new CountDownLatch(1); @@ -1813,7 +1816,7 @@ public void testLogsConnectionResult() throws IOException { var local = startTransport("local", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); var remoteClusterService = createRemoteClusterService(Settings.EMPTY, clusterSettings, local) ) { - remoteClusterService.listenForUpdates(); + linkedProjectConfigService.register(remoteClusterService); assertThatLogger( () -> clusterSettings.applySettings( @@ -1852,6 +1855,10 @@ public void testLogsConnectionResult() throws IOException { } } + private void initializeRemoteClusters(RemoteClusterService remoteClusterService) { + remoteClusterService.initializeRemoteClusters(linkedProjectConfigService.loadAllLinkedProjectConfigs()); + } + private static Settings createSettings(String clusterAlias, List seeds) { Settings.Builder builder = Settings.builder(); builder.put( diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 48c7b2e92e3a0..5ec61adb51ddb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -183,7 +183,7 @@ public Collection createComponents(PluginServices services) { return List.of( ccrLicenseChecker, restoreSourceService, - new CcrRepositoryManager(settings, services.linkedProjectConfigService(), client), + new CcrRepositoryManager(services.linkedProjectConfigService(), client), new ShardFollowTaskCleaner(services.clusterService(), services.threadPool(), client), new AutoFollowCoordinator( settings, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java index b81307d7b19f5..ba717abc174da 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -12,10 +12,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.transport.LinkedProjectConfig; import org.elasticsearch.transport.LinkedProjectConfigService; -import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryRequest; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; @@ -25,17 +22,26 @@ class CcrRepositoryManager extends AbstractLifecycleComponent { private final Client client; - private final RemoteSettingsUpdateListener updateListener; + private final LinkedProjectConfigService linkedProjectConfigService; - CcrRepositoryManager(Settings settings, LinkedProjectConfigService linkedProjectConfigService, Client client) { + CcrRepositoryManager(LinkedProjectConfigService linkedProjectConfigService, Client client) { this.client = client; - updateListener = new RemoteSettingsUpdateListener(settings, linkedProjectConfigService); - updateListener.listenForUpdates(); + this.linkedProjectConfigService = linkedProjectConfigService; + linkedProjectConfigService.register(config -> { + String repositoryName = CcrRepository.NAME_PREFIX + config.linkedProjectAlias(); + if (config.isConnectionEnabled()) { + putRepository(repositoryName); + } else { + deleteRepository(repositoryName); + } + }); } @Override protected void doStart() { - updateListener.init(); + for (var config : linkedProjectConfigService.loadAllLinkedProjectConfigs()) { + putRepository(CcrRepository.NAME_PREFIX + config.linkedProjectAlias()); + } } @Override @@ -57,27 +63,4 @@ private void deleteRepository(String repositoryName) { client.execute(DeleteInternalCcrRepositoryAction.INSTANCE, request, f); assert f.isDone() : "Should be completed as it is executed synchronously"; } - - private class RemoteSettingsUpdateListener extends RemoteClusterAware { - - private RemoteSettingsUpdateListener(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { - super(settings, linkedProjectConfigService); - } - - void init() { - for (var config : loadAllLinkedProjectConfigs()) { - putRepository(CcrRepository.NAME_PREFIX + config.linkedProjectAlias()); - } - } - - @Override - public void updateLinkedProject(LinkedProjectConfig config) { - String repositoryName = CcrRepository.NAME_PREFIX + config.linkedProjectAlias(); - if (config.isConnectionEnabled()) { - putRepository(repositoryName); - } else { - deleteRepository(repositoryName); - } - } - } } diff --git a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java index 2271bae6e27ae..1cf0f869071b5 100644 --- a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java +++ b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java @@ -20,18 +20,17 @@ public final class RemoteClusterResolver extends RemoteClusterAware { private final CopyOnWriteArraySet clusters; public RemoteClusterResolver(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { - super(settings, linkedProjectConfigService); - clusters = new CopyOnWriteArraySet<>(loadAllLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList()); - listenForUpdates(); - } - - @Override - public void updateLinkedProject(LinkedProjectConfig config) { - if (config.isConnectionEnabled()) { - clusters.add(config.linkedProjectAlias()); - } else { - clusters.remove(config.linkedProjectAlias()); - } + super(settings); + clusters = new CopyOnWriteArraySet<>( + linkedProjectConfigService.loadAllLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() + ); + linkedProjectConfigService.register(config -> { + if (config.isConnectionEnabled()) { + clusters.add(config.linkedProjectAlias()); + } else { + clusters.remove(config.linkedProjectAlias()); + } + }); } public Set remoteClusters() { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index af1cf52703ed1..ba45f4aae432d 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -549,20 +549,17 @@ private static class RemoteClusterResolver extends RemoteClusterAware { private final CopyOnWriteArraySet clusters; private RemoteClusterResolver(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { - super(settings, linkedProjectConfigService); + super(settings); clusters = new CopyOnWriteArraySet<>( - loadAllLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() + linkedProjectConfigService.loadAllLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() ); - listenForUpdates(); - } - - @Override - public void updateLinkedProject(LinkedProjectConfig config) { - if (config.isConnectionEnabled()) { - clusters.add(config.linkedProjectAlias()); - } else { - clusters.remove(config.linkedProjectAlias()); - } + linkedProjectConfigService.register(config -> { + if (config.isConnectionEnabled()) { + clusters.add(config.linkedProjectAlias()); + } else { + clusters.remove(config.linkedProjectAlias()); + } + }); } ResolvedIndices splitLocalAndRemoteIndexNames(String... indices) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java index 87cb0c09de25e..f436d51474eb8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java @@ -47,18 +47,17 @@ int numClusters() { } RemoteClusterResolver(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { - super(settings, linkedProjectConfigService); - clusters = new CopyOnWriteArraySet<>(loadAllLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList()); - listenForUpdates(); - } - - @Override - public void updateLinkedProject(LinkedProjectConfig config) { - if (config.isConnectionEnabled()) { - clusters.add(config.linkedProjectAlias()); - } else { - clusters.remove(config.linkedProjectAlias()); - } + super(settings); + clusters = new CopyOnWriteArraySet<>( + linkedProjectConfigService.loadAllLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() + ); + linkedProjectConfigService.register(config -> { + if (config.isConnectionEnabled()) { + clusters.add(config.linkedProjectAlias()); + } else { + clusters.remove(config.linkedProjectAlias()); + } + }); } ResolvedIndices resolve(String... indices) { From b5d40ff4db8a3fad80ca742dcafc5dc71d2e402d Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Fri, 5 Sep 2025 15:45:12 -0400 Subject: [PATCH 05/11] Add FixForMultiProject annotations in ClusterSettingsLinkedProjectConfigService --- .../transport/ClusterSettingsLinkedProjectConfigService.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java index 562b169fe14dd..b6cc44ed33eb9 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.FixForMultiProject; import java.util.Collection; import java.util.List; @@ -50,6 +51,7 @@ public ClusterSettingsLinkedProjectConfigService(Settings settings, ClusterSetti } @Override + @FixForMultiProject(description = "Refactor to add the linked project IDs associated with the aliases.") public Collection loadAllLinkedProjectConfigs() { return RemoteClusterSettings.getRemoteClusters(settings) .stream() @@ -57,12 +59,15 @@ public Collection loadAllLinkedProjectConfigs() { .toList(); } + private void settingsChangedCallback(String clusterAlias, Settings newSettings) { final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); + @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, mergedSettings); handleUpdate(config); } + @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") private void skipUnavailableChangedCallback(String clusterAlias, Boolean skipUnavailable) { handleSkipUnavailableChanged(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, skipUnavailable); } From d461715d4763da4020bc4842eaf7c2fb1b766d55 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Sat, 6 Sep 2025 10:02:21 -0400 Subject: [PATCH 06/11] Move LinkedProjectConfigService.NOOP to test code --- .../transport/LinkedProjectConfigService.java | 15 --------- .../StubLinkedProjectConfigService.java | 33 +++++++++++++++++++ .../xpack/security/SecurityTests.java | 4 +-- .../xpack/sql/plugin/SqlPluginTests.java | 4 +-- .../TransformCheckpointServiceNodeTests.java | 4 +-- .../DefaultCheckpointProviderTests.java | 4 +-- .../TimeBasedCheckpointProviderTests.java | 4 +-- ...TransformPersistentTasksExecutorTests.java | 4 +-- .../transforms/TransformTaskTests.java | 4 +-- 9 files changed, 47 insertions(+), 29 deletions(-) create mode 100644 test/framework/src/main/java/org/elasticsearch/test/transport/StubLinkedProjectConfigService.java diff --git a/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java index 22f7d828b0843..4cb4b7927b005 100644 --- a/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java +++ b/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java @@ -12,7 +12,6 @@ import org.elasticsearch.cluster.metadata.ProjectId; import java.util.Collection; -import java.util.Collections; /** * Service for registering {@link LinkedProjectConfigListener}s to be notified of changes to linked project configurations. @@ -61,18 +60,4 @@ default void skipUnavailableChanged( * @return A collection of all existing {@link LinkedProjectConfig}s. */ Collection loadAllLinkedProjectConfigs(); - - /** - * A no-op stub implementation of {@link LinkedProjectConfigService} intended for use in test scenarios where linked project - * configuration updates are not needed. - */ - LinkedProjectConfigService NOOP = new LinkedProjectConfigService() { - @Override - public void register(LinkedProjectConfigListener listener) {} - - @Override - public Collection loadAllLinkedProjectConfigs() { - return Collections.emptyList(); - } - }; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubLinkedProjectConfigService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubLinkedProjectConfigService.java new file mode 100644 index 0000000000000..020792701a5fc --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubLinkedProjectConfigService.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.test.transport; + +import org.elasticsearch.transport.LinkedProjectConfig; +import org.elasticsearch.transport.LinkedProjectConfigService; + +import java.util.Collection; +import java.util.Collections; + +/** + * A no-op stub implementation of {@link LinkedProjectConfigService} intended for use in test scenarios where linked project + * configuration updates are not needed. + */ +public class StubLinkedProjectConfigService implements LinkedProjectConfigService { + + public static final StubLinkedProjectConfigService INSTANCE = new StubLinkedProjectConfigService(); + + @Override + public void register(LinkedProjectConfigListener listener) {} + + @Override + public Collection loadAllLinkedProjectConfigs() { + return Collections.emptyList(); + } +} diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index caf131edbe3b6..93d1e15aebd3a 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -71,9 +71,9 @@ import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.index.IndexVersionUtils; import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.usage.UsageService; import org.elasticsearch.watcher.ResourceWatcherService; @@ -263,7 +263,7 @@ private Collection createComponentsUtil(Settings settings) throws Except TestIndexNameExpressionResolver.newInstance(threadContext), TelemetryProvider.NOOP, mock(PersistentTasksService.class), - LinkedProjectConfigService.NOOP, + StubLinkedProjectConfigService.INSTANCE, TestProjectResolvers.alwaysThrow() ); } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlPluginTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlPluginTests.java index e1e8e825ef22e..90439cb7dd491 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlPluginTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlPluginTests.java @@ -16,7 +16,7 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.rest.RestController; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.LinkedProjectConfigService; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.xpack.sql.session.Cursors; import java.util.Collections; @@ -34,7 +34,7 @@ public void testSqlDisabledIsNoOp() { mock(Client.class), Settings.EMPTY, randomAlphaOfLength(10), - LinkedProjectConfigService.NOOP, + StubLinkedProjectConfigService.INSTANCE, new NamedWriteableRegistry(Cursors.getNamedWriteables()) ), hasSize(3) diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java index 343d17c5c64e6..a577276ecc08c 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java @@ -46,10 +46,10 @@ import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; -import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats; @@ -176,7 +176,7 @@ public void createComponents() { transformCheckpointService = new TransformCheckpointService( Clock.systemUTC(), Settings.EMPTY, - LinkedProjectConfigService.NOOP, + StubLinkedProjectConfigService.INSTANCE, transformsConfigManager, mockAuditor ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java index 38280f68fa5a1..8ed6b49d98753 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java @@ -29,9 +29,9 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.MockLog.LoggingExpectation; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; -import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.Transport; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; @@ -467,7 +467,7 @@ private DefaultCheckpointProvider newCheckpointProvider(TransformConfig transfor return new DefaultCheckpointProvider( clock, parentTaskClient, - new RemoteClusterResolver(Settings.EMPTY, LinkedProjectConfigService.NOOP), + new RemoteClusterResolver(Settings.EMPTY, StubLinkedProjectConfigService.INSTANCE), transformConfigManager, transformAuditor, transformConfig diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java index 41da13754ac44..edb25933de5c0 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java @@ -27,8 +27,8 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; @@ -287,7 +287,7 @@ private TimeBasedCheckpointProvider newCheckpointProvider(TransformConfig transf return new TimeBasedCheckpointProvider( clock, parentTaskClient, - new RemoteClusterResolver(Settings.EMPTY, LinkedProjectConfigService.NOOP), + new RemoteClusterResolver(Settings.EMPTY, StubLinkedProjectConfigService.INSTANCE), transformConfigManager, transformAuditor, transformConfig diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index a806c5400193a..5dc19e17a57f7 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -39,9 +39,9 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; @@ -661,7 +661,7 @@ private TransformServices transformServices(TransformConfigManager configManager var transformCheckpointService = new TransformCheckpointService( Clock.systemUTC(), Settings.EMPTY, - LinkedProjectConfigService.NOOP, + StubLinkedProjectConfigService.INSTANCE, configManager, mockAuditor ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index ac193e410550d..2b9be7097b75c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -30,9 +30,9 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; @@ -196,7 +196,7 @@ private TransformServices transformServices(Clock clock, TransformAuditor audito var transformsCheckpointService = new TransformCheckpointService( clock, Settings.EMPTY, - LinkedProjectConfigService.NOOP, + StubLinkedProjectConfigService.INSTANCE, transformsConfigManager, auditor ); From 0450941b14b46bb1041c0302975115e67edec599 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Sat, 6 Sep 2025 10:08:54 -0400 Subject: [PATCH 07/11] Rename loadAllLinkedProjectConfigs() to getInitialLinkedProjectConfigs() --- .../ClusterSettingsLinkedProjectConfigService.java | 2 +- .../elasticsearch/transport/LinkedProjectConfigService.java | 6 +++--- .../java/org/elasticsearch/transport/TransportService.java | 2 +- .../ClusterSettingsLinkedProjectConfigServiceTests.java | 2 +- .../elasticsearch/transport/RemoteClusterServiceTests.java | 2 +- .../test/transport/StubLinkedProjectConfigService.java | 2 +- .../org/elasticsearch/xpack/ccr/CcrRepositoryManager.java | 2 +- .../elasticsearch/xpack/ql/index/RemoteClusterResolver.java | 2 +- .../xpack/security/authz/IndicesAndAliasesResolver.java | 2 +- .../xpack/transform/checkpoint/RemoteClusterResolver.java | 2 +- 10 files changed, 12 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java index b6cc44ed33eb9..d183fa1c560ac 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java @@ -52,7 +52,7 @@ public ClusterSettingsLinkedProjectConfigService(Settings settings, ClusterSetti @Override @FixForMultiProject(description = "Refactor to add the linked project IDs associated with the aliases.") - public Collection loadAllLinkedProjectConfigs() { + public Collection getInitialLinkedProjectConfigs() { return RemoteClusterSettings.getRemoteClusters(settings) .stream() .map(alias -> RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, alias, settings)) diff --git a/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java index 4cb4b7927b005..66c465f9162c2 100644 --- a/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java +++ b/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java @@ -55,9 +55,9 @@ default void skipUnavailableChanged( void register(LinkedProjectConfigListener listener); /** - * Loads all existing linked project configurations for all origin projects. + * Loads all linked project configurations known at node startup, for all origin projects. * - * @return A collection of all existing {@link LinkedProjectConfig}s. + * @return A collection of all known {@link LinkedProjectConfig}s at node startup. */ - Collection loadAllLinkedProjectConfigs(); + Collection getInitialLinkedProjectConfigs(); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 0995eed7da181..498cc076ed608 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -373,7 +373,7 @@ protected void doStart() { if (remoteClusterClient) { // here we start to connect to the remote clusters - remoteClusterService.initializeRemoteClusters(linkedProjectConfigService.loadAllLinkedProjectConfigs()); + remoteClusterService.initializeRemoteClusters(linkedProjectConfigService.getInitialLinkedProjectConfigs()); } } diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java index 5ed3c9467813b..cbf248762a75f 100644 --- a/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java @@ -79,7 +79,7 @@ public void testListenersReceiveUpdates() { final var config = new ProxyLinkedProjectConfigBuilder(alias).proxyAddress(initialProxyAddress).build(); // Verify we can get the linked projects on startup. - assertThat(service.loadAllLinkedProjectConfigs(), equalTo(List.of(config))); + assertThat(service.getInitialLinkedProjectConfigs(), equalTo(List.of(config))); final int numListeners = randomIntBetween(1, 10); final var listeners = new ArrayList(numListeners); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index f7d1735c174b9..79d8b42ad10b0 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -1856,7 +1856,7 @@ public void testLogsConnectionResult() throws IOException { } private void initializeRemoteClusters(RemoteClusterService remoteClusterService) { - remoteClusterService.initializeRemoteClusters(linkedProjectConfigService.loadAllLinkedProjectConfigs()); + remoteClusterService.initializeRemoteClusters(linkedProjectConfigService.getInitialLinkedProjectConfigs()); } private static Settings createSettings(String clusterAlias, List seeds) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubLinkedProjectConfigService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubLinkedProjectConfigService.java index 020792701a5fc..6937083f0160e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubLinkedProjectConfigService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubLinkedProjectConfigService.java @@ -27,7 +27,7 @@ public class StubLinkedProjectConfigService implements LinkedProjectConfigServic public void register(LinkedProjectConfigListener listener) {} @Override - public Collection loadAllLinkedProjectConfigs() { + public Collection getInitialLinkedProjectConfigs() { return Collections.emptyList(); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java index ba717abc174da..38da013bc03f4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -39,7 +39,7 @@ class CcrRepositoryManager extends AbstractLifecycleComponent { @Override protected void doStart() { - for (var config : linkedProjectConfigService.loadAllLinkedProjectConfigs()) { + for (var config : linkedProjectConfigService.getInitialLinkedProjectConfigs()) { putRepository(CcrRepository.NAME_PREFIX + config.linkedProjectAlias()); } } diff --git a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java index 1cf0f869071b5..8f6feed6875e5 100644 --- a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java +++ b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java @@ -22,7 +22,7 @@ public final class RemoteClusterResolver extends RemoteClusterAware { public RemoteClusterResolver(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { super(settings); clusters = new CopyOnWriteArraySet<>( - linkedProjectConfigService.loadAllLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() + linkedProjectConfigService.getInitialLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() ); linkedProjectConfigService.register(config -> { if (config.isConnectionEnabled()) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index ba45f4aae432d..0924360547579 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -551,7 +551,7 @@ private static class RemoteClusterResolver extends RemoteClusterAware { private RemoteClusterResolver(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { super(settings); clusters = new CopyOnWriteArraySet<>( - linkedProjectConfigService.loadAllLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() + linkedProjectConfigService.getInitialLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() ); linkedProjectConfigService.register(config -> { if (config.isConnectionEnabled()) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java index f436d51474eb8..5379efe3298a8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java @@ -49,7 +49,7 @@ int numClusters() { RemoteClusterResolver(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { super(settings); clusters = new CopyOnWriteArraySet<>( - linkedProjectConfigService.loadAllLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() + linkedProjectConfigService.getInitialLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() ); linkedProjectConfigService.register(config -> { if (config.isConnectionEnabled()) { From 20861a3ac01bad965b4172966847b9cc774ca408 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Sat, 6 Sep 2025 10:25:53 -0400 Subject: [PATCH 08/11] Remove redundant legacy alias check --- .../java/org/elasticsearch/transport/RemoteClusterService.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 2cba4bae9c05f..014eb22775778 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -374,9 +374,6 @@ public void onFailure(Exception e) { @Override public void updateLinkedProject(LinkedProjectConfig config) { - if (LOCAL_CLUSTER_GROUP_KEY.equals(config.linkedProjectAlias())) { - throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); - } final var projectId = config.originProjectId(); final var clusterAlias = config.linkedProjectAlias(); CountDownLatch latch = new CountDownLatch(1); From 716d6b6fb7b7135a2e5cffa861f73165885f7363 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Sat, 6 Sep 2025 11:35:20 -0400 Subject: [PATCH 09/11] Add optional support for registering ClusterSettings update consumers --- ...terSettingsLinkedProjectConfigService.java | 51 ++++++++++++------- .../transport/TransportService.java | 12 ++--- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java index d183fa1c560ac..4fa8619f8dbe3 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.FixForMultiProject; +import org.elasticsearch.core.Nullable; import java.util.Collection; import java.util.List; @@ -27,27 +28,40 @@ public class ClusterSettingsLinkedProjectConfigService extends AbstractLinkedPro private final Settings settings; private final ProjectResolver projectResolver; + /** + * Constructs a new {@link ClusterSettingsLinkedProjectConfigService}. + * + * @param settings The initial node settings available on startup, used in {@link #getInitialLinkedProjectConfigs()}. + * @param clusterSettings The {@link ClusterSettings} to add setting update consumers to, if non-null. + * @param projectResolver The {@link ProjectResolver} to use to resolve the origin project ID. + */ @SuppressWarnings("this-escape") - public ClusterSettingsLinkedProjectConfigService(Settings settings, ClusterSettings clusterSettings, ProjectResolver projectResolver) { + public ClusterSettingsLinkedProjectConfigService( + Settings settings, + @Nullable ClusterSettings clusterSettings, + ProjectResolver projectResolver + ) { this.settings = settings; this.projectResolver = projectResolver; - List> remoteClusterSettings = List.of( - RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS, - RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE, - RemoteClusterSettings.REMOTE_CONNECTION_MODE, - RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY, - RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS, - RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS, - RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS, - RemoteClusterSettings.ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS, - RemoteClusterSettings.ProxyConnectionStrategySettings.SERVER_NAME - ); - clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::settingsChangedCallback); - clusterSettings.addAffixUpdateConsumer( - RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE, - this::skipUnavailableChangedCallback, - (alias, value) -> {} - ); + if (clusterSettings != null) { + List> remoteClusterSettings = List.of( + RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS, + RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE, + RemoteClusterSettings.REMOTE_CONNECTION_MODE, + RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY, + RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS, + RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS, + RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS, + RemoteClusterSettings.ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS, + RemoteClusterSettings.ProxyConnectionStrategySettings.SERVER_NAME + ); + clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::settingsChangedCallback); + clusterSettings.addAffixUpdateConsumer( + RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE, + this::skipUnavailableChangedCallback, + (alias, value) -> {} + ); + } } @Override @@ -59,7 +73,6 @@ public Collection getInitialLinkedProjectConfigs() { .toList(); } - private void settingsChangedCallback(String clusterAlias, Settings newSettings) { final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 498cc076ed608..785a56a41ddbe 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -276,11 +276,7 @@ public TransportService( clusterSettings, connectionManager, taskManger, - new ClusterSettingsLinkedProjectConfigService( - settings, - clusterSettings != null ? clusterSettings : ClusterSettings.createBuiltInClusterSettings(), - DefaultProjectResolver.INSTANCE - ), + new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE), DefaultProjectResolver.INSTANCE ); } @@ -317,11 +313,11 @@ public TransportService( if (clusterSettings != null) { clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude); clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude); - if (remoteClusterClient) { - linkedProjectConfigService.register(remoteClusterService); - } clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold); } + if (remoteClusterClient) { + linkedProjectConfigService.register(remoteClusterService); + } registerRequestHandler( HANDSHAKE_ACTION_NAME, EsExecutors.DIRECT_EXECUTOR_SERVICE, From af28735364887896aada77109ae4bfb91690fdb9 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Tue, 9 Sep 2025 12:07:46 -0400 Subject: [PATCH 10/11] RemoteClusterAware implements LinkedProjectConfigListener --- .../transport/RemoteClusterAware.java | 2 +- .../transport/RemoteClusterService.java | 3 +- .../transport/RemoteClusterAwareTests.java | 3 ++ .../java/org/elasticsearch/xpack/ccr/Ccr.java | 2 +- .../xpack/ccr/CcrRepositoryManager.java | 43 +++++++++++++------ .../xpack/ql/index/RemoteClusterResolver.java | 17 +++++--- .../authz/IndicesAndAliasesResolver.java | 17 +++++--- .../checkpoint/RemoteClusterResolver.java | 17 +++++--- 8 files changed, 67 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index f0499a7d50780..50e1be5fdba48 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -29,7 +29,7 @@ /** * Base class for services and components that utilize linked projects. */ -public abstract class RemoteClusterAware { +public abstract class RemoteClusterAware implements LinkedProjectConfigService.LinkedProjectConfigListener { public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':'; public static final String LOCAL_CLUSTER_GROUP_KEY = ""; diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 014eb22775778..b1e07e2994e11 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -63,8 +63,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Closeable, ReportingService, - IndicesExpressionGrouper, - LinkedProjectConfigService.LinkedProjectConfigListener { + IndicesExpressionGrouper { private static final Logger logger = LogManager.getLogger(RemoteClusterService.class); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java index 641bc499e8f37..073b5bf0b4b52 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java @@ -163,6 +163,9 @@ private static class RemoteClusterAwareTest extends RemoteClusterAware { super(Settings.EMPTY); } + @Override + public void updateLinkedProject(LinkedProjectConfig config) {} + @Override public Map> groupClusterIndices(Set remoteClusterNames, String[] requestIndices) { return super.groupClusterIndices(remoteClusterNames, requestIndices); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 5ec61adb51ddb..48c7b2e92e3a0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -183,7 +183,7 @@ public Collection createComponents(PluginServices services) { return List.of( ccrLicenseChecker, restoreSourceService, - new CcrRepositoryManager(services.linkedProjectConfigService(), client), + new CcrRepositoryManager(settings, services.linkedProjectConfigService(), client), new ShardFollowTaskCleaner(services.clusterService(), services.threadPool(), client), new AutoFollowCoordinator( settings, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java index 38da013bc03f4..ca39f5a8b3cf0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -12,7 +12,10 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.LinkedProjectConfig; import org.elasticsearch.transport.LinkedProjectConfigService; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryRequest; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; @@ -22,26 +25,19 @@ class CcrRepositoryManager extends AbstractLifecycleComponent { private final Client client; + private final RemoteSettingsUpdateListener updateListener; private final LinkedProjectConfigService linkedProjectConfigService; - CcrRepositoryManager(LinkedProjectConfigService linkedProjectConfigService, Client client) { + CcrRepositoryManager(Settings settings, LinkedProjectConfigService linkedProjectConfigService, Client client) { this.client = client; + updateListener = new RemoteSettingsUpdateListener(settings); + linkedProjectConfigService.register(updateListener); this.linkedProjectConfigService = linkedProjectConfigService; - linkedProjectConfigService.register(config -> { - String repositoryName = CcrRepository.NAME_PREFIX + config.linkedProjectAlias(); - if (config.isConnectionEnabled()) { - putRepository(repositoryName); - } else { - deleteRepository(repositoryName); - } - }); } @Override protected void doStart() { - for (var config : linkedProjectConfigService.getInitialLinkedProjectConfigs()) { - putRepository(CcrRepository.NAME_PREFIX + config.linkedProjectAlias()); - } + updateListener.init(); } @Override @@ -63,4 +59,27 @@ private void deleteRepository(String repositoryName) { client.execute(DeleteInternalCcrRepositoryAction.INSTANCE, request, f); assert f.isDone() : "Should be completed as it is executed synchronously"; } + + private class RemoteSettingsUpdateListener extends RemoteClusterAware { + + private RemoteSettingsUpdateListener(Settings settings) { + super(settings); + } + + void init() { + for (var config : linkedProjectConfigService.getInitialLinkedProjectConfigs()) { + putRepository(CcrRepository.NAME_PREFIX + config.linkedProjectAlias()); + } + } + + @Override + public void updateLinkedProject(LinkedProjectConfig config) { + String repositoryName = CcrRepository.NAME_PREFIX + config.linkedProjectAlias(); + if (config.isConnectionEnabled()) { + putRepository(repositoryName); + } else { + deleteRepository(repositoryName); + } + } + } } diff --git a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java index 8f6feed6875e5..e4dd5dc7b6593 100644 --- a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java +++ b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java @@ -24,13 +24,16 @@ public RemoteClusterResolver(Settings settings, LinkedProjectConfigService linke clusters = new CopyOnWriteArraySet<>( linkedProjectConfigService.getInitialLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() ); - linkedProjectConfigService.register(config -> { - if (config.isConnectionEnabled()) { - clusters.add(config.linkedProjectAlias()); - } else { - clusters.remove(config.linkedProjectAlias()); - } - }); + linkedProjectConfigService.register(this); + } + + @Override + public void updateLinkedProject(LinkedProjectConfig config) { + if (config.isConnectionEnabled()) { + clusters.add(config.linkedProjectAlias()); + } else { + clusters.remove(config.linkedProjectAlias()); + } } public Set remoteClusters() { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index 0924360547579..8ff69db0b2cf8 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -553,13 +553,16 @@ private RemoteClusterResolver(Settings settings, LinkedProjectConfigService link clusters = new CopyOnWriteArraySet<>( linkedProjectConfigService.getInitialLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() ); - linkedProjectConfigService.register(config -> { - if (config.isConnectionEnabled()) { - clusters.add(config.linkedProjectAlias()); - } else { - clusters.remove(config.linkedProjectAlias()); - } - }); + linkedProjectConfigService.register(this); + } + + @Override + public void updateLinkedProject(LinkedProjectConfig config) { + if (config.isConnectionEnabled()) { + clusters.add(config.linkedProjectAlias()); + } else { + clusters.remove(config.linkedProjectAlias()); + } } ResolvedIndices splitLocalAndRemoteIndexNames(String... indices) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java index 5379efe3298a8..ccaa004ac8bd2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java @@ -51,13 +51,16 @@ int numClusters() { clusters = new CopyOnWriteArraySet<>( linkedProjectConfigService.getInitialLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() ); - linkedProjectConfigService.register(config -> { - if (config.isConnectionEnabled()) { - clusters.add(config.linkedProjectAlias()); - } else { - clusters.remove(config.linkedProjectAlias()); - } - }); + linkedProjectConfigService.register(this); + } + + @Override + public void updateLinkedProject(LinkedProjectConfig config) { + if (config.isConnectionEnabled()) { + clusters.add(config.linkedProjectAlias()); + } else { + clusters.remove(config.linkedProjectAlias()); + } } ResolvedIndices resolve(String... indices) { From 68fa0fdc23ad899875af7173fd250c6b11928cee Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Tue, 9 Sep 2025 12:25:03 -0400 Subject: [PATCH 11/11] Remove conditional creation of linkedProjectConfigService --- .../transport/RemoteClusterServiceTests.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 79d8b42ad10b0..e5a343b984785 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -84,13 +84,11 @@ private RemoteClusterService createRemoteClusterService( ClusterSettings clusterSettings, MockTransportService transportService ) { - if (linkedProjectConfigService == null) { - linkedProjectConfigService = new ClusterSettingsLinkedProjectConfigService( - settings, - clusterSettings, - DefaultProjectResolver.INSTANCE - ); - } + linkedProjectConfigService = new ClusterSettingsLinkedProjectConfigService( + settings, + clusterSettings, + DefaultProjectResolver.INSTANCE + ); return new RemoteClusterService(settings, transportService, DefaultProjectResolver.INSTANCE); }