Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.client.internal.RemoteClusterClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.project.DefaultProjectResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.SecureSetting;
Expand All @@ -31,6 +34,7 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.IndicesExpressionGrouper;
Expand All @@ -53,6 +57,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.common.settings.Setting.boolSetting;
Expand Down Expand Up @@ -155,9 +160,11 @@ public boolean isRemoteClusterServerEnabled() {
}

private final TransportService transportService;
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
private final Map<ProjectId, Map<String, RemoteClusterConnection>> remoteClusters = ConcurrentCollections.newConcurrentMap();
private final RemoteClusterCredentialsManager remoteClusterCredentialsManager;
private final ProjectResolver projectResolver;

@FixForMultiProject(description = "Inject the ProjectResolver instance.")
RemoteClusterService(Settings settings, TransportService transportService) {
super(settings);
this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
Expand All @@ -167,6 +174,21 @@ public boolean isRemoteClusterServerEnabled() {
if (remoteClusterServerEnabled) {
registerRemoteClusterHandshakeRequestHandler(transportService);
}
this.projectResolver = DefaultProjectResolver.INSTANCE;
}

/**
* Returns the map of connections for the {@link ProjectId} currently returned by the {@link ProjectResolver}.
*/
private Map<String, RemoteClusterConnection> getConnectionsMap() {
return getConnectionsMap(projectResolver.getProjectId());
}

/**
* Returns the map of connections for the given {@link ProjectId}.
*/
private Map<String, RemoteClusterConnection> getConnectionsMap(ProjectId projectId) {
return remoteClusters.computeIfAbsent(projectId, unused -> ConcurrentCollections.newConcurrentMap());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we go with my previous suggestion, this needs to change as well since immutable map does not support computeIfAbsent. So something like the following would be necessary

Suggested change
return remoteClusters.computeIfAbsent(projectId, unused -> ConcurrentCollections.newConcurrentMap());
return projectResolver.supportsMultipleProjects() ? remoteClusters.computeIfAbsent(projectId, unused -> ConcurrentCollections.newConcurrentMap()) : remoteClusters.get(projectId);

We can also assert projectId is default when projectResolver.supportsMultipleProjects() == false and vice versa.

}

public DiscoveryNode getLocalNode() {
Expand All @@ -176,12 +198,12 @@ public DiscoveryNode getLocalNode() {
/**
* Returns <code>true</code> if at least one remote cluster is configured
*/
public boolean isCrossClusterSearchEnabled() {
return remoteClusters.isEmpty() == false;
boolean isCrossClusterSearchEnabled() {
return getRegisteredRemoteClusterNames().isEmpty() == false;
}

boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode node) {
return remoteClusters.get(remoteCluster).isNodeConnected(node);
return getConnectionsMap().get(remoteCluster).isNodeConnected(node);
}

/**
Expand Down Expand Up @@ -263,14 +285,14 @@ public Set<String> getConfiguredClusters() {
* Returns <code>true</code> iff the given cluster is configured as a remote cluster. Otherwise <code>false</code>
*/
boolean isRemoteClusterRegistered(String clusterName) {
return remoteClusters.containsKey(clusterName);
return getConnectionsMap().containsKey(clusterName);
}

/**
* Returns the registered remote cluster names.
*/
public Set<String> getRegisteredRemoteClusterNames() {
return remoteClusters.keySet();
return getConnectionsMap().keySet();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Methods like this one relies on the project context configured correctly by the caller. Not an issue for non-MP. But we will need to double check for MP. Probably worth to log an issue so that we don't forget.

This method is used by TransportClusterStatsAction which can be argued as cluster scoped. In that case, we'd need loop through all projects for this method. This will be a big change since it affects REST API output. So I think we can go with this now and revisit it later similar to how we did for indices stats and add a fix annotation there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the fix annotation here and in a few other places. It might be good to review the package-access test methods as well as some of the other public methods that use the connections map and see if we can reduce the overall API surface area. It would help with reasoning about all the multi-project use cases. I can submit small separate PRs if I find some scenarios where we'd get some benefit from refactoring.


/**
Expand Down Expand Up @@ -347,7 +369,7 @@ public RemoteClusterConnection getRemoteClusterConnection(String cluster) {
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
);
}
RemoteClusterConnection connection = remoteClusters.get(cluster);
RemoteClusterConnection connection = getConnectionsMap().get(cluster);
if (connection == null) {
throw new NoSuchRemoteClusterException(cluster);
}
Expand All @@ -361,48 +383,63 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
}

private synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) {
RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias);
RemoteClusterConnection remote = getConnectionsMap().get(clusterAlias);
if (remote != null) {
remote.setSkipUnavailable(skipUnavailable);
}
}

@FixForMultiProject(description = "Refactor to provide the project ID that the settings are associated with.")
public synchronized void updateRemoteClusterCredentials(Supplier<Settings> settingsSupplier, ActionListener<Void> listener) {
final var projectId = projectResolver.getProjectId();
final Settings settings = settingsSupplier.get();
final UpdateRemoteClusterCredentialsResult result = remoteClusterCredentialsManager.updateClusterCredentials(settings);
// We only need to rebuild connections when a credential was newly added or removed for a cluster alias, not if the credential
// value was updated. Therefore, only consider added or removed aliases
final int totalConnectionsToRebuild = result.addedClusterAliases().size() + result.removedClusterAliases().size();
if (totalConnectionsToRebuild == 0) {
logger.debug("no connection rebuilding required after credentials update");
logger.debug("project ID [{}] no connection rebuilding required after credentials update", projectId);
listener.onResponse(null);
return;
}
logger.info("rebuilding [{}] connections after credentials update", totalConnectionsToRebuild);
logger.info("project ID [{}] rebuilding [{}] connections after credentials update", projectId, totalConnectionsToRebuild);
try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) {
for (var clusterAlias : result.addedClusterAliases()) {
maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, connectionRefs);
maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs);
}
for (var clusterAlias : result.removedClusterAliases()) {
maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, connectionRefs);
maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs);
}
}
}

// package-private for testing

private void maybeRebuildConnectionOnCredentialsChange(String clusterAlias, Settings settings, RefCountingRunnable connectionRefs) {
if (false == remoteClusters.containsKey(clusterAlias)) {
private void maybeRebuildConnectionOnCredentialsChange(
ProjectId projectId,
String clusterAlias,
Settings settings,
RefCountingRunnable connectionRefs
) {
final var connectionsMap = getConnectionsMap(projectId);
if (false == connectionsMap.containsKey(clusterAlias)) {
// A credential was added or removed before a remote connection was configured.
// Without an existing connection, there is nothing to rebuild.
logger.info("no connection rebuild required for remote cluster [{}] after credentials change", clusterAlias);
logger.info(
"project ID [{}] no connection rebuild required for remote cluster [{}] after credentials change",
projectId,
clusterAlias
);
return;
}

updateRemoteCluster(clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() {
updateRemoteCluster(projectId, clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(RemoteClusterConnectionStatus status) {
logger.info("remote cluster connection [{}] updated after credentials change: [{}]", clusterAlias, status);
logger.info(
"project ID [{}] remote cluster connection [{}] updated after credentials change: [{}]",
projectId,
clusterAlias,
status
);
}

@Override
Expand All @@ -411,23 +448,32 @@ public void onFailure(Exception e) {
// does *not* imply a failure to reload secure settings; however, that's how it would surface in the reload-settings call.
// Instead, we log a warning which is also consistent with how we handle remote cluster settings updates (logging instead of
// returning an error)
logger.warn(() -> "failed to update remote cluster connection [" + clusterAlias + "] after credentials change", e);
logger.warn(
() -> "project ID ["
+ projectId
+ "] failed to update remote cluster connection ["
+ clusterAlias
+ "] after credentials change",
e
);
}
}, connectionRefs.acquire()));
}

@FixForMultiProject(description = "Refactor to provide the project ID that the linked alias and settings are associated with.")
@Override
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
final var projectId = projectResolver.getProjectId();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called by settings applier which does not have project context today so that resolver won't work for MP. Can we link an issue (if there is one for settings changes) here to remind us? Maybe the fix annotation is for this purpose? If so, I'd prefer we move it directly above projectId assignment line.

CountDownLatch latch = new CountDownLatch(1);
updateRemoteCluster(clusterAlias, settings, ActionListener.runAfter(new ActionListener<>() {
updateRemoteCluster(projectId, clusterAlias, settings, false, ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(RemoteClusterConnectionStatus status) {
logger.info("remote cluster connection [{}] updated: {}", clusterAlias, status);
logger.info("project ID [{}] remote cluster connection [{}] updated: {}", projectId, clusterAlias, status);
}

@Override
public void onFailure(Exception e) {
logger.warn(() -> "failed to update remote cluster connection [" + clusterAlias + "]", e);
logger.warn(() -> "project ID [" + projectId + " failed to update remote cluster connection [" + clusterAlias + "]", e);
}
}, latch::countDown));

Expand All @@ -436,7 +482,12 @@ public void onFailure(Exception e) {
// are on the cluster state thread and our custom future implementation will throw an
// assertion.
if (latch.await(10, TimeUnit.SECONDS) == false) {
logger.warn("failed to update remote cluster connection [{}] within {}", clusterAlias, TimeValue.timeValueSeconds(10));
logger.warn(
"project ID [{}] failed to update remote cluster connection [{}] within {}",
projectId,
clusterAlias,
TimeValue.timeValueSeconds(10)
);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -451,10 +502,11 @@ public void onFailure(Exception e) {
* @param listener a listener invoked once every configured cluster has been connected to
*/
void updateRemoteCluster(String clusterAlias, Settings newSettings, ActionListener<RemoteClusterConnectionStatus> listener) {
updateRemoteCluster(clusterAlias, newSettings, false, listener);
updateRemoteCluster(projectResolver.getProjectId(), clusterAlias, newSettings, false, listener);
}

private synchronized void updateRemoteCluster(
ProjectId projectId,
String clusterAlias,
Settings newSettings,
boolean forceRebuild,
Expand All @@ -464,14 +516,15 @@ private synchronized void updateRemoteCluster(
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}

RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias);
final var connectionMap = getConnectionsMap(projectId);
RemoteClusterConnection remote = connectionMap.get(clusterAlias);
if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, newSettings) == false) {
try {
IOUtils.close(remote);
} catch (IOException e) {
logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e);
logger.warn("project ID [" + projectId + "] failed to close remote cluster connections for cluster: " + clusterAlias, e);
}
remoteClusters.remove(clusterAlias);
connectionMap.remove(clusterAlias);
listener.onResponse(RemoteClusterConnectionStatus.DISCONNECTED);
return;
}
Expand All @@ -480,19 +533,19 @@ private synchronized void updateRemoteCluster(
// this is a new cluster we have to add a new representation
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
remoteClusters.put(clusterAlias, remote);
connectionMap.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED));
} else if (forceRebuild || remote.shouldRebuildConnection(newSettings)) {
// Changes to connection configuration. Must tear down existing connection
try {
IOUtils.close(remote);
} catch (IOException e) {
logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e);
logger.warn("project ID [" + projectId + "] failed to close remote cluster connections for cluster: " + clusterAlias, e);
}
remoteClusters.remove(clusterAlias);
connectionMap.remove(clusterAlias);
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
remoteClusters.put(clusterAlias, remote);
connectionMap.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED));
} else {
// No changes to connection configuration.
Expand All @@ -511,7 +564,9 @@ enum RemoteClusterConnectionStatus {
* Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection
* to all configured seed nodes.
*/
@FixForMultiProject(description = "Refactor for initializing connections to linked projects for each origin project supported.")
void initializeRemoteClusters() {
final var projectId = projectResolver.getProjectId();
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
final PlainActionFuture<Void> future = new PlainActionFuture<>();
Set<String> enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings);
Expand All @@ -522,7 +577,7 @@ void initializeRemoteClusters() {

CountDownActionListener listener = new CountDownActionListener(enabledClusters.size(), future);
for (String clusterAlias : enabledClusters) {
updateRemoteCluster(clusterAlias, settings, listener.map(ignored -> null));
updateRemoteCluster(projectId, clusterAlias, settings, false, listener.map(ignored -> null));
}

if (enabledClusters.isEmpty()) {
Expand All @@ -534,19 +589,19 @@ void initializeRemoteClusters() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TimeoutException ex) {
logger.warn("failed to connect to remote clusters within {}", timeValue.toString());
logger.warn("project ID [{}] failed to connect to remote clusters within {}", projectId, timeValue.toString());
} catch (Exception e) {
logger.warn("failed to connect to remote clusters", e);
logger.warn("project ID [" + projectId + "] failed to connect to remote clusters", e);
}
}

@Override
public void close() throws IOException {
IOUtils.close(remoteClusters.values());
IOUtils.close(remoteClusters.values().stream().flatMap(map -> map.values().stream()).collect(Collectors.toList()));
}

public Stream<RemoteConnectionInfo> getRemoteConnectionInfos() {
return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo);
return getConnectionsMap().values().stream().map(RemoteClusterConnection::getConnectionInfo);
}

@Override
Expand All @@ -568,8 +623,9 @@ public void collectNodes(Set<String> clusters, ActionListener<BiFunction<String,
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
);
}
final var projectConnectionsMap = getConnectionsMap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if async search would provide the necessary project context.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the fix annotation here as well.

for (String cluster : clusters) {
if (this.remoteClusters.containsKey(cluster) == false) {
if (projectConnectionsMap.containsKey(cluster) == false) {
listener.onFailure(new NoSuchRemoteClusterException(cluster));
return;
}
Expand All @@ -579,7 +635,14 @@ public void collectNodes(Set<String> clusters, ActionListener<BiFunction<String,
CountDown countDown = new CountDown(clusters.size());
Function<String, DiscoveryNode> nullFunction = s -> null;
for (final String cluster : clusters) {
RemoteClusterConnection connection = this.remoteClusters.get(cluster);
RemoteClusterConnection connection = projectConnectionsMap.get(cluster);
// Ensure the connection is not null, it could have been removed since the containsKey() call above.
if (connection == null) {
if (countDown.fastForward()) {
listener.onFailure(new NoSuchRemoteClusterException(cluster));
}
continue;
}
connection.collectNodes(new ActionListener<Function<String, DiscoveryNode>>() {
@Override
public void onResponse(Function<String, DiscoveryNode> nodeLookup) {
Expand Down Expand Up @@ -654,7 +717,7 @@ public RemoteClusterClient getRemoteClusterClient(
}

Collection<RemoteClusterConnection> getConnections() {
return remoteClusters.values();
return getConnectionsMap().values();
}

static void registerRemoteClusterHandshakeRequestHandler(TransportService transportService) {
Expand Down