-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Refactor RemoteClusterService to be multi-project aware #131894
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
4ba097b
c553912
7c65264
3615f65
0e75dde
ff1ca79
6acd58e
7d17547
8e44557
ac99eb3
5cb6d07
8264455
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,8 +21,11 @@ | |
import org.elasticsearch.action.support.RefCountingRunnable; | ||
import org.elasticsearch.client.internal.RemoteClusterClient; | ||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.elasticsearch.cluster.metadata.ProjectId; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.node.DiscoveryNodeRole; | ||
import org.elasticsearch.cluster.project.DefaultProjectResolver; | ||
import org.elasticsearch.cluster.project.ProjectResolver; | ||
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.settings.ClusterSettings; | ||
import org.elasticsearch.common.settings.SecureSetting; | ||
|
@@ -31,6 +34,7 @@ | |
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; | ||
import org.elasticsearch.common.util.concurrent.EsExecutors; | ||
import org.elasticsearch.core.FixForMultiProject; | ||
import org.elasticsearch.core.IOUtils; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.indices.IndicesExpressionGrouper; | ||
|
@@ -52,6 +56,7 @@ | |
import java.util.function.BiFunction; | ||
import java.util.function.Function; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
import static org.elasticsearch.common.settings.Setting.boolSetting; | ||
|
@@ -154,14 +159,20 @@ public boolean isRemoteClusterServerEnabled() { | |
} | ||
|
||
private final TransportService transportService; | ||
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap(); | ||
private final Map<ProjectId, Map<String, RemoteClusterConnection>> remoteClusters; | ||
private final RemoteClusterCredentialsManager remoteClusterCredentialsManager; | ||
private final ProjectResolver projectResolver; | ||
|
||
@FixForMultiProject(description = "Inject the ProjectResolver instance.") | ||
RemoteClusterService(Settings settings, TransportService transportService) { | ||
super(settings); | ||
this.enabled = DiscoveryNode.isRemoteClusterClient(settings); | ||
this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings); | ||
this.transportService = transportService; | ||
this.projectResolver = DefaultProjectResolver.INSTANCE; | ||
this.remoteClusters = projectResolver.supportsMultipleProjects() | ||
? ConcurrentCollections.newConcurrentMap() | ||
: Map.of(ProjectId.DEFAULT, ConcurrentCollections.newConcurrentMap()); | ||
this.remoteClusterCredentialsManager = new RemoteClusterCredentialsManager(settings); | ||
if (remoteClusterServerEnabled) { | ||
registerRemoteClusterHandshakeRequestHandler(transportService); | ||
|
@@ -250,8 +261,9 @@ public Set<String> getConfiguredClusters() { | |
/** | ||
* Returns the registered remote cluster names. | ||
*/ | ||
@FixForMultiProject(description = "Analyze use cases, determine possible need for cluster scoped and project scoped versions.") | ||
public Set<String> getRegisteredRemoteClusterNames() { | ||
return remoteClusters.keySet(); | ||
return getConnectionsMapForCurrentProject().keySet(); | ||
} | ||
|
||
/** | ||
|
@@ -328,7 +340,8 @@ public RemoteClusterConnection getRemoteClusterConnection(String cluster) { | |
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role" | ||
); | ||
} | ||
RemoteClusterConnection connection = remoteClusters.get(cluster); | ||
@FixForMultiProject(description = "Verify all callers will have the proper context set for resolving the origin project ID.") | ||
RemoteClusterConnection connection = getConnectionsMapForCurrentProject().get(cluster); | ||
if (connection == null) { | ||
throw new NoSuchRemoteClusterException(cluster); | ||
} | ||
|
@@ -342,48 +355,63 @@ public void listenForUpdates(ClusterSettings clusterSettings) { | |
} | ||
|
||
private synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { | ||
RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); | ||
RemoteClusterConnection remote = getConnectionsMapForCurrentProject().get(clusterAlias); | ||
if (remote != null) { | ||
remote.setSkipUnavailable(skipUnavailable); | ||
} | ||
} | ||
|
||
@FixForMultiProject(description = "Refactor as needed to support project specific changes to linked remotes.") | ||
public synchronized void updateRemoteClusterCredentials(Supplier<Settings> settingsSupplier, ActionListener<Void> listener) { | ||
final var projectId = projectResolver.getProjectId(); | ||
final Settings settings = settingsSupplier.get(); | ||
final UpdateRemoteClusterCredentialsResult result = remoteClusterCredentialsManager.updateClusterCredentials(settings); | ||
// We only need to rebuild connections when a credential was newly added or removed for a cluster alias, not if the credential | ||
// value was updated. Therefore, only consider added or removed aliases | ||
final int totalConnectionsToRebuild = result.addedClusterAliases().size() + result.removedClusterAliases().size(); | ||
if (totalConnectionsToRebuild == 0) { | ||
logger.debug("no connection rebuilding required after credentials update"); | ||
logger.debug("project ID [{}] no connection rebuilding required after credentials update", projectId); | ||
JeremyDahlgren marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
listener.onResponse(null); | ||
return; | ||
} | ||
logger.info("rebuilding [{}] connections after credentials update", totalConnectionsToRebuild); | ||
logger.info("project ID [{}] rebuilding [{}] connections after credentials update", projectId, totalConnectionsToRebuild); | ||
JeremyDahlgren marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) { | ||
for (var clusterAlias : result.addedClusterAliases()) { | ||
maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, connectionRefs); | ||
maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs); | ||
} | ||
for (var clusterAlias : result.removedClusterAliases()) { | ||
maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, connectionRefs); | ||
maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs); | ||
} | ||
} | ||
} | ||
|
||
// package-private for testing | ||
|
||
private void maybeRebuildConnectionOnCredentialsChange(String clusterAlias, Settings settings, RefCountingRunnable connectionRefs) { | ||
if (false == remoteClusters.containsKey(clusterAlias)) { | ||
private void maybeRebuildConnectionOnCredentialsChange( | ||
ProjectId projectId, | ||
String clusterAlias, | ||
Settings settings, | ||
RefCountingRunnable connectionRefs | ||
) { | ||
final var connectionsMap = getConnectionsMapForProject(projectId); | ||
if (false == connectionsMap.containsKey(clusterAlias)) { | ||
// A credential was added or removed before a remote connection was configured. | ||
// Without an existing connection, there is nothing to rebuild. | ||
logger.info("no connection rebuild required for remote cluster [{}] after credentials change", clusterAlias); | ||
logger.info( | ||
"project [{}] no connection rebuild required for remote cluster [{}] after credentials change", | ||
projectId, | ||
clusterAlias | ||
); | ||
return; | ||
} | ||
|
||
updateRemoteCluster(clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() { | ||
updateRemoteCluster(projectId, clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() { | ||
@Override | ||
public void onResponse(RemoteClusterConnectionStatus status) { | ||
logger.info("remote cluster connection [{}] updated after credentials change: [{}]", clusterAlias, status); | ||
logger.info( | ||
"project [{}] remote cluster connection [{}] updated after credentials change: [{}]", | ||
projectId, | ||
clusterAlias, | ||
status | ||
); | ||
} | ||
|
||
@Override | ||
|
@@ -392,23 +420,32 @@ public void onFailure(Exception e) { | |
// does *not* imply a failure to reload secure settings; however, that's how it would surface in the reload-settings call. | ||
// Instead, we log a warning which is also consistent with how we handle remote cluster settings updates (logging instead of | ||
// returning an error) | ||
logger.warn(() -> "failed to update remote cluster connection [" + clusterAlias + "] after credentials change", e); | ||
logger.warn( | ||
() -> "project [" | ||
+ projectId | ||
+ "] failed to update remote cluster connection [" | ||
+ clusterAlias | ||
+ "] after credentials change", | ||
e | ||
); | ||
} | ||
}, connectionRefs.acquire())); | ||
} | ||
|
||
@Override | ||
protected void updateRemoteCluster(String clusterAlias, Settings settings) { | ||
@FixForMultiProject(description = "ES-12270: Refactor as needed to support project specific changes to linked remotes.") | ||
final var projectId = projectResolver.getProjectId(); | ||
CountDownLatch latch = new CountDownLatch(1); | ||
updateRemoteCluster(clusterAlias, settings, ActionListener.runAfter(new ActionListener<>() { | ||
updateRemoteCluster(projectId, clusterAlias, settings, false, ActionListener.runAfter(new ActionListener<>() { | ||
@Override | ||
public void onResponse(RemoteClusterConnectionStatus status) { | ||
logger.info("remote cluster connection [{}] updated: {}", clusterAlias, status); | ||
logger.info("project [{}] remote cluster connection [{}] updated: {}", projectId, clusterAlias, status); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
logger.warn(() -> "failed to update remote cluster connection [" + clusterAlias + "]", e); | ||
logger.warn(() -> "project [" + projectId + " failed to update remote cluster connection [" + clusterAlias + "]", e); | ||
} | ||
}, latch::countDown)); | ||
|
||
|
@@ -417,25 +454,26 @@ public void onFailure(Exception e) { | |
// are on the cluster state thread and our custom future implementation will throw an | ||
// assertion. | ||
if (latch.await(10, TimeUnit.SECONDS) == false) { | ||
logger.warn("failed to update remote cluster connection [{}] within {}", clusterAlias, TimeValue.timeValueSeconds(10)); | ||
logger.warn( | ||
"project [{}] failed to update remote cluster connection [{}] within {}", | ||
projectId, | ||
clusterAlias, | ||
TimeValue.timeValueSeconds(10) | ||
); | ||
} | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
} | ||
|
||
/** | ||
* This method updates the list of remote clusters. It's intended to be used as an update consumer on the settings infrastructure | ||
* | ||
* @param clusterAlias a cluster alias to discovery node mapping representing the remote clusters seeds nodes | ||
* @param newSettings the updated settings for the remote connection | ||
* @param listener a listener invoked once every configured cluster has been connected to | ||
*/ | ||
Comment on lines
-427
to
-433
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we salvage javadocs for the parameters to the overloaded method below it, i.e. this method private synchronized void updateRemoteCluster(
ProjectId projectId,
String clusterAlias,
Settings newSettings,
boolean forceRebuild,
ActionListener<RemoteClusterConnectionStatus> listener
) ? |
||
// Package-access for testing. | ||
@FixForMultiProject(description = "Refactor to supply the project ID associated with the alias and settings, or eliminate this method.") | ||
void updateRemoteCluster(String clusterAlias, Settings newSettings, ActionListener<RemoteClusterConnectionStatus> listener) { | ||
Comment on lines
+470
to
471
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC, this method only has test usages after this PR. So yeah I'd prefer to remove it eventually. |
||
updateRemoteCluster(clusterAlias, newSettings, false, listener); | ||
updateRemoteCluster(projectResolver.getProjectId(), clusterAlias, newSettings, false, listener); | ||
JeremyDahlgren marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
private synchronized void updateRemoteCluster( | ||
ProjectId projectId, | ||
String clusterAlias, | ||
Settings newSettings, | ||
boolean forceRebuild, | ||
|
@@ -445,14 +483,15 @@ private synchronized void updateRemoteCluster( | |
throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); | ||
} | ||
|
||
RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); | ||
final var connectionMap = getConnectionsMapForProject(projectId); | ||
RemoteClusterConnection remote = connectionMap.get(clusterAlias); | ||
if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, newSettings) == false) { | ||
try { | ||
IOUtils.close(remote); | ||
} catch (IOException e) { | ||
logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); | ||
logger.warn("project [" + projectId + "] failed to close remote cluster connections for cluster: " + clusterAlias, e); | ||
} | ||
remoteClusters.remove(clusterAlias); | ||
connectionMap.remove(clusterAlias); | ||
listener.onResponse(RemoteClusterConnectionStatus.DISCONNECTED); | ||
return; | ||
} | ||
|
@@ -461,19 +500,19 @@ private synchronized void updateRemoteCluster( | |
// this is a new cluster we have to add a new representation | ||
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); | ||
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager); | ||
remoteClusters.put(clusterAlias, remote); | ||
connectionMap.put(clusterAlias, remote); | ||
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED)); | ||
} else if (forceRebuild || remote.shouldRebuildConnection(newSettings)) { | ||
// Changes to connection configuration. Must tear down existing connection | ||
try { | ||
IOUtils.close(remote); | ||
} catch (IOException e) { | ||
logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); | ||
logger.warn("project [" + projectId + "] failed to close remote cluster connections for cluster: " + clusterAlias, e); | ||
} | ||
remoteClusters.remove(clusterAlias); | ||
connectionMap.remove(clusterAlias); | ||
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); | ||
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager); | ||
remoteClusters.put(clusterAlias, remote); | ||
connectionMap.put(clusterAlias, remote); | ||
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED)); | ||
} else { | ||
// No changes to connection configuration. | ||
|
@@ -493,6 +532,8 @@ enum RemoteClusterConnectionStatus { | |
* to all configured seed nodes. | ||
*/ | ||
void initializeRemoteClusters() { | ||
@FixForMultiProject(description = "Refactor for initializing connections to linked projects for each origin project supported.") | ||
final var projectId = projectResolver.getProjectId(); | ||
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); | ||
final PlainActionFuture<Void> future = new PlainActionFuture<>(); | ||
Set<String> enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings); | ||
|
@@ -503,7 +544,7 @@ void initializeRemoteClusters() { | |
|
||
CountDownActionListener listener = new CountDownActionListener(enabledClusters.size(), future); | ||
for (String clusterAlias : enabledClusters) { | ||
updateRemoteCluster(clusterAlias, settings, listener.map(ignored -> null)); | ||
updateRemoteCluster(projectId, clusterAlias, settings, false, listener.map(ignored -> null)); | ||
} | ||
|
||
if (enabledClusters.isEmpty()) { | ||
|
@@ -515,19 +556,20 @@ void initializeRemoteClusters() { | |
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} catch (TimeoutException ex) { | ||
logger.warn("failed to connect to remote clusters within {}", timeValue.toString()); | ||
logger.warn("project [{}] failed to connect to remote clusters within {}", projectId, timeValue.toString()); | ||
} catch (Exception e) { | ||
logger.warn("failed to connect to remote clusters", e); | ||
logger.warn("project [" + projectId + "] failed to connect to remote clusters", e); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
IOUtils.close(remoteClusters.values()); | ||
IOUtils.close(remoteClusters.values().stream().flatMap(map -> map.values().stream()).collect(Collectors.toList())); | ||
} | ||
|
||
@FixForMultiProject(description = "Analyze use cases, determine possible need for cluster scoped and project scoped versions.") | ||
public Stream<RemoteConnectionInfo> getRemoteConnectionInfos() { | ||
return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo); | ||
return getConnectionsMapForCurrentProject().values().stream().map(RemoteClusterConnection::getConnectionInfo); | ||
} | ||
|
||
@Override | ||
|
@@ -549,9 +591,11 @@ public void collectNodes(Set<String> clusters, ActionListener<BiFunction<String, | |
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role" | ||
); | ||
} | ||
@FixForMultiProject(description = "Analyze usages and determine if the project ID must be provided.") | ||
final var projectConnectionsMap = getConnectionsMapForCurrentProject(); | ||
final var connectionsMap = new HashMap<String, RemoteClusterConnection>(); | ||
for (String cluster : clusters) { | ||
final var connection = this.remoteClusters.get(cluster); | ||
final var connection = projectConnectionsMap.get(cluster); | ||
if (connection == null) { | ||
listener.onFailure(new NoSuchRemoteClusterException(cluster)); | ||
return; | ||
|
@@ -654,6 +698,25 @@ static void registerRemoteClusterHandshakeRequestHandler(TransportService transp | |
); | ||
} | ||
|
||
/** | ||
* Returns the map of connections for the {@link ProjectId} currently returned by the {@link ProjectResolver}. | ||
*/ | ||
private Map<String, RemoteClusterConnection> getConnectionsMapForCurrentProject() { | ||
return getConnectionsMapForProject(projectResolver.getProjectId()); | ||
} | ||
|
||
/** | ||
* Returns the map of connections for the given {@link ProjectId}. | ||
*/ | ||
private Map<String, RemoteClusterConnection> getConnectionsMapForProject(ProjectId projectId) { | ||
if (projectResolver.supportsMultipleProjects()) { | ||
assert ProjectId.DEFAULT.equals(projectId) == false : "The default project ID should not be used in multi-project environment"; | ||
return remoteClusters.computeIfAbsent(projectId, unused -> ConcurrentCollections.newConcurrentMap()); | ||
} | ||
assert ProjectId.DEFAULT.equals(projectId) : "Only the default project ID should be used when multiple projects are not supported"; | ||
return remoteClusters.get(projectId); | ||
} | ||
|
||
private static class RemoteConnectionEnabled<T> implements Setting.Validator<T> { | ||
|
||
private final String clusterAlias; | ||
|
Uh oh!
There was an error while loading. Please reload this page.