Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.client.internal.RemoteClusterClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.project.DefaultProjectResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.SecureSetting;
Expand All @@ -31,6 +34,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.IndicesExpressionGrouper;
Expand All @@ -52,6 +56,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.common.settings.Setting.boolSetting;
Expand Down Expand Up @@ -154,14 +159,20 @@ public boolean isRemoteClusterServerEnabled() {
}

private final TransportService transportService;
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
private final Map<ProjectId, Map<String, RemoteClusterConnection>> remoteClusters;
private final RemoteClusterCredentialsManager remoteClusterCredentialsManager;
private final ProjectResolver projectResolver;

@FixForMultiProject(description = "Inject the ProjectResolver instance.")
RemoteClusterService(Settings settings, TransportService transportService) {
super(settings);
this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
this.transportService = transportService;
this.projectResolver = DefaultProjectResolver.INSTANCE;
this.remoteClusters = projectResolver.supportsMultipleProjects()
? ConcurrentCollections.newConcurrentMap()
: Map.of(ProjectId.DEFAULT, ConcurrentCollections.newConcurrentMap());
this.remoteClusterCredentialsManager = new RemoteClusterCredentialsManager(settings);
if (remoteClusterServerEnabled) {
registerRemoteClusterHandshakeRequestHandler(transportService);
Expand Down Expand Up @@ -250,8 +261,9 @@ public Set<String> getConfiguredClusters() {
/**
* Returns the registered remote cluster names.
*/
@FixForMultiProject(description = "Analyze use cases, determine possible need for cluster scoped and project scoped versions.")
public Set<String> getRegisteredRemoteClusterNames() {
return remoteClusters.keySet();
return getConnectionsMapForCurrentProject().keySet();
}

/**
Expand Down Expand Up @@ -328,7 +340,8 @@ public RemoteClusterConnection getRemoteClusterConnection(String cluster) {
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
);
}
RemoteClusterConnection connection = remoteClusters.get(cluster);
@FixForMultiProject(description = "Verify all callers will have the proper context set for resolving the origin project ID.")
RemoteClusterConnection connection = getConnectionsMapForCurrentProject().get(cluster);
if (connection == null) {
throw new NoSuchRemoteClusterException(cluster);
}
Expand All @@ -342,48 +355,63 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
}

private synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) {
RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias);
RemoteClusterConnection remote = getConnectionsMapForCurrentProject().get(clusterAlias);
if (remote != null) {
remote.setSkipUnavailable(skipUnavailable);
}
}

@FixForMultiProject(description = "Refactor as needed to support project specific changes to linked remotes.")
public synchronized void updateRemoteClusterCredentials(Supplier<Settings> settingsSupplier, ActionListener<Void> listener) {
final var projectId = projectResolver.getProjectId();
final Settings settings = settingsSupplier.get();
final UpdateRemoteClusterCredentialsResult result = remoteClusterCredentialsManager.updateClusterCredentials(settings);
// We only need to rebuild connections when a credential was newly added or removed for a cluster alias, not if the credential
// value was updated. Therefore, only consider added or removed aliases
final int totalConnectionsToRebuild = result.addedClusterAliases().size() + result.removedClusterAliases().size();
if (totalConnectionsToRebuild == 0) {
logger.debug("no connection rebuilding required after credentials update");
logger.debug("project [{}] no connection rebuilding required after credentials update", projectId);
listener.onResponse(null);
return;
}
logger.info("rebuilding [{}] connections after credentials update", totalConnectionsToRebuild);
logger.info("project [{}] rebuilding [{}] connections after credentials update", projectId, totalConnectionsToRebuild);
try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) {
for (var clusterAlias : result.addedClusterAliases()) {
maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, connectionRefs);
maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs);
}
for (var clusterAlias : result.removedClusterAliases()) {
maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, connectionRefs);
maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs);
}
}
}

// package-private for testing

private void maybeRebuildConnectionOnCredentialsChange(String clusterAlias, Settings settings, RefCountingRunnable connectionRefs) {
if (false == remoteClusters.containsKey(clusterAlias)) {
private void maybeRebuildConnectionOnCredentialsChange(
ProjectId projectId,
String clusterAlias,
Settings settings,
RefCountingRunnable connectionRefs
) {
final var connectionsMap = getConnectionsMapForProject(projectId);
if (false == connectionsMap.containsKey(clusterAlias)) {
// A credential was added or removed before a remote connection was configured.
// Without an existing connection, there is nothing to rebuild.
logger.info("no connection rebuild required for remote cluster [{}] after credentials change", clusterAlias);
logger.info(
"project [{}] no connection rebuild required for remote cluster [{}] after credentials change",
projectId,
clusterAlias
);
return;
}

updateRemoteCluster(clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() {
updateRemoteCluster(projectId, clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(RemoteClusterConnectionStatus status) {
logger.info("remote cluster connection [{}] updated after credentials change: [{}]", clusterAlias, status);
logger.info(
"project [{}] remote cluster connection [{}] updated after credentials change: [{}]",
projectId,
clusterAlias,
status
);
}

@Override
Expand All @@ -392,23 +420,32 @@ public void onFailure(Exception e) {
// does *not* imply a failure to reload secure settings; however, that's how it would surface in the reload-settings call.
// Instead, we log a warning which is also consistent with how we handle remote cluster settings updates (logging instead of
// returning an error)
logger.warn(() -> "failed to update remote cluster connection [" + clusterAlias + "] after credentials change", e);
logger.warn(
() -> "project ["
+ projectId
+ "] failed to update remote cluster connection ["
+ clusterAlias
+ "] after credentials change",
e
);
}
}, connectionRefs.acquire()));
}

@Override
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
@FixForMultiProject(description = "ES-12270: Refactor as needed to support project specific changes to linked remotes.")
final var projectId = projectResolver.getProjectId();
CountDownLatch latch = new CountDownLatch(1);
updateRemoteCluster(clusterAlias, settings, ActionListener.runAfter(new ActionListener<>() {
updateRemoteCluster(projectId, clusterAlias, settings, false, ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(RemoteClusterConnectionStatus status) {
logger.info("remote cluster connection [{}] updated: {}", clusterAlias, status);
logger.info("project [{}] remote cluster connection [{}] updated: {}", projectId, clusterAlias, status);
}

@Override
public void onFailure(Exception e) {
logger.warn(() -> "failed to update remote cluster connection [" + clusterAlias + "]", e);
logger.warn(() -> "project [" + projectId + " failed to update remote cluster connection [" + clusterAlias + "]", e);
}
}, latch::countDown));

Expand All @@ -417,25 +454,35 @@ public void onFailure(Exception e) {
// are on the cluster state thread and our custom future implementation will throw an
// assertion.
if (latch.await(10, TimeUnit.SECONDS) == false) {
logger.warn("failed to update remote cluster connection [{}] within {}", clusterAlias, TimeValue.timeValueSeconds(10));
logger.warn(
"project [{}] failed to update remote cluster connection [{}] within {}",
projectId,
clusterAlias,
TimeValue.timeValueSeconds(10)
);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

/**
* This method updates the list of remote clusters. It's intended to be used as an update consumer on the settings infrastructure
*
* @param clusterAlias a cluster alias to discovery node mapping representing the remote clusters seeds nodes
* @param newSettings the updated settings for the remote connection
* @param listener a listener invoked once every configured cluster has been connected to
*/
Comment on lines -427 to -433
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we salvage javadocs for the parameters to the overloaded method below it, i.e. this method

private synchronized void updateRemoteCluster(
        ProjectId projectId,
        String clusterAlias,
        Settings newSettings,
        boolean forceRebuild,
        ActionListener<RemoteClusterConnectionStatus> listener
    )

?

// Package-access for testing.
@FixForMultiProject(description = "Refactor to supply the project ID associated with the alias and settings, or eliminate this method.")
void updateRemoteCluster(String clusterAlias, Settings newSettings, ActionListener<RemoteClusterConnectionStatus> listener) {
Comment on lines +470 to 471
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, this method only has test usages after this PR. So yeah I'd prefer to remove it eventually.

updateRemoteCluster(clusterAlias, newSettings, false, listener);
updateRemoteCluster(projectResolver.getProjectId(), clusterAlias, newSettings, false, listener);
}

/**
* Adds, rebuilds, or closes and removes the connection for the specified remote cluster.
*
* @param projectId The project the remote cluster is associated with.
* @param clusterAlias The alias used for the remote cluster being connected.
* @param newSettings The updated settings for the remote connection.
* @param forceRebuild Forces an existing connection to be closed and reconnected even if the connection strategy does not require it.
* @param listener The listener invoked once the configured cluster has been connected.
*/
private synchronized void updateRemoteCluster(
ProjectId projectId,
String clusterAlias,
Settings newSettings,
boolean forceRebuild,
Expand All @@ -445,14 +492,15 @@ private synchronized void updateRemoteCluster(
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}

RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias);
final var connectionMap = getConnectionsMapForProject(projectId);
RemoteClusterConnection remote = connectionMap.get(clusterAlias);
if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, newSettings) == false) {
try {
IOUtils.close(remote);
} catch (IOException e) {
logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e);
logger.warn("project [" + projectId + "] failed to close remote cluster connections for cluster: " + clusterAlias, e);
}
remoteClusters.remove(clusterAlias);
connectionMap.remove(clusterAlias);
listener.onResponse(RemoteClusterConnectionStatus.DISCONNECTED);
return;
}
Expand All @@ -461,19 +509,19 @@ private synchronized void updateRemoteCluster(
// this is a new cluster we have to add a new representation
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
remoteClusters.put(clusterAlias, remote);
connectionMap.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED));
} else if (forceRebuild || remote.shouldRebuildConnection(newSettings)) {
// Changes to connection configuration. Must tear down existing connection
try {
IOUtils.close(remote);
} catch (IOException e) {
logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e);
logger.warn("project [" + projectId + "] failed to close remote cluster connections for cluster: " + clusterAlias, e);
}
remoteClusters.remove(clusterAlias);
connectionMap.remove(clusterAlias);
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
remoteClusters.put(clusterAlias, remote);
connectionMap.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED));
} else {
// No changes to connection configuration.
Expand All @@ -493,6 +541,8 @@ enum RemoteClusterConnectionStatus {
* to all configured seed nodes.
*/
void initializeRemoteClusters() {
@FixForMultiProject(description = "Refactor for initializing connections to linked projects for each origin project supported.")
final var projectId = projectResolver.getProjectId();
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
final PlainActionFuture<Void> future = new PlainActionFuture<>();
Set<String> enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings);
Expand All @@ -503,7 +553,7 @@ void initializeRemoteClusters() {

CountDownActionListener listener = new CountDownActionListener(enabledClusters.size(), future);
for (String clusterAlias : enabledClusters) {
updateRemoteCluster(clusterAlias, settings, listener.map(ignored -> null));
updateRemoteCluster(projectId, clusterAlias, settings, false, listener.map(ignored -> null));
}

if (enabledClusters.isEmpty()) {
Expand All @@ -515,19 +565,20 @@ void initializeRemoteClusters() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TimeoutException ex) {
logger.warn("failed to connect to remote clusters within {}", timeValue.toString());
logger.warn("project [{}] failed to connect to remote clusters within {}", projectId, timeValue.toString());
} catch (Exception e) {
logger.warn("failed to connect to remote clusters", e);
logger.warn("project [" + projectId + "] failed to connect to remote clusters", e);
}
}

@Override
public void close() throws IOException {
IOUtils.close(remoteClusters.values());
IOUtils.close(remoteClusters.values().stream().flatMap(map -> map.values().stream()).collect(Collectors.toList()));
}

@FixForMultiProject(description = "Analyze use cases, determine possible need for cluster scoped and project scoped versions.")
public Stream<RemoteConnectionInfo> getRemoteConnectionInfos() {
return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo);
return getConnectionsMapForCurrentProject().values().stream().map(RemoteClusterConnection::getConnectionInfo);
}

@Override
Expand All @@ -549,9 +600,11 @@ public void collectNodes(Set<String> clusters, ActionListener<BiFunction<String,
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
);
}
@FixForMultiProject(description = "Analyze usages and determine if the project ID must be provided.")
final var projectConnectionsMap = getConnectionsMapForCurrentProject();
final var connectionsMap = new HashMap<String, RemoteClusterConnection>();
for (String cluster : clusters) {
final var connection = this.remoteClusters.get(cluster);
final var connection = projectConnectionsMap.get(cluster);
if (connection == null) {
listener.onFailure(new NoSuchRemoteClusterException(cluster));
return;
Expand Down Expand Up @@ -654,6 +707,25 @@ static void registerRemoteClusterHandshakeRequestHandler(TransportService transp
);
}

/**
* Returns the map of connections for the {@link ProjectId} currently returned by the {@link ProjectResolver}.
*/
private Map<String, RemoteClusterConnection> getConnectionsMapForCurrentProject() {
return getConnectionsMapForProject(projectResolver.getProjectId());
}

/**
* Returns the map of connections for the given {@link ProjectId}.
*/
private Map<String, RemoteClusterConnection> getConnectionsMapForProject(ProjectId projectId) {
if (projectResolver.supportsMultipleProjects()) {
assert ProjectId.DEFAULT.equals(projectId) == false : "The default project ID should not be used in multi-project environment";
return remoteClusters.computeIfAbsent(projectId, unused -> ConcurrentCollections.newConcurrentMap());
}
assert ProjectId.DEFAULT.equals(projectId) : "Only the default project ID should be used when multiple projects are not supported";
return remoteClusters.get(projectId);
}

private static class RemoteConnectionEnabled<T> implements Setting.Validator<T> {

private final String clusterAlias;
Expand Down