diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 656d19373d7e7..a23ae39dd49a1 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -21,8 +21,11 @@ import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.client.internal.RemoteClusterClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.project.DefaultProjectResolver; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.SecureSetting; @@ -31,6 +34,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.IndicesExpressionGrouper; @@ -52,6 +56,7 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.elasticsearch.common.settings.Setting.boolSetting; @@ -154,14 +159,20 @@ public boolean isRemoteClusterServerEnabled() { } private final TransportService transportService; - private final Map remoteClusters = ConcurrentCollections.newConcurrentMap(); + private final Map> remoteClusters; private final RemoteClusterCredentialsManager remoteClusterCredentialsManager; + private final ProjectResolver projectResolver; + @FixForMultiProject(description = "Inject the ProjectResolver instance.") RemoteClusterService(Settings settings, TransportService transportService) { super(settings); this.enabled = DiscoveryNode.isRemoteClusterClient(settings); this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings); this.transportService = transportService; + this.projectResolver = DefaultProjectResolver.INSTANCE; + this.remoteClusters = projectResolver.supportsMultipleProjects() + ? ConcurrentCollections.newConcurrentMap() + : Map.of(ProjectId.DEFAULT, ConcurrentCollections.newConcurrentMap()); this.remoteClusterCredentialsManager = new RemoteClusterCredentialsManager(settings); if (remoteClusterServerEnabled) { registerRemoteClusterHandshakeRequestHandler(transportService); @@ -250,8 +261,9 @@ public Set getConfiguredClusters() { /** * Returns the registered remote cluster names. */ + @FixForMultiProject(description = "Analyze use cases, determine possible need for cluster scoped and project scoped versions.") public Set getRegisteredRemoteClusterNames() { - return remoteClusters.keySet(); + return getConnectionsMapForCurrentProject().keySet(); } /** @@ -328,7 +340,8 @@ public RemoteClusterConnection getRemoteClusterConnection(String cluster) { "this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role" ); } - RemoteClusterConnection connection = remoteClusters.get(cluster); + @FixForMultiProject(description = "Verify all callers will have the proper context set for resolving the origin project ID.") + RemoteClusterConnection connection = getConnectionsMapForCurrentProject().get(cluster); if (connection == null) { throw new NoSuchRemoteClusterException(cluster); } @@ -342,48 +355,63 @@ public void listenForUpdates(ClusterSettings clusterSettings) { } private synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { - RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); + RemoteClusterConnection remote = getConnectionsMapForCurrentProject().get(clusterAlias); if (remote != null) { remote.setSkipUnavailable(skipUnavailable); } } + @FixForMultiProject(description = "Refactor as needed to support project specific changes to linked remotes.") public synchronized void updateRemoteClusterCredentials(Supplier settingsSupplier, ActionListener listener) { + final var projectId = projectResolver.getProjectId(); final Settings settings = settingsSupplier.get(); final UpdateRemoteClusterCredentialsResult result = remoteClusterCredentialsManager.updateClusterCredentials(settings); // We only need to rebuild connections when a credential was newly added or removed for a cluster alias, not if the credential // value was updated. Therefore, only consider added or removed aliases final int totalConnectionsToRebuild = result.addedClusterAliases().size() + result.removedClusterAliases().size(); if (totalConnectionsToRebuild == 0) { - logger.debug("no connection rebuilding required after credentials update"); + logger.debug("project [{}] no connection rebuilding required after credentials update", projectId); listener.onResponse(null); return; } - logger.info("rebuilding [{}] connections after credentials update", totalConnectionsToRebuild); + logger.info("project [{}] rebuilding [{}] connections after credentials update", projectId, totalConnectionsToRebuild); try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) { for (var clusterAlias : result.addedClusterAliases()) { - maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, connectionRefs); + maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs); } for (var clusterAlias : result.removedClusterAliases()) { - maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, connectionRefs); + maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs); } } } - // package-private for testing - - private void maybeRebuildConnectionOnCredentialsChange(String clusterAlias, Settings settings, RefCountingRunnable connectionRefs) { - if (false == remoteClusters.containsKey(clusterAlias)) { + private void maybeRebuildConnectionOnCredentialsChange( + ProjectId projectId, + String clusterAlias, + Settings settings, + RefCountingRunnable connectionRefs + ) { + final var connectionsMap = getConnectionsMapForProject(projectId); + if (false == connectionsMap.containsKey(clusterAlias)) { // A credential was added or removed before a remote connection was configured. // Without an existing connection, there is nothing to rebuild. - logger.info("no connection rebuild required for remote cluster [{}] after credentials change", clusterAlias); + logger.info( + "project [{}] no connection rebuild required for remote cluster [{}] after credentials change", + projectId, + clusterAlias + ); return; } - updateRemoteCluster(clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() { + updateRemoteCluster(projectId, clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() { @Override public void onResponse(RemoteClusterConnectionStatus status) { - logger.info("remote cluster connection [{}] updated after credentials change: [{}]", clusterAlias, status); + logger.info( + "project [{}] remote cluster connection [{}] updated after credentials change: [{}]", + projectId, + clusterAlias, + status + ); } @Override @@ -392,23 +420,32 @@ public void onFailure(Exception e) { // does *not* imply a failure to reload secure settings; however, that's how it would surface in the reload-settings call. // Instead, we log a warning which is also consistent with how we handle remote cluster settings updates (logging instead of // returning an error) - logger.warn(() -> "failed to update remote cluster connection [" + clusterAlias + "] after credentials change", e); + logger.warn( + () -> "project [" + + projectId + + "] failed to update remote cluster connection [" + + clusterAlias + + "] after credentials change", + e + ); } }, connectionRefs.acquire())); } @Override protected void updateRemoteCluster(String clusterAlias, Settings settings) { + @FixForMultiProject(description = "ES-12270: Refactor as needed to support project specific changes to linked remotes.") + final var projectId = projectResolver.getProjectId(); CountDownLatch latch = new CountDownLatch(1); - updateRemoteCluster(clusterAlias, settings, ActionListener.runAfter(new ActionListener<>() { + updateRemoteCluster(projectId, clusterAlias, settings, false, ActionListener.runAfter(new ActionListener<>() { @Override public void onResponse(RemoteClusterConnectionStatus status) { - logger.info("remote cluster connection [{}] updated: {}", clusterAlias, status); + logger.info("project [{}] remote cluster connection [{}] updated: {}", projectId, clusterAlias, status); } @Override public void onFailure(Exception e) { - logger.warn(() -> "failed to update remote cluster connection [" + clusterAlias + "]", e); + logger.warn(() -> "project [" + projectId + " failed to update remote cluster connection [" + clusterAlias + "]", e); } }, latch::countDown)); @@ -417,25 +454,35 @@ public void onFailure(Exception e) { // are on the cluster state thread and our custom future implementation will throw an // assertion. if (latch.await(10, TimeUnit.SECONDS) == false) { - logger.warn("failed to update remote cluster connection [{}] within {}", clusterAlias, TimeValue.timeValueSeconds(10)); + logger.warn( + "project [{}] failed to update remote cluster connection [{}] within {}", + projectId, + clusterAlias, + TimeValue.timeValueSeconds(10) + ); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } - /** - * This method updates the list of remote clusters. It's intended to be used as an update consumer on the settings infrastructure - * - * @param clusterAlias a cluster alias to discovery node mapping representing the remote clusters seeds nodes - * @param newSettings the updated settings for the remote connection - * @param listener a listener invoked once every configured cluster has been connected to - */ + // Package-access for testing. + @FixForMultiProject(description = "Refactor to supply the project ID associated with the alias and settings, or eliminate this method.") void updateRemoteCluster(String clusterAlias, Settings newSettings, ActionListener listener) { - updateRemoteCluster(clusterAlias, newSettings, false, listener); + updateRemoteCluster(projectResolver.getProjectId(), clusterAlias, newSettings, false, listener); } + /** + * Adds, rebuilds, or closes and removes the connection for the specified remote cluster. + * + * @param projectId The project the remote cluster is associated with. + * @param clusterAlias The alias used for the remote cluster being connected. + * @param newSettings The updated settings for the remote connection. + * @param forceRebuild Forces an existing connection to be closed and reconnected even if the connection strategy does not require it. + * @param listener The listener invoked once the configured cluster has been connected. + */ private synchronized void updateRemoteCluster( + ProjectId projectId, String clusterAlias, Settings newSettings, boolean forceRebuild, @@ -445,14 +492,15 @@ private synchronized void updateRemoteCluster( throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); } - RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); + final var connectionMap = getConnectionsMapForProject(projectId); + RemoteClusterConnection remote = connectionMap.get(clusterAlias); if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, newSettings) == false) { try { IOUtils.close(remote); } catch (IOException e) { - logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); + logger.warn("project [" + projectId + "] failed to close remote cluster connections for cluster: " + clusterAlias, e); } - remoteClusters.remove(clusterAlias); + connectionMap.remove(clusterAlias); listener.onResponse(RemoteClusterConnectionStatus.DISCONNECTED); return; } @@ -461,19 +509,19 @@ private synchronized void updateRemoteCluster( // this is a new cluster we have to add a new representation Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager); - remoteClusters.put(clusterAlias, remote); + connectionMap.put(clusterAlias, remote); remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED)); } else if (forceRebuild || remote.shouldRebuildConnection(newSettings)) { // Changes to connection configuration. Must tear down existing connection try { IOUtils.close(remote); } catch (IOException e) { - logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); + logger.warn("project [" + projectId + "] failed to close remote cluster connections for cluster: " + clusterAlias, e); } - remoteClusters.remove(clusterAlias); + connectionMap.remove(clusterAlias); Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager); - remoteClusters.put(clusterAlias, remote); + connectionMap.put(clusterAlias, remote); remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED)); } else { // No changes to connection configuration. @@ -493,6 +541,8 @@ enum RemoteClusterConnectionStatus { * to all configured seed nodes. */ void initializeRemoteClusters() { + @FixForMultiProject(description = "Refactor for initializing connections to linked projects for each origin project supported.") + final var projectId = projectResolver.getProjectId(); final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); final PlainActionFuture future = new PlainActionFuture<>(); Set enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings); @@ -503,7 +553,7 @@ void initializeRemoteClusters() { CountDownActionListener listener = new CountDownActionListener(enabledClusters.size(), future); for (String clusterAlias : enabledClusters) { - updateRemoteCluster(clusterAlias, settings, listener.map(ignored -> null)); + updateRemoteCluster(projectId, clusterAlias, settings, false, listener.map(ignored -> null)); } if (enabledClusters.isEmpty()) { @@ -515,19 +565,20 @@ void initializeRemoteClusters() { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (TimeoutException ex) { - logger.warn("failed to connect to remote clusters within {}", timeValue.toString()); + logger.warn("project [{}] failed to connect to remote clusters within {}", projectId, timeValue.toString()); } catch (Exception e) { - logger.warn("failed to connect to remote clusters", e); + logger.warn("project [" + projectId + "] failed to connect to remote clusters", e); } } @Override public void close() throws IOException { - IOUtils.close(remoteClusters.values()); + IOUtils.close(remoteClusters.values().stream().flatMap(map -> map.values().stream()).collect(Collectors.toList())); } + @FixForMultiProject(description = "Analyze use cases, determine possible need for cluster scoped and project scoped versions.") public Stream getRemoteConnectionInfos() { - return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo); + return getConnectionsMapForCurrentProject().values().stream().map(RemoteClusterConnection::getConnectionInfo); } @Override @@ -549,9 +600,11 @@ public void collectNodes(Set clusters, ActionListener(); for (String cluster : clusters) { - final var connection = this.remoteClusters.get(cluster); + final var connection = projectConnectionsMap.get(cluster); if (connection == null) { listener.onFailure(new NoSuchRemoteClusterException(cluster)); return; @@ -654,6 +707,25 @@ static void registerRemoteClusterHandshakeRequestHandler(TransportService transp ); } + /** + * Returns the map of connections for the {@link ProjectId} currently returned by the {@link ProjectResolver}. + */ + private Map getConnectionsMapForCurrentProject() { + return getConnectionsMapForProject(projectResolver.getProjectId()); + } + + /** + * Returns the map of connections for the given {@link ProjectId}. + */ + private Map getConnectionsMapForProject(ProjectId projectId) { + if (projectResolver.supportsMultipleProjects()) { + assert ProjectId.DEFAULT.equals(projectId) == false : "The default project ID should not be used in multi-project environment"; + return remoteClusters.computeIfAbsent(projectId, unused -> ConcurrentCollections.newConcurrentMap()); + } + assert ProjectId.DEFAULT.equals(projectId) : "Only the default project ID should be used when multiple projects are not supported"; + return remoteClusters.get(projectId); + } + private static class RemoteConnectionEnabled implements Setting.Validator { private final String clusterAlias;