Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.client.internal.RemoteClusterClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectId;
Expand All @@ -34,7 +33,6 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.IndicesExpressionGrouper;
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.transport.RemoteClusterCredentialsManager.UpdateRemoteClusterCredentialsResult;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -50,7 +48,6 @@
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -106,6 +103,10 @@ public boolean isRemoteClusterServerEnabled() {
this.crossProjectEnabled = settings.getAsBoolean("serverless.cross_project.enabled", false);
}

public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() {
return remoteClusterCredentialsManager;
}

/**
* Group indices by cluster alias mapped to OriginalIndices for that cluster.
* @param remoteClusterNames Set of configured remote cluster names.
Expand Down Expand Up @@ -185,6 +186,13 @@ public Set<String> getRegisteredRemoteClusterNames() {
return getConnectionsMapForCurrentProject().keySet();
}

/**
* Returns the registered linked project aliases for the provided origin Project ID.
*/
public Set<String> getRegisteredRemoteClusterNames(ProjectId originProjectId) {
return getConnectionsMapForProject(originProjectId).keySet();
}

/**
* Returns a connection to the given node on the given remote cluster
*
Expand Down Expand Up @@ -298,79 +306,6 @@ public void skipUnavailableChanged(
}
}

@FixForMultiProject(description = "Refactor as needed to support project specific changes to linked remotes.")
public synchronized void updateRemoteClusterCredentials(Supplier<Settings> settingsSupplier, ActionListener<Void> listener) {
final var projectId = projectResolver.getProjectId();
final Settings settings = settingsSupplier.get();
final UpdateRemoteClusterCredentialsResult result = remoteClusterCredentialsManager.updateClusterCredentials(settings);
// We only need to rebuild connections when a credential was newly added or removed for a cluster alias, not if the credential
// value was updated. Therefore, only consider added or removed aliases
final int totalConnectionsToRebuild = result.addedClusterAliases().size() + result.removedClusterAliases().size();
if (totalConnectionsToRebuild == 0) {
logger.debug("project [{}] no connection rebuilding required after credentials update", projectId);
listener.onResponse(null);
return;
}
logger.info("project [{}] rebuilding [{}] connections after credentials update", projectId, totalConnectionsToRebuild);
try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) {
for (var clusterAlias : result.addedClusterAliases()) {
maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs);
}
for (var clusterAlias : result.removedClusterAliases()) {
maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs);
}
}
}

private void maybeRebuildConnectionOnCredentialsChange(
ProjectId projectId,
String clusterAlias,
Settings newSettings,
RefCountingRunnable connectionRefs
) {
final var connectionsMap = getConnectionsMapForProject(projectId);
if (false == connectionsMap.containsKey(clusterAlias)) {
// A credential was added or removed before a remote connection was configured.
// Without an existing connection, there is nothing to rebuild.
logger.info(
"project [{}] no connection rebuild required for remote cluster [{}] after credentials change",
projectId,
clusterAlias
);
return;
}

final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build();
final var config = RemoteClusterSettings.toConfig(projectId, ProjectId.DEFAULT, clusterAlias, mergedSettings);
updateRemoteCluster(config, true, ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(RemoteClusterConnectionStatus status) {
logger.info(
"project [{}] remote cluster connection [{}] updated after credentials change: [{}]",
projectId,
clusterAlias,
status
);
}

@Override
public void onFailure(Exception e) {
// We don't want to return an error to the upstream listener here since a connection rebuild failure
// does *not* imply a failure to reload secure settings; however, that's how it would surface in the reload-settings call.
// Instead, we log a warning which is also consistent with how we handle remote cluster settings updates (logging instead of
// returning an error)
logger.warn(
() -> "project ["
+ projectId
+ "] failed to update remote cluster connection ["
+ clusterAlias
+ "] after credentials change",
e
);
}
}, connectionRefs.acquire()));
}

@Override
public void updateLinkedProject(LinkedProjectConfig config) {
final var projectId = config.originProjectId();
Expand Down Expand Up @@ -412,8 +347,7 @@ public void onFailure(Exception e) {
* @param forceRebuild Forces an existing connection to be closed and reconnected even if the connection strategy does not require it.
* @param listener The listener invoked once the configured cluster has been connected.
*/
// Package-access for testing.
synchronized void updateRemoteCluster(
public synchronized void updateRemoteCluster(
LinkedProjectConfig config,
boolean forceRebuild,
ActionListener<RemoteClusterConnectionStatus> listener
Expand Down Expand Up @@ -455,7 +389,7 @@ synchronized void updateRemoteCluster(
}
}

enum RemoteClusterConnectionStatus {
public enum RemoteClusterConnectionStatus {
CONNECTED,
DISCONNECTED,
RECONNECTED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
Expand Down Expand Up @@ -1625,9 +1626,12 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi
{
final MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("cluster.remote.cluster_1.credentials", randomAlphaOfLength(10));
final PlainActionFuture<Void> listener = new PlainActionFuture<>();
final PlainActionFuture<RemoteClusterService.RemoteClusterConnectionStatus> listener = new PlainActionFuture<>();
final Settings settings = Settings.builder().put(clusterSettings).setSecureSettings(secureSettings).build();
service.updateRemoteClusterCredentials(() -> settings, listener);
final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings);
assertThat(result.addedClusterAliases(), equalTo(Set.of("cluster_1")));
final var config = buildLinkedProjectConfig("cluster_1", Settings.EMPTY, settings);
service.updateRemoteCluster(config, true, listener);
listener.actionGet(10, TimeUnit.SECONDS);
}

Expand All @@ -1637,12 +1641,13 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi
);

{
final PlainActionFuture<Void> listener = new PlainActionFuture<>();
service.updateRemoteClusterCredentials(
// Settings without credentials constitute credentials removal
() -> clusterSettings,
listener
);
final PlainActionFuture<RemoteClusterService.RemoteClusterConnectionStatus> listener = new PlainActionFuture<>();
// Settings without credentials constitute credentials removal
final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(clusterSettings);
assertThat(result.addedClusterAliases().size(), equalTo(0));
assertThat(result.removedClusterAliases(), equalTo(Set.of("cluster_1")));
final var config = buildLinkedProjectConfig("cluster_1", Settings.EMPTY, clusterSettings);
service.updateRemoteCluster(config, true, listener);
listener.actionGet(10, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -1718,6 +1723,8 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite
assertConnectionHasProfile(service.getRemoteClusterConnection(goodCluster), "default");
assertConnectionHasProfile(service.getRemoteClusterConnection(badCluster), "default");
expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster));
final Set<String> aliases = Set.of(badCluster, goodCluster, missingCluster);
final ActionListener<RemoteClusterService.RemoteClusterConnectionStatus> noop = ActionListener.noop();

{
final MockSecureSettings secureSettings = new MockSecureSettings();
Expand All @@ -1730,7 +1737,14 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite
.put(cluster2Settings)
.setSecureSettings(secureSettings)
.build();
service.updateRemoteClusterCredentials(() -> settings, listener);
final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings);
assertThat(result.addedClusterAliases(), equalTo(aliases));
try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) {
for (String alias : aliases) {
final var config = buildLinkedProjectConfig(alias, Settings.EMPTY, settings);
service.updateRemoteCluster(config, true, ActionListener.releaseAfter(noop, connectionRefs.acquire()));
}
}
listener.actionGet(10, TimeUnit.SECONDS);
}

Expand All @@ -1747,11 +1761,16 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite
{
final PlainActionFuture<Void> listener = new PlainActionFuture<>();
final Settings settings = Settings.builder().put(cluster1Settings).put(cluster2Settings).build();
service.updateRemoteClusterCredentials(
// Settings without credentials constitute credentials removal
() -> settings,
listener
);
// Settings without credentials constitute credentials removal
final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings);
assertThat(result.addedClusterAliases().size(), equalTo(0));
assertThat(result.removedClusterAliases(), equalTo(aliases));
try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) {
for (String alias : aliases) {
final var config = buildLinkedProjectConfig(alias, Settings.EMPTY, settings);
service.updateRemoteCluster(config, true, ActionListener.releaseAfter(noop, connectionRefs.acquire()));
}
}
listener.actionGet(10, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -1828,6 +1847,12 @@ public void testLogsConnectionResult() throws IOException {
}
}

@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
private LinkedProjectConfig buildLinkedProjectConfig(String alias, Settings staticSettings, Settings newSettings) {
final var mergedSettings = Settings.builder().put(staticSettings, false).put(newSettings, false).build();
return RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, alias, mergedSettings);
}

private void updateRemoteCluster(
RemoteClusterService service,
String alias,
Expand All @@ -1846,10 +1871,7 @@ private void updateRemoteCluster(
Settings newSettings,
ActionListener<RemoteClusterService.RemoteClusterConnectionStatus> listener
) {
final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build();
@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, alias, mergedSettings);
service.updateRemoteCluster(config, false, listener);
service.updateRemoteCluster(buildLinkedProjectConfig(alias, settings, newSettings), false, listener);
}

private void initializeRemoteClusters(RemoteClusterService remoteClusterService) {
Expand Down
Loading