diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index c909c9a25e5b8..d202dbd69527f 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -223,6 +223,8 @@ import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.internal.BuiltInExecutorBuilders; +import org.elasticsearch.transport.ClusterSettingsLinkedProjectConfigService; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.usage.UsageService; @@ -969,6 +971,11 @@ public Map queryFields() { final IndexingPressure indexingLimits = new IndexingPressure(settings); + final var linkedProjectConfigService = pluginsService.loadSingletonServiceProvider( + LinkedProjectConfigService.class, + () -> new ClusterSettingsLinkedProjectConfigService(settings, clusterService.getClusterSettings(), projectResolver) + ); + PluginServiceInstances pluginServices = new PluginServiceInstances( client, clusterService, @@ -992,7 +999,8 @@ public Map queryFields() { taskManager, projectResolver, slowLogFieldProvider, - indexingLimits + indexingLimits, + linkedProjectConfigService ); Collection pluginComponents = pluginsService.flatMap(plugin -> { @@ -1122,6 +1130,7 @@ public Map queryFields() { taskManager, telemetryProvider.getTracer(), nodeEnvironment.nodeId(), + linkedProjectConfigService, projectResolver ); final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry()); diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 5f1dc11ea16e1..0b8c59b3c1957 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -43,6 +43,7 @@ import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ClusterConnectionManager; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportService; @@ -121,6 +122,7 @@ TransportService newTransportService( TaskManager taskManager, Tracer tracer, String nodeId, + LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver ) { return new TransportService( @@ -132,6 +134,7 @@ TransportService newTransportService( clusterSettings, new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()), taskManager, + linkedProjectConfigService, projectResolver ); } diff --git a/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java b/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java index ec58555b70505..601b768b01277 100644 --- a/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java +++ b/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java @@ -31,6 +31,7 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -57,5 +58,6 @@ public record PluginServiceInstances( TaskManager taskManager, ProjectResolver projectResolver, SlowLogFieldProvider slowLogFieldProvider, - IndexingPressure indexingPressure + IndexingPressure indexingPressure, + LinkedProjectConfigService linkedProjectConfigService ) implements Plugin.PluginServices {} diff --git a/server/src/main/java/org/elasticsearch/plugins/Plugin.java b/server/src/main/java/org/elasticsearch/plugins/Plugin.java index 955acbe55768c..c14f7f4914983 100644 --- a/server/src/main/java/org/elasticsearch/plugins/Plugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/Plugin.java @@ -40,6 +40,7 @@ import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.XContentParser; @@ -192,6 +193,11 @@ public interface PluginServices { * Provider for indexing pressure */ IndexingPressure indexingPressure(); + + /** + * A service for registering for linked project configuration updates. + */ + LinkedProjectConfigService linkedProjectConfigService(); } /** diff --git a/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java new file mode 100644 index 0000000000000..3cb06f7da007e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.metadata.ProjectId; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * Abstract base class for {@link LinkedProjectConfigService} implementations. + * Provides common functionality for managing a list of registered listeners and notifying them of updates. + */ +public abstract class AbstractLinkedProjectConfigService implements LinkedProjectConfigService { + private final List listeners = new CopyOnWriteArrayList<>(); + + @Override + public void register(LinkedProjectConfigListener listener) { + listeners.add(listener); + } + + protected void handleUpdate(LinkedProjectConfig config) { + listeners.forEach(listener -> listener.updateLinkedProject(config)); + } + + protected void handleSkipUnavailableChanged( + ProjectId originProjectId, + ProjectId linkedProjectId, + String linkedProjectAlias, + boolean skipUnavailable + ) { + listeners.forEach( + listener -> listener.skipUnavailableChanged(originProjectId, linkedProjectId, linkedProjectAlias, skipUnavailable) + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java new file mode 100644 index 0000000000000..4fa8619f8dbe3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.FixForMultiProject; +import org.elasticsearch.core.Nullable; + +import java.util.Collection; +import java.util.List; + +/** + * A {@link LinkedProjectConfigService} implementation that listens for {@link ClusterSettings} changes, + * creating {@link LinkedProjectConfig}s from the relevant settings and notifying registered listeners of updates. + */ +public class ClusterSettingsLinkedProjectConfigService extends AbstractLinkedProjectConfigService { + private final Settings settings; + private final ProjectResolver projectResolver; + + /** + * Constructs a new {@link ClusterSettingsLinkedProjectConfigService}. + * + * @param settings The initial node settings available on startup, used in {@link #getInitialLinkedProjectConfigs()}. + * @param clusterSettings The {@link ClusterSettings} to add setting update consumers to, if non-null. + * @param projectResolver The {@link ProjectResolver} to use to resolve the origin project ID. + */ + @SuppressWarnings("this-escape") + public ClusterSettingsLinkedProjectConfigService( + Settings settings, + @Nullable ClusterSettings clusterSettings, + ProjectResolver projectResolver + ) { + this.settings = settings; + this.projectResolver = projectResolver; + if (clusterSettings != null) { + List> remoteClusterSettings = List.of( + RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS, + RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE, + RemoteClusterSettings.REMOTE_CONNECTION_MODE, + RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY, + RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS, + RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS, + RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS, + RemoteClusterSettings.ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS, + RemoteClusterSettings.ProxyConnectionStrategySettings.SERVER_NAME + ); + clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::settingsChangedCallback); + clusterSettings.addAffixUpdateConsumer( + RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE, + this::skipUnavailableChangedCallback, + (alias, value) -> {} + ); + } + } + + @Override + @FixForMultiProject(description = "Refactor to add the linked project IDs associated with the aliases.") + public Collection getInitialLinkedProjectConfigs() { + return RemoteClusterSettings.getRemoteClusters(settings) + .stream() + .map(alias -> RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, alias, settings)) + .toList(); + } + + private void settingsChangedCallback(String clusterAlias, Settings newSettings) { + final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); + @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") + final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, mergedSettings); + handleUpdate(config); + } + + @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") + private void skipUnavailableChangedCallback(String clusterAlias, Boolean skipUnavailable) { + handleSkipUnavailableChanged(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, skipUnavailable); + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java new file mode 100644 index 0000000000000..66c465f9162c2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.metadata.ProjectId; + +import java.util.Collection; + +/** + * Service for registering {@link LinkedProjectConfigListener}s to be notified of changes to linked project configurations. + */ +public interface LinkedProjectConfigService { + + /** + * Listener interface for receiving updates about linked project configurations. + * Implementations must not throw from any of the interface methods. + */ + interface LinkedProjectConfigListener { + /** + * Called when a linked project configuration has been added or updated. + * + * @param config The updated {@link LinkedProjectConfig}. + */ + void updateLinkedProject(LinkedProjectConfig config); + + /** + * Called when the boolean skip_unavailable setting has changed for a linked project configuration. + * Note that skip_unavailable may not be supported in all contexts where linked projects are used. + * + * @param originProjectId The {@link ProjectId} of the owning project that has the linked project configuration. + * @param linkedProjectId The {@link ProjectId} of the linked project. + * @param linkedProjectAlias The alias used for the linked project. + * @param skipUnavailable The new value of the skip_unavailable setting. + */ + default void skipUnavailableChanged( + ProjectId originProjectId, + ProjectId linkedProjectId, + String linkedProjectAlias, + boolean skipUnavailable + ) {} + } + + /** + * Registers a {@link LinkedProjectConfigListener} to receive updates about linked project configurations. + * + * @param listener The listener to register. + */ + void register(LinkedProjectConfigListener listener); + + /** + * Loads all linked project configurations known at node startup, for all origin projects. + * + * @return A collection of all known {@link LinkedProjectConfig}s at node startup. + */ + Collection getInitialLinkedProjectConfigs(); +} diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 1d6cd8ea353ef..50e1be5fdba48 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -13,8 +13,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SelectorResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; import org.elasticsearch.core.Tuple; @@ -28,13 +26,10 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings; -import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings; - /** - * Base class for all services and components that need up-to-date information about the registered remote clusters + * Base class for services and components that utilize linked projects. */ -public abstract class RemoteClusterAware { +public abstract class RemoteClusterAware implements LinkedProjectConfigService.LinkedProjectConfigListener { public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':'; public static final String LOCAL_CLUSTER_GROUP_KEY = ""; @@ -56,13 +51,6 @@ protected String getNodeName() { return nodeName; } - /** - * Returns remote clusters that are enabled in these settings - */ - protected static Set getEnabledRemoteClusters(final Settings settings) { - return RemoteClusterSettings.getRemoteClusters(settings); - } - /** * Check whether the index expression represents remote index or not. * The index name is assumed to be individual index (no commas) but can contain `-`, wildcards, @@ -215,36 +203,6 @@ protected Map> groupClusterIndices(Set remoteCluste return perClusterIndices; } - void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) { - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { - throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); - } - updateRemoteCluster(clusterAlias, settings); - } - - /** - * Subclasses must implement this to receive information about updated cluster aliases. - */ - protected abstract void updateRemoteCluster(String clusterAlias, Settings settings); - - /** - * Registers this instance to listen to updates on the cluster settings. - */ - public void listenForUpdates(ClusterSettings clusterSettings) { - List> remoteClusterSettings = List.of( - RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS, - RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE, - RemoteClusterSettings.REMOTE_CONNECTION_MODE, - SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY, - SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS, - SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS, - ProxyConnectionStrategySettings.PROXY_ADDRESS, - ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS, - ProxyConnectionStrategySettings.SERVER_NAME - ); - clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster); - } - public static String buildRemoteIndexName(String clusterAlias, String indexName) { return clusterAlias == null || LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) ? indexName diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index ac3c8a74af37c..b1e07e2994e11 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -39,6 +38,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -286,17 +286,13 @@ public RemoteClusterConnection getRemoteClusterConnection(String cluster) { } @Override - public void listenForUpdates(ClusterSettings clusterSettings) { - super.listenForUpdates(clusterSettings); - clusterSettings.addAffixUpdateConsumer( - RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE, - this::updateSkipUnavailable, - (alias, value) -> {} - ); - } - - private synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { - RemoteClusterConnection remote = getConnectionsMapForCurrentProject().get(clusterAlias); + public void skipUnavailableChanged( + ProjectId originProjectId, + ProjectId linkedProjectId, + String linkedProjectAlias, + boolean skipUnavailable + ) { + final var remote = getConnectionsMapForProject(originProjectId).get(linkedProjectAlias); if (remote != null) { remote.setSkipUnavailable(skipUnavailable); } @@ -329,7 +325,7 @@ public synchronized void updateRemoteClusterCredentials(Supplier setti private void maybeRebuildConnectionOnCredentialsChange( ProjectId projectId, String clusterAlias, - Settings settings, + Settings newSettings, RefCountingRunnable connectionRefs ) { final var connectionsMap = getConnectionsMapForProject(projectId); @@ -344,7 +340,9 @@ private void maybeRebuildConnectionOnCredentialsChange( return; } - updateRemoteCluster(projectId, clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() { + final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); + final var config = RemoteClusterSettings.toConfig(projectId, ProjectId.DEFAULT, clusterAlias, mergedSettings); + updateRemoteCluster(config, true, ActionListener.releaseAfter(new ActionListener<>() { @Override public void onResponse(RemoteClusterConnectionStatus status) { logger.info( @@ -374,11 +372,11 @@ public void onFailure(Exception e) { } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - @FixForMultiProject(description = "ES-12270: Refactor as needed to support project specific changes to linked remotes.") - final var projectId = projectResolver.getProjectId(); + public void updateLinkedProject(LinkedProjectConfig config) { + final var projectId = config.originProjectId(); + final var clusterAlias = config.linkedProjectAlias(); CountDownLatch latch = new CountDownLatch(1); - updateRemoteCluster(projectId, clusterAlias, settings, false, ActionListener.runAfter(new ActionListener<>() { + updateRemoteCluster(config, false, ActionListener.runAfter(new ActionListener<>() { @Override public void onResponse(RemoteClusterConnectionStatus status) { logger.info("project [{}] remote cluster connection [{}] updated: {}", projectId, clusterAlias, status); @@ -410,34 +408,19 @@ public void onFailure(Exception e) { // Package-access for testing. @FixForMultiProject(description = "Refactor to supply the project ID associated with the alias and settings, or eliminate this method.") void updateRemoteCluster(String clusterAlias, Settings newSettings, ActionListener listener) { - updateRemoteCluster(projectResolver.getProjectId(), clusterAlias, newSettings, false, listener); + final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); + @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") + final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, mergedSettings); + updateRemoteCluster(config, false, listener); } /** * Adds, rebuilds, or closes and removes the connection for the specified remote cluster. * - * @param projectId The project the remote cluster is associated with. - * @param clusterAlias The alias used for the remote cluster being connected. - * @param newSettings The updated settings for the remote connection. + * @param config The linked project configuration. * @param forceRebuild Forces an existing connection to be closed and reconnected even if the connection strategy does not require it. * @param listener The listener invoked once the configured cluster has been connected. */ - private synchronized void updateRemoteCluster( - ProjectId projectId, - String clusterAlias, - Settings newSettings, - boolean forceRebuild, - ActionListener listener - ) { - if (LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { - throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); - } - final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); - @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") - final var linkedProjectConfig = RemoteClusterSettings.toConfig(projectId, ProjectId.DEFAULT, clusterAlias, mergedSettings); - updateRemoteCluster(linkedProjectConfig, forceRebuild, listener); - } - private synchronized void updateRemoteCluster( LinkedProjectConfig config, boolean forceRebuild, @@ -491,26 +474,18 @@ enum RemoteClusterConnectionStatus { * Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection * to all configured seed nodes. */ - void initializeRemoteClusters() { + void initializeRemoteClusters(Collection configs) { + if (configs.isEmpty()) { + return; + } + @FixForMultiProject(description = "Refactor for initializing connections to linked projects for each origin project supported.") final var projectId = projectResolver.getProjectId(); final TimeValue timeValue = RemoteClusterSettings.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); final PlainActionFuture future = new PlainActionFuture<>(); - Set enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings); - - if (enabledClusters.isEmpty()) { - return; - } - - CountDownActionListener listener = new CountDownActionListener(enabledClusters.size(), future); - for (String clusterAlias : enabledClusters) { - @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") - final var linkedProjectConfig = RemoteClusterSettings.toConfig(projectId, ProjectId.DEFAULT, clusterAlias, settings); - updateRemoteCluster(linkedProjectConfig, false, listener.map(ignored -> null)); - } - - if (enabledClusters.isEmpty()) { - future.onResponse(null); + CountDownActionListener listener = new CountDownActionListener(configs.size(), future); + for (LinkedProjectConfig config : configs) { + updateRemoteCluster(config, false, listener.map(ignored -> null)); } try { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterSettings.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterSettings.java index f5b72bf6e40b8..3c2b02e8533ce 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterSettings.java @@ -337,6 +337,9 @@ public static LinkedProjectConfig toConfig( String linkedProjectAlias, Settings settings ) { + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(linkedProjectAlias)) { + throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); + } final var strategy = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(linkedProjectAlias).get(settings); final var builder = switch (strategy) { case SNIFF -> SniffConnectionStrategySettings.readSettings( diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 086bef3093dc6..785a56a41ddbe 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -139,6 +139,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { volatile String[] tracerLogInclude; volatile String[] tracerLogExclude; + private final LinkedProjectConfigService linkedProjectConfigService; private final RemoteClusterService remoteClusterService; /** @@ -275,6 +276,7 @@ public TransportService( clusterSettings, connectionManager, taskManger, + new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE), DefaultProjectResolver.INSTANCE ); } @@ -289,6 +291,7 @@ public TransportService( @Nullable ClusterSettings clusterSettings, ConnectionManager connectionManager, TaskManager taskManger, + LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver ) { this.transport = transport; @@ -304,16 +307,17 @@ public TransportService( this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); this.enableStackOverflowAvoidance = ENABLE_STACK_OVERFLOW_AVOIDANCE.get(settings); + this.linkedProjectConfigService = linkedProjectConfigService; remoteClusterService = new RemoteClusterService(settings, this, projectResolver); responseHandlers = transport.getResponseHandlers(); if (clusterSettings != null) { clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude); clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude); - if (remoteClusterClient) { - remoteClusterService.listenForUpdates(clusterSettings); - } clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold); } + if (remoteClusterClient) { + linkedProjectConfigService.register(remoteClusterService); + } registerRequestHandler( HANDSHAKE_ACTION_NAME, EsExecutors.DIRECT_EXECUTOR_SERVICE, @@ -365,7 +369,7 @@ protected void doStart() { if (remoteClusterClient) { // here we start to connect to the remote clusters - remoteClusterService.initializeRemoteClusters(); + remoteClusterService.initializeRemoteClusters(linkedProjectConfigService.getInitialLinkedProjectConfigs()); } } diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java new file mode 100644 index 0000000000000..cbf248762a75f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.DefaultProjectResolver; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.transport.LinkedProjectConfig.ProxyLinkedProjectConfigBuilder; +import static org.elasticsearch.transport.LinkedProjectConfigService.LinkedProjectConfigListener; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class ClusterSettingsLinkedProjectConfigServiceTests extends ESTestCase { + + private record SkipUnavailableUpdate( + ProjectId originProjectId, + ProjectId linkedProjectId, + String linkedProjectAlias, + boolean skipUnavailable + ) {} + + private static class StubLinkedProjectConfigListener implements LinkedProjectConfigListener { + LinkedProjectConfig updatedConfig; + SkipUnavailableUpdate skipUnavailableUpdate; + + @Override + public void updateLinkedProject(LinkedProjectConfig config) { + updatedConfig = config; + } + + @Override + public void skipUnavailableChanged( + ProjectId originProjectId, + ProjectId linkedProjectId, + String linkedProjectAlias, + boolean skipUnavailable + ) { + skipUnavailableUpdate = new SkipUnavailableUpdate(originProjectId, linkedProjectId, linkedProjectAlias, skipUnavailable); + } + + void reset() { + updatedConfig = null; + skipUnavailableUpdate = null; + } + } + + /** + * A simple test to exercise the callback registration and notification mechanism. + * Note that {@link RemoteClusterServiceTests} uses {@link ClusterSettingsLinkedProjectConfigService} + * and contains more thorough tests of all the settings being monitored. + */ + public void testListenersReceiveUpdates() { + final var alias = randomAlphaOfLength(10); + + final var initialProxyAddress = "localhost:9400"; + final var initialSettings = Settings.builder() + .put("cluster.remote." + alias + ".mode", "proxy") + .put("cluster.remote." + alias + ".proxy_address", initialProxyAddress) + .build(); + final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(initialSettings); + final var service = new ClusterSettingsLinkedProjectConfigService( + initialSettings, + clusterSettings, + DefaultProjectResolver.INSTANCE + ); + final var config = new ProxyLinkedProjectConfigBuilder(alias).proxyAddress(initialProxyAddress).build(); + + // Verify we can get the linked projects on startup. + assertThat(service.getInitialLinkedProjectConfigs(), equalTo(List.of(config))); + + final int numListeners = randomIntBetween(1, 10); + final var listeners = new ArrayList(numListeners); + for (int i = 0; i < numListeners; ++i) { + listeners.add(new StubLinkedProjectConfigListener()); + service.register(listeners.getLast()); + } + + // Expect no updates when applying the same settings. + clusterSettings.applySettings(initialSettings); + for (int i = 0; i < numListeners; ++i) { + assertThat(listeners.get(i).updatedConfig, sameInstance(null)); + assertThat(listeners.get(i).skipUnavailableUpdate, sameInstance(null)); + listeners.get(i).reset(); + } + + // Change the skip_unavailable, leave the other settings alone, we should get the skip_unavailable update only. + var expectedSkipUnavailableUpdate = new SkipUnavailableUpdate( + config.originProjectId(), + config.linkedProjectId(), + config.linkedProjectAlias(), + config.skipUnavailable() == false + ); + clusterSettings.applySettings( + Settings.builder() + .put(initialSettings) + .put("cluster.remote." + alias + ".skip_unavailable", expectedSkipUnavailableUpdate.skipUnavailable) + .build() + ); + for (int i = 0; i < numListeners; ++i) { + assertThat(listeners.get(i).updatedConfig, sameInstance(null)); + assertThat(listeners.get(i).skipUnavailableUpdate, equalTo(expectedSkipUnavailableUpdate)); + listeners.get(i).reset(); + } + + // Change the proxy address, and set skip_unavailable back to original value, we should get both updates. + expectedSkipUnavailableUpdate = new SkipUnavailableUpdate( + config.originProjectId(), + config.linkedProjectId(), + config.linkedProjectAlias(), + config.skipUnavailable() + ); + final var newProxyAddress = "localhost:9401"; + clusterSettings.applySettings( + Settings.builder() + .put(initialSettings) + .put("cluster.remote." + alias + ".proxy_address", newProxyAddress) + .put("cluster.remote." + alias + ".skip_unavailable", expectedSkipUnavailableUpdate.skipUnavailable) + .build() + ); + for (int i = 0; i < numListeners; ++i) { + assertNotNull("expected non-null updatedConfig for listener " + i, listeners.get(i).updatedConfig); + assertThat(listeners.get(i).updatedConfig.proxyAddress(), equalTo(newProxyAddress)); + assertThat(listeners.get(i).skipUnavailableUpdate, equalTo(expectedSkipUnavailableUpdate)); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java index 2394e0b07cc57..073b5bf0b4b52 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareTests.java @@ -164,9 +164,7 @@ private static class RemoteClusterAwareTest extends RemoteClusterAware { } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - - } + public void updateLinkedProject(LinkedProjectConfig config) {} @Override public Map> groupClusterIndices(Set remoteClusterNames, String[] requestIndices) { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index c4fd4b20ac48c..e5a343b984785 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -58,6 +58,7 @@ import static org.elasticsearch.test.NodeRoles.removeRoles; import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings; import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings; +import static org.elasticsearch.transport.RemoteClusterSettings.toConfig; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; @@ -66,6 +67,7 @@ public class RemoteClusterServiceTests extends ESTestCase { private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private LinkedProjectConfigService linkedProjectConfigService = null; @Override public void tearDown() throws Exception { @@ -73,7 +75,20 @@ public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } - private RemoteClusterService createRemoteClusterService(final Settings settings, final MockTransportService transportService) { + private RemoteClusterService createRemoteClusterService(Settings settings, MockTransportService transportService) { + return createRemoteClusterService(settings, ClusterSettings.createBuiltInClusterSettings(), transportService); + } + + private RemoteClusterService createRemoteClusterService( + Settings settings, + ClusterSettings clusterSettings, + MockTransportService transportService + ) { + linkedProjectConfigService = new ClusterSettingsLinkedProjectConfigService( + settings, + clusterSettings, + DefaultProjectResolver.INSTANCE + ); return new RemoteClusterService(settings, transportService, DefaultProjectResolver.INSTANCE); } @@ -173,7 +188,7 @@ public void testGroupClusterIndices() throws IOException { builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(hasRegisteredClusters(service)); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); assertTrue(isRemoteClusterRegistered(service, "cluster_2")); @@ -387,7 +402,7 @@ public void testGroupIndices() throws IOException { builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(hasRegisteredClusters(service)); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); assertTrue(isRemoteClusterRegistered(service, "cluster_2")); @@ -494,7 +509,7 @@ public void testIncrementallyAddClusters() throws IOException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(hasRegisteredClusters(service)); Settings cluster1Settings = createSettings( "cluster_1", @@ -505,7 +520,7 @@ public void testIncrementallyAddClusters() throws IOException { // connect before returning. new Thread(() -> { try { - service.validateAndUpdateRemoteCluster("cluster_1", cluster1Settings); + service.updateLinkedProject(toConfig("cluster_1", cluster1Settings)); clusterAdded.onResponse(null); } catch (Exception e) { clusterAdded.onFailure(e); @@ -518,16 +533,16 @@ public void testIncrementallyAddClusters() throws IOException { "cluster_2", Collections.singletonList(cluster2Seed.getAddress().toString()) ); - service.validateAndUpdateRemoteCluster("cluster_2", cluster2Settings); + service.updateLinkedProject(toConfig("cluster_2", cluster2Settings)); assertTrue(hasRegisteredClusters(service)); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); assertTrue(isRemoteClusterRegistered(service, "cluster_2")); Settings cluster2SettingsDisabled = createSettings("cluster_2", Collections.emptyList()); - service.validateAndUpdateRemoteCluster("cluster_2", cluster2SettingsDisabled); + service.updateLinkedProject(toConfig("cluster_2", cluster2SettingsDisabled)); assertFalse(isRemoteClusterRegistered(service, "cluster_2")); IllegalArgumentException iae = expectThrows( IllegalArgumentException.class, - () -> service.validateAndUpdateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY) + () -> service.updateLinkedProject(toConfig(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY)) ); assertEquals("remote clusters must not have the empty string as its key", iae.getMessage()); } @@ -570,12 +585,11 @@ public void testDefaultPingSchedule() throws IOException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(hasRegisteredClusters(service)); - service.validateAndUpdateRemoteCluster( - "cluster_1", - createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())) - ); + final var newSettings = createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())); + final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); + service.updateLinkedProject(toConfig("cluster_1", mergedSettings)); assertTrue(hasRegisteredClusters(service)); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); @@ -633,7 +647,7 @@ public void testCustomPingSchedule() throws IOException { TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); @@ -672,7 +686,7 @@ public void testChangeSettings() throws Exception { Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); Settings.Builder settingsChange = Settings.builder(); TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8)); @@ -685,7 +699,7 @@ public void testChangeSettings() throws Exception { settingsChange.put("cluster.remote.cluster_1.transport.compress", enabledChange); } settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - service.validateAndUpdateRemoteCluster("cluster_1", settingsChange.build()); + service.updateLinkedProject(toConfig("cluster_1", settingsChange.build())); assertBusy(remoteClusterConnection::isClosed); remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); @@ -758,7 +772,7 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(hasRegisteredClusters(service)); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -848,7 +862,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(hasRegisteredClusters(service)); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -943,7 +957,7 @@ public void testCollectNodes() throws InterruptedException, IOException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(hasRegisteredClusters(service)); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -1096,7 +1110,7 @@ public void testCollectNodesConcurrentWithSettingsChanges() throws IOException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(createSettings("cluster_1", seedList), transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(hasRegisteredClusters(service)); final var numTasks = between(3, 5); final var taskLatch = new CountDownLatch(numTasks); @@ -1284,7 +1298,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { assertFalse(hasRegisteredClusters(service)); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(hasRegisteredClusters(service)); final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); @@ -1550,7 +1564,7 @@ public void testUseDifferentTransportProfileForCredentialsProtectedRemoteCluster transportService.start(); transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); final CountDownLatch firstLatch = new CountDownLatch(1); final Settings.Builder firstRemoteClusterSettingsBuilder = Settings.builder(); @@ -1624,7 +1638,7 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi transportService.acceptIncomingRequests(); try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); final Settings clusterSettings = buildRemoteClusterSettings("cluster_1", discoNode.getAddress().toString()); final CountDownLatch latch = new CountDownLatch(1); @@ -1713,7 +1727,7 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite () -> randomAlphaOfLength(10) ); try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); final Settings cluster1Settings = buildRemoteClusterSettings(goodCluster, c1DiscoNode.getAddress().toString()); final var latch = new CountDownLatch(1); @@ -1794,14 +1808,13 @@ private Settings buildRemoteClusterSettings(String clusterAlias, String address) } public void testLogsConnectionResult() throws IOException { - + final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(); try ( var remote = startTransport("remote", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); var local = startTransport("local", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); - var remoteClusterService = createRemoteClusterService(Settings.EMPTY, local) + var remoteClusterService = createRemoteClusterService(Settings.EMPTY, clusterSettings, local) ) { - var clusterSettings = ClusterSettings.createBuiltInClusterSettings(); - remoteClusterService.listenForUpdates(clusterSettings); + linkedProjectConfigService.register(remoteClusterService); assertThatLogger( () -> clusterSettings.applySettings( @@ -1840,6 +1853,10 @@ public void testLogsConnectionResult() throws IOException { } } + private void initializeRemoteClusters(RemoteClusterService remoteClusterService) { + remoteClusterService.initializeRemoteClusters(linkedProjectConfigService.getInitialLinkedProjectConfigs()); + } + private static Settings createSettings(String clusterAlias, List seeds) { Settings.Builder builder = Settings.builder(); builder.put( diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index a6ab3699eb206..e8fc4d4a539b5 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -50,6 +50,7 @@ import org.elasticsearch.test.MockHttpTransport; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportService; @@ -174,6 +175,7 @@ protected TransportService newTransportService( TaskManager taskManager, Tracer tracer, String nodeId, + LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver ) { @@ -193,6 +195,7 @@ protected TransportService newTransportService( taskManager, tracer, nodeId, + linkedProjectConfigService, projectResolver ); } else { diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubLinkedProjectConfigService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubLinkedProjectConfigService.java new file mode 100644 index 0000000000000..6937083f0160e --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubLinkedProjectConfigService.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.test.transport; + +import org.elasticsearch.transport.LinkedProjectConfig; +import org.elasticsearch.transport.LinkedProjectConfigService; + +import java.util.Collection; +import java.util.Collections; + +/** + * A no-op stub implementation of {@link LinkedProjectConfigService} intended for use in test scenarios where linked project + * configuration updates are not needed. + */ +public class StubLinkedProjectConfigService implements LinkedProjectConfigService { + + public static final StubLinkedProjectConfigService INSTANCE = new StubLinkedProjectConfigService(); + + @Override + public void register(LinkedProjectConfigListener listener) {} + + @Override + public Collection getInitialLinkedProjectConfigs() { + return Collections.emptyList(); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index bfc85f3efb0fe..48c7b2e92e3a0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -183,7 +183,7 @@ public Collection createComponents(PluginServices services) { return List.of( ccrLicenseChecker, restoreSourceService, - new CcrRepositoryManager(settings, services.clusterService(), client), + new CcrRepositoryManager(settings, services.linkedProjectConfigService(), client), new ShardFollowTaskCleaner(services.clusterService(), services.threadPool(), client), new AutoFollowCoordinator( settings, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java index a37f75daaa6e5..ca39f5a8b3cf0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -11,28 +11,28 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.LinkedProjectConfig; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteClusterSettings; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryRequest; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryRequest; import org.elasticsearch.xpack.ccr.repository.CcrRepository; -import java.util.Set; - class CcrRepositoryManager extends AbstractLifecycleComponent { private final Client client; private final RemoteSettingsUpdateListener updateListener; + private final LinkedProjectConfigService linkedProjectConfigService; - CcrRepositoryManager(Settings settings, ClusterService clusterService, Client client) { + CcrRepositoryManager(Settings settings, LinkedProjectConfigService linkedProjectConfigService, Client client) { this.client = client; updateListener = new RemoteSettingsUpdateListener(settings); - updateListener.listenForUpdates(clusterService.getClusterSettings()); + linkedProjectConfigService.register(updateListener); + this.linkedProjectConfigService = linkedProjectConfigService; } @Override @@ -67,16 +67,15 @@ private RemoteSettingsUpdateListener(Settings settings) { } void init() { - Set clusterAliases = getEnabledRemoteClusters(settings); - for (String clusterAlias : clusterAliases) { - putRepository(CcrRepository.NAME_PREFIX + clusterAlias); + for (var config : linkedProjectConfigService.getInitialLinkedProjectConfigs()) { + putRepository(CcrRepository.NAME_PREFIX + config.linkedProjectAlias()); } } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias; - if (RemoteClusterSettings.isConnectionEnabled(clusterAlias, settings)) { + public void updateLinkedProject(LinkedProjectConfig config) { + String repositoryName = CcrRepository.NAME_PREFIX + config.linkedProjectAlias(); + if (config.isConnectionEnabled()) { putRepository(repositoryName); } else { deleteRepository(repositoryName); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java index 4ef003fb84381..1b0f71debddf0 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java @@ -10,7 +10,6 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; @@ -27,6 +26,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; @@ -76,14 +76,24 @@ public EqlPlugin() {} @Override public Collection createComponents(PluginServices services) { - return createComponents(services.client(), services.environment().settings(), services.clusterService()); + return createComponents( + services.client(), + services.environment().settings(), + services.clusterService().getClusterName().value(), + services.linkedProjectConfigService() + ); } - private Collection createComponents(Client client, Settings settings, ClusterService clusterService) { - RemoteClusterResolver remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings()); + private Collection createComponents( + Client client, + Settings settings, + String clusterName, + LinkedProjectConfigService linkedProjectConfigService + ) { + RemoteClusterResolver remoteClusterResolver = new RemoteClusterResolver(settings, linkedProjectConfigService); IndexResolver indexResolver = new IndexResolver( client, - clusterService.getClusterName().value(), + clusterName, DefaultDataTypeRegistry.INSTANCE, remoteClusterResolver::remoteClusters ); diff --git a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java index 7a815bc09cf9b..e4dd5dc7b6593 100644 --- a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java +++ b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/RemoteClusterResolver.java @@ -7,10 +7,10 @@ package org.elasticsearch.xpack.ql.index; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.LinkedProjectConfig; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteClusterSettings; import java.util.Set; import java.util.TreeSet; @@ -19,18 +19,20 @@ public final class RemoteClusterResolver extends RemoteClusterAware { private final CopyOnWriteArraySet clusters; - public RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) { + public RemoteClusterResolver(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { super(settings); - clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings)); - listenForUpdates(clusterSettings); + clusters = new CopyOnWriteArraySet<>( + linkedProjectConfigService.getInitialLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() + ); + linkedProjectConfigService.register(this); } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - if (RemoteClusterSettings.isConnectionEnabled(clusterAlias, settings)) { - clusters.add(clusterAlias); + public void updateLinkedProject(LinkedProjectConfig config) { + if (config.isConnectionEnabled()) { + clusters.add(config.linkedProjectAlias()); } else { - clusters.remove(clusterAlias); + clusters.remove(config.linkedProjectAlias()); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 20de24ef36cbc..6fa86404d8eb5 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -107,6 +107,7 @@ import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.RemoteClusterSettings; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; @@ -749,6 +750,7 @@ public Collection createComponents(PluginServices services) { services.indexNameExpressionResolver(), services.telemetryProvider(), new PersistentTasksService(services.clusterService(), services.threadPool(), services.client()), + services.linkedProjectConfigService(), services.projectResolver() ); } catch (final Exception e) { @@ -769,6 +771,7 @@ Collection createComponents( IndexNameExpressionResolver expressionResolver, TelemetryProvider telemetryProvider, PersistentTasksService persistentTasksService, + LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver ) throws Exception { logger.info("Security is {}", enabled ? "enabled" : "disabled"); @@ -1144,6 +1147,7 @@ Collection createComponents( operatorPrivilegesService.get(), restrictedIndices, authorizationDenialMessages.get(), + linkedProjectConfigService, projectResolver ); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index f7f0f48f1c0fe..15fc7d6c8defc 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -49,6 +49,7 @@ import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xpack.core.security.SecurityContext; @@ -164,12 +165,13 @@ public AuthorizationService( OperatorPrivilegesService operatorPrivilegesService, RestrictedIndices restrictedIndices, AuthorizationDenialMessages authorizationDenialMessages, + LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver ) { this.clusterService = clusterService; this.auditTrailService = auditTrailService; this.restrictedIndices = restrictedIndices; - this.indicesAndAliasesResolver = new IndicesAndAliasesResolver(settings, clusterService, resolver); + this.indicesAndAliasesResolver = new IndicesAndAliasesResolver(settings, linkedProjectConfigService, resolver); this.authcFailureHandler = authcFailureHandler; this.threadContext = threadPool.getThreadContext(); this.securityContext = new SecurityContext(settings, this.threadContext); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index fd5e82a4edb24..8ff69db0b2cf8 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -22,18 +22,17 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.transport.LinkedProjectConfig; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteClusterSettings; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine; import org.elasticsearch.xpack.core.security.authz.IndicesAndAliasesResolverField; @@ -59,10 +58,14 @@ class IndicesAndAliasesResolver { private final IndexAbstractionResolver indexAbstractionResolver; private final RemoteClusterResolver remoteClusterResolver; - IndicesAndAliasesResolver(Settings settings, ClusterService clusterService, IndexNameExpressionResolver resolver) { + IndicesAndAliasesResolver( + Settings settings, + LinkedProjectConfigService linkedProjectConfigService, + IndexNameExpressionResolver resolver + ) { this.nameExpressionResolver = resolver; this.indexAbstractionResolver = new IndexAbstractionResolver(resolver); - this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings()); + this.remoteClusterResolver = new RemoteClusterResolver(settings, linkedProjectConfigService); } /** @@ -545,18 +548,20 @@ private static class RemoteClusterResolver extends RemoteClusterAware { private final CopyOnWriteArraySet clusters; - private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) { + private RemoteClusterResolver(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { super(settings); - clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings)); - listenForUpdates(clusterSettings); + clusters = new CopyOnWriteArraySet<>( + linkedProjectConfigService.getInitialLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() + ); + linkedProjectConfigService.register(this); } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - if (RemoteClusterSettings.isConnectionEnabled(clusterAlias, settings)) { - clusters.add(clusterAlias); + public void updateLinkedProject(LinkedProjectConfig config) { + if (config.isConnectionEnabled()) { + clusters.add(config.linkedProjectAlias()); } else { - clusters.remove(clusterAlias); + clusters.remove(config.linkedProjectAlias()); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index bf82846a81da4..93d1e15aebd3a 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -71,6 +71,7 @@ import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.index.IndexVersionUtils; import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequest; @@ -262,6 +263,7 @@ private Collection createComponentsUtil(Settings settings) throws Except TestIndexNameExpressionResolver.newInstance(threadContext), TelemetryProvider.NOOP, mock(PersistentTasksService.class), + StubLinkedProjectConfigService.INSTANCE, TestProjectResolvers.alwaysThrow() ); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index e4bb33c66d983..0e9393d418d8b 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -133,7 +133,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.AbstractTransportRequest; +import org.elasticsearch.transport.ClusterSettingsLinkedProjectConfigService; import org.elasticsearch.transport.EmptyRequest; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xcontent.XContentBuilder; @@ -267,6 +269,7 @@ public class AuthorizationServiceTests extends ESTestCase { private SecurityContext securityContext; private ProjectResolver projectResolver; private IndexNameExpressionResolver indexNameExpressionResolver; + private LinkedProjectConfigService linkedProjectConfigService; @SuppressWarnings("unchecked") @Before @@ -320,6 +323,7 @@ public void setup() { operatorPrivilegesService = mock(OperatorPrivileges.OperatorPrivilegesService.class); projectResolver = TestProjectResolvers.singleProject(projectId); indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(projectResolver); + linkedProjectConfigService = new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, projectResolver); authorizationService = new AuthorizationService( settings, rolesStore, @@ -336,6 +340,7 @@ public void setup() { operatorPrivilegesService, RESTRICTED_INDICES, new AuthorizationDenialMessages.Default(), + linkedProjectConfigService, projectResolver ); } @@ -1769,6 +1774,7 @@ public void testDenialForAnonymousUser() { operatorPrivilegesService, RESTRICTED_INDICES, new AuthorizationDenialMessages.Default(), + linkedProjectConfigService, projectResolver ); @@ -1819,6 +1825,7 @@ public void testDenialForAnonymousUserAuthorizationExceptionDisabled() { operatorPrivilegesService, RESTRICTED_INDICES, new AuthorizationDenialMessages.Default(), + linkedProjectConfigService, projectResolver ); @@ -3357,6 +3364,7 @@ public void testAuthorizationEngineSelectionForCheckPrivileges() throws Exceptio operatorPrivilegesService, RESTRICTED_INDICES, new AuthorizationDenialMessages.Default(), + linkedProjectConfigService, projectResolver ); @@ -3513,6 +3521,7 @@ public void getUserPrivileges(AuthorizationInfo authorizationInfo, ActionListene operatorPrivilegesService, RESTRICTED_INDICES, new AuthorizationDenialMessages.Default(), + linkedProjectConfigService, projectResolver ); Authentication authentication; diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java index 530b012cb55c3..0462679a5ff18 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java @@ -61,6 +61,7 @@ import org.elasticsearch.protocol.xpack.graph.GraphExploreRequest; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.ClusterSettingsLinkedProjectConfigService; import org.elasticsearch.transport.EmptyRequest; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.TransportRequest; @@ -242,6 +243,7 @@ public void setup() { user = new User("user", "role"); userDashIndices = new User("dash", "dash"); userNoIndices = new User("test", "test"); + final var projectResolver = TestProjectResolvers.DEFAULT_PROJECT_ONLY; final FieldPermissionsCache fieldPermissionsCache = new FieldPermissionsCache(Settings.EMPTY); rolesStore = Mockito.spy( new CompositeRolesStore( @@ -254,7 +256,7 @@ public void setup() { fieldPermissionsCache, mock(ApiKeyService.class), mock(ServiceAccountService.class), - TestProjectResolvers.DEFAULT_PROJECT_ONLY, + projectResolver, new DocumentSubsetBitsetCache(Settings.EMPTY), RESTRICTED_INDICES, EsExecutors.DIRECT_EXECUTOR_SERVICE, @@ -417,7 +419,11 @@ public void setup() { ClusterService clusterService = mock(ClusterService.class); when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - defaultIndicesResolver = new IndicesAndAliasesResolver(settings, clusterService, indexNameExpressionResolver); + defaultIndicesResolver = new IndicesAndAliasesResolver( + settings, + new ClusterSettingsLinkedProjectConfigService(settings, clusterService.getClusterSettings(), projectResolver), + indexNameExpressionResolver + ); } public void testDashIndicesAreAllowedInShardLevelRequests() { diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java index 0af3a5a36cad8..dcdfde0def2e4 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java @@ -9,7 +9,6 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -24,6 +23,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; @@ -82,7 +82,8 @@ public Collection createComponents(PluginServices services) { return createComponents( services.client(), services.environment().settings(), - services.clusterService(), + services.clusterService().getClusterName().value(), + services.linkedProjectConfigService(), services.namedWriteableRegistry() ); } @@ -93,13 +94,14 @@ public Collection createComponents(PluginServices services) { Collection createComponents( Client client, Settings settings, - ClusterService clusterService, + String clusterName, + LinkedProjectConfigService linkedProjectConfigService, NamedWriteableRegistry namedWriteableRegistry ) { - RemoteClusterResolver remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings()); + RemoteClusterResolver remoteClusterResolver = new RemoteClusterResolver(settings, linkedProjectConfigService); IndexResolver indexResolver = new IndexResolver( client, - clusterService.getClusterName().value(), + clusterName, SqlDataTypeRegistry.INSTANCE, remoteClusterResolver::remoteClusters ); diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlPluginTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlPluginTests.java index 47a76b4a5291d..90439cb7dd491 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlPluginTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlPluginTests.java @@ -7,10 +7,8 @@ package org.elasticsearch.xpack.sql.plugin; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -18,29 +16,25 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.rest.RestController; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.xpack.sql.session.Cursors; import java.util.Collections; import static org.hamcrest.Matchers.hasSize; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class SqlPluginTests extends ESTestCase { public void testSqlDisabledIsNoOp() { Settings settings = Settings.builder().put("xpack.sql.enabled", false).build(); SqlPlugin plugin = new SqlPlugin(settings); - ClusterService clusterService = mock(ClusterService.class); - when(clusterService.getClusterName()).thenReturn(new ClusterName(randomAlphaOfLength(10))); - when(clusterService.getClusterSettings()).thenReturn( - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) - ); assertThat( plugin.createComponents( mock(Client.class), Settings.EMPTY, - clusterService, + randomAlphaOfLength(10), + StubLinkedProjectConfigService.INSTANCE, new NamedWriteableRegistry(Cursors.getNamedWriteables()) ), hasSize(3) diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java index 745ccae86816c..a577276ecc08c 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; @@ -46,8 +45,8 @@ import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; @@ -177,12 +176,7 @@ public void createComponents() { transformCheckpointService = new TransformCheckpointService( Clock.systemUTC(), Settings.EMPTY, - new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool, - mock(TaskManager.class) - ), + StubLinkedProjectConfigService.INSTANCE, transformsConfigManager, mockAuditor ); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index ef59f057c1319..6085d6bbe38af 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -303,7 +303,7 @@ public Collection createComponents(PluginServices services) { TransformCheckpointService checkpointService = new TransformCheckpointService( clock, settings, - clusterService, + services.linkedProjectConfigService(), configManager, auditor ); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java index ebf98c57c0b9e..ccaa004ac8bd2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java @@ -7,10 +7,10 @@ package org.elasticsearch.xpack.transform.checkpoint; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.LinkedProjectConfig; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteClusterSettings; import java.util.Collections; import java.util.List; @@ -46,18 +46,20 @@ int numClusters() { } } - RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) { + RemoteClusterResolver(Settings settings, LinkedProjectConfigService linkedProjectConfigService) { super(settings); - clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings)); - listenForUpdates(clusterSettings); + clusters = new CopyOnWriteArraySet<>( + linkedProjectConfigService.getInitialLinkedProjectConfigs().stream().map(LinkedProjectConfig::linkedProjectAlias).toList() + ); + linkedProjectConfigService.register(this); } @Override - protected void updateRemoteCluster(String clusterAlias, Settings settings) { - if (RemoteClusterSettings.isConnectionEnabled(clusterAlias, settings)) { - clusters.add(clusterAlias); + public void updateLinkedProject(LinkedProjectConfig config) { + if (config.isConnectionEnabled()) { + clusters.add(config.linkedProjectAlias()); } else { - clusters.remove(clusterAlias); + clusters.remove(config.linkedProjectAlias()); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java index 0826aaff9deab..9598633605674 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java @@ -11,9 +11,9 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.internal.ParentTaskAssigningClient; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.transport.LinkedProjectConfigService; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; @@ -47,14 +47,14 @@ public class TransformCheckpointService { public TransformCheckpointService( final Clock clock, final Settings settings, - final ClusterService clusterService, + LinkedProjectConfigService linkedProjectConfigService, final TransformConfigManager transformConfigManager, TransformAuditor transformAuditor ) { this.clock = clock; this.transformConfigManager = transformConfigManager; this.transformAuditor = transformAuditor; - this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings()); + this.remoteClusterResolver = new RemoteClusterResolver(settings, linkedProjectConfigService); } public CheckpointProvider getCheckpointProvider(final ParentTaskAssigningClient client, final TransformConfig transformConfig) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java index 3df47fb3bc066..8ed6b49d98753 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.client.internal.RemoteClusterClient; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.set.Sets; @@ -30,6 +29,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.MockLog.LoggingExpectation; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.Transport; @@ -467,7 +467,7 @@ private DefaultCheckpointProvider newCheckpointProvider(TransformConfig transfor return new DefaultCheckpointProvider( clock, parentTaskClient, - new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + new RemoteClusterResolver(Settings.EMPTY, StubLinkedProjectConfigService.INSTANCE), transformConfigManager, transformAuditor, transformConfig diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java index 15a681d093739..edb25933de5c0 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.ParentTaskAssigningClient; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.TimeValue; @@ -28,6 +27,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; @@ -287,7 +287,7 @@ private TimeBasedCheckpointProvider newCheckpointProvider(TransformConfig transf return new TimeBasedCheckpointProvider( clock, parentTaskClient, - new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + new RemoteClusterResolver(Settings.EMPTY, StubLinkedProjectConfigService.INSTANCE), transformConfigManager, transformAuditor, transformConfig diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index 5fd3db63c70e6..5dc19e17a57f7 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; @@ -660,12 +661,7 @@ private TransformServices transformServices(TransformConfigManager configManager var transformCheckpointService = new TransformCheckpointService( Clock.systemUTC(), Settings.EMPTY, - new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool, - null - ), + StubLinkedProjectConfigService.INSTANCE, configManager, mockAuditor ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 535484ed3a196..2b9be7097b75c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -17,7 +17,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.TimeValue; @@ -31,6 +30,7 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; @@ -196,12 +196,7 @@ private TransformServices transformServices(Clock clock, TransformAuditor audito var transformsCheckpointService = new TransformCheckpointService( clock, Settings.EMPTY, - new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool, - null - ), + StubLinkedProjectConfigService.INSTANCE, transformsConfigManager, auditor );