Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand Down Expand Up @@ -106,6 +107,10 @@ public boolean isRemoteClusterServerEnabled() {
this.crossProjectEnabled = settings.getAsBoolean("serverless.cross_project.enabled", false);
}

public ProjectResolver getProjectResolver() {
return projectResolver;
}

/**
* Group indices by cluster alias mapped to OriginalIndices for that cluster.
* @param remoteClusterNames Set of configured remote cluster names.
Expand Down Expand Up @@ -302,11 +307,20 @@ public void skipUnavailableChanged(
}
}

@FixForMultiProject(description = "Refactor as needed to support project specific changes to linked remotes.")
public synchronized void updateRemoteClusterCredentials(Supplier<Settings> settingsSupplier, ActionListener<Void> listener) {
/**
* Rebuilds linked project connections as needed for updated remote cluster credentials.
* @param settingsSupplier A {@link Supplier} for the updated credentials {@link Settings}.
* @param configFunction A function that builds a {@link LinkedProjectConfig} for an alias, static settings, and new settings.
* @param listener An {@link ActionListener} invoked once all connection modifications have been completed.
*/
public synchronized void updateRemoteClusterCredentials(
Supplier<Settings> settingsSupplier,
TriFunction<String, Settings, Settings, LinkedProjectConfig> configFunction,
ActionListener<Void> listener
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not fond of the way this PR turned out. I think I would prefer moving the code in updateRemoteClusterCredentials() into TransportReloadRemoteClusterCredentialsAction, and have TransportReloadRemoteClusterCredentialsAction call RemoteClusterService. maybeRebuildConnectionOnCredentialsChange() with a LinkedProjectConfig as needed. We could add a getter on RemoteClusterService for the RemoteClusterCredentialsManager.

Yang's comment discusses using ClusterSettingsLinkedProjectConfigService, but we aren't aware of or have access to one, or a LinkedProjectConfigService here. I may be misunderstanding what he is suggesting to do here.

@n1v0lg @ywangd - thoughts?

Copy link
Contributor

@n1v0lg n1v0lg Sep 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't wrapped my head around this and what the cleanest approach here would be but in case this is useful:

We can easily expose LinkedProjectConfigService to TransportReloadRemoteClusterCredentialsAction by injecting it during node construction:

final var linkedProjectConfigService = pluginsService.loadSingletonServiceProvider(
    LinkedProjectConfigService.class,
    () -> new ClusterSettingsLinkedProjectConfigService(settings, clusterService.getClusterSettings(), projectResolver)
);

modules.bindToInstance(LinkedProjectConfigService.class, linkedProjectConfigService);

exposes it to all TransportActions:

@Inject
public TransportReloadRemoteClusterCredentialsAction(
    TransportService transportService,
    ClusterService clusterService,
    ActionFilters actionFilters,
    LinkedProjectConfigService linkedProjectConfigService
)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JeremyDahlgren
My previous comment was indeed a bit vague. It took me a while to remember what I was thinking. 🤦 Sorry for the confusion.

My main point is to remove "settings usage for remote cluster config" as much as possible from RemoteClusterService since we now have the unified LinkedProjectConfig object. Hence I mostly like to see us removing maybeRebuildConnectionOnCredentialsChange from RemoteClusterService or at least it should not take Settings but instead LinkedProjectConfig as input.

For concrete changes:

  1. I think "add a getter on RemoteClusterService for the RemoteClusterCredentialsManager" seems a reasonable near term option. I assume this means we call RemoteClusterCredentialsManager#updateClusterCredentials within the transport action. Conceptually this makes sense since with the unified config object, RemoteClusterService should remove itself from dealing with these lower level updates and in long term, RemoteClusterCredentialsManager might need to be managed independently and injected to places. One thing to note is that this means the process of "updating credentials and rebuilding connection" will be performed in two synchronized blocks instead of one. It feels OK to me since clusterCredentials is updated as a volatile field and should not have any visible intermediate state.
  2. You are right that LinkedProjectConfigService is not accessible in RemoteClusterService. If we go with your suggestion of moving the logic to the transport action, we should be able to keep the code mostly as is exception in a different class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Yang for the details here. I relocated the code as suggested, and also ended up moving both updateClusterCredentials() and maybeRebuildConnectionOnCredentialsChange() into the action class. To keep the same synchronization behavior I had a synchronized block on the RCS instance, but this may be different than what you were describing in item 1 of your comment. Please let me know what you think here.

) {
final var projectId = projectResolver.getProjectId();
final Settings settings = settingsSupplier.get();
final UpdateRemoteClusterCredentialsResult result = remoteClusterCredentialsManager.updateClusterCredentials(settings);
final Settings newSettings = settingsSupplier.get();
final UpdateRemoteClusterCredentialsResult result = remoteClusterCredentialsManager.updateClusterCredentials(newSettings);
// We only need to rebuild connections when a credential was newly added or removed for a cluster alias, not if the credential
// value was updated. Therefore, only consider added or removed aliases
final int totalConnectionsToRebuild = result.addedClusterAliases().size() + result.removedClusterAliases().size();
Expand All @@ -318,20 +332,17 @@ public synchronized void updateRemoteClusterCredentials(Supplier<Settings> setti
logger.info("project [{}] rebuilding [{}] connections after credentials update", projectId, totalConnectionsToRebuild);
try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) {
for (var clusterAlias : result.addedClusterAliases()) {
maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs);
maybeRebuildConnectionOnCredentialsChange(configFunction.apply(clusterAlias, settings, newSettings), connectionRefs);
}
for (var clusterAlias : result.removedClusterAliases()) {
maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs);
maybeRebuildConnectionOnCredentialsChange(configFunction.apply(clusterAlias, settings, newSettings), connectionRefs);
}
}
}

private void maybeRebuildConnectionOnCredentialsChange(
ProjectId projectId,
String clusterAlias,
Settings newSettings,
RefCountingRunnable connectionRefs
) {
private void maybeRebuildConnectionOnCredentialsChange(LinkedProjectConfig config, RefCountingRunnable connectionRefs) {
final var projectId = config.originProjectId();
final var clusterAlias = config.linkedProjectAlias();
final var connectionsMap = getConnectionsMapForProject(projectId);
if (false == connectionsMap.containsKey(clusterAlias)) {
// A credential was added or removed before a remote connection was configured.
Expand All @@ -344,8 +355,6 @@ private void maybeRebuildConnectionOnCredentialsChange(
return;
}

final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build();
final var config = RemoteClusterSettings.toConfig(projectId, ProjectId.DEFAULT, clusterAlias, mergedSettings);
updateRemoteCluster(config, true, ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(RemoteClusterConnectionStatus status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1627,7 +1627,7 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi
secureSettings.setString("cluster.remote.cluster_1.credentials", randomAlphaOfLength(10));
final PlainActionFuture<Void> listener = new PlainActionFuture<>();
final Settings settings = Settings.builder().put(clusterSettings).setSecureSettings(secureSettings).build();
service.updateRemoteClusterCredentials(() -> settings, listener);
service.updateRemoteClusterCredentials(() -> settings, this::buildLinkedProjectConfig, listener);
listener.actionGet(10, TimeUnit.SECONDS);
}

Expand All @@ -1641,6 +1641,7 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi
service.updateRemoteClusterCredentials(
// Settings without credentials constitute credentials removal
() -> clusterSettings,
this::buildLinkedProjectConfig,
listener
);
listener.actionGet(10, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1730,7 +1731,7 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite
.put(cluster2Settings)
.setSecureSettings(secureSettings)
.build();
service.updateRemoteClusterCredentials(() -> settings, listener);
service.updateRemoteClusterCredentials(() -> settings, this::buildLinkedProjectConfig, listener);
listener.actionGet(10, TimeUnit.SECONDS);
}

Expand All @@ -1750,6 +1751,7 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite
service.updateRemoteClusterCredentials(
// Settings without credentials constitute credentials removal
() -> settings,
this::buildLinkedProjectConfig,
listener
);
listener.actionGet(10, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1828,6 +1830,12 @@ public void testLogsConnectionResult() throws IOException {
}
}

@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
private LinkedProjectConfig buildLinkedProjectConfig(String alias, Settings staticSettings, Settings newSettings) {
final var mergedSettings = Settings.builder().put(staticSettings, false).put(newSettings, false).build();
return RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, alias, mergedSettings);
}

private void updateRemoteCluster(
RemoteClusterService service,
String alias,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.rest.action.admin.cluster.RestReloadSecureSettingsAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.LinkedProjectConfig;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteClusterSettings;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.xpack.core.security.action.ActionTypes;
Expand Down Expand Up @@ -79,7 +84,17 @@ protected void doExecute(Task task, Request request, ActionListener<ActionRespon
final Settings transientSettings = clusterState.metadata().transientSettings();
return Settings.builder().put(request.getSettings(), true).put(persistentSettings, false).put(transientSettings, false).build();
};
remoteClusterService.updateRemoteClusterCredentials(settingsSupplier, listener.safeMap(ignored -> ActionResponse.Empty.INSTANCE));
@FixForMultiProject(description = "Supply the linked project ID when building the LinkedProjectConfig object.")
final TriFunction<String, Settings, Settings, LinkedProjectConfig> configBuilder = (clusterAlias, staticSettings, newSettings) -> {
final var projectId = remoteClusterService.getProjectResolver().getProjectId();
final var mergedSettings = Settings.builder().put(staticSettings, false).put(newSettings, false).build();
return RemoteClusterSettings.toConfig(projectId, ProjectId.DEFAULT, clusterAlias, mergedSettings);
};
remoteClusterService.updateRemoteClusterCredentials(
settingsSupplier,
configBuilder,
listener.safeMap(ignored -> ActionResponse.Empty.INSTANCE)
);
}

private ClusterBlockException checkBlock(ClusterState clusterState) {
Expand Down