Skip to content

Commit 5ea0f5d

Browse files
Add LinkedProjectConfigService with ClusterSettings implementation (#133834)
Adds a new interface LinkedProjectConfigService for registering LinkedProjectConfigListener instances to receive linked project configuration updates. This change introduces an abstract base class and a ClusterSettings based concrete class that provides the same functionality that was previously implemented in RemoteClusterAware and RemoteClusterService. Subclasses of RemoteClusterAware have been adjusted to receive LinkedProjectConfig updates instead of Settings updates. These changes allow for other linked project config implementations to be used in the system, freeing consumers of the updates from the details of the update mechanism used. Resolves: ES-12730
1 parent 7d0ee1a commit 5ea0f5d

File tree

36 files changed

+594
-229
lines changed

36 files changed

+594
-229
lines changed

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,8 @@
223223
import org.elasticsearch.threadpool.ExecutorBuilder;
224224
import org.elasticsearch.threadpool.ThreadPool;
225225
import org.elasticsearch.threadpool.internal.BuiltInExecutorBuilders;
226+
import org.elasticsearch.transport.ClusterSettingsLinkedProjectConfigService;
227+
import org.elasticsearch.transport.LinkedProjectConfigService;
226228
import org.elasticsearch.transport.Transport;
227229
import org.elasticsearch.transport.TransportService;
228230
import org.elasticsearch.usage.UsageService;
@@ -969,6 +971,11 @@ public Map<String, String> queryFields() {
969971

970972
final IndexingPressure indexingLimits = new IndexingPressure(settings);
971973

974+
final var linkedProjectConfigService = pluginsService.loadSingletonServiceProvider(
975+
LinkedProjectConfigService.class,
976+
() -> new ClusterSettingsLinkedProjectConfigService(settings, clusterService.getClusterSettings(), projectResolver)
977+
);
978+
972979
PluginServiceInstances pluginServices = new PluginServiceInstances(
973980
client,
974981
clusterService,
@@ -992,7 +999,8 @@ public Map<String, String> queryFields() {
992999
taskManager,
9931000
projectResolver,
9941001
slowLogFieldProvider,
995-
indexingLimits
1002+
indexingLimits,
1003+
linkedProjectConfigService
9961004
);
9971005

9981006
Collection<?> pluginComponents = pluginsService.flatMap(plugin -> {
@@ -1122,6 +1130,7 @@ public Map<String, String> queryFields() {
11221130
taskManager,
11231131
telemetryProvider.getTracer(),
11241132
nodeEnvironment.nodeId(),
1133+
linkedProjectConfigService,
11251134
projectResolver
11261135
);
11271136
final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry());

server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.telemetry.tracing.Tracer;
4444
import org.elasticsearch.threadpool.ThreadPool;
4545
import org.elasticsearch.transport.ClusterConnectionManager;
46+
import org.elasticsearch.transport.LinkedProjectConfigService;
4647
import org.elasticsearch.transport.Transport;
4748
import org.elasticsearch.transport.TransportInterceptor;
4849
import org.elasticsearch.transport.TransportService;
@@ -121,6 +122,7 @@ TransportService newTransportService(
121122
TaskManager taskManager,
122123
Tracer tracer,
123124
String nodeId,
125+
LinkedProjectConfigService linkedProjectConfigService,
124126
ProjectResolver projectResolver
125127
) {
126128
return new TransportService(
@@ -132,6 +134,7 @@ TransportService newTransportService(
132134
clusterSettings,
133135
new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()),
134136
taskManager,
137+
linkedProjectConfigService,
135138
projectResolver
136139
);
137140
}

server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.tasks.TaskManager;
3232
import org.elasticsearch.telemetry.TelemetryProvider;
3333
import org.elasticsearch.threadpool.ThreadPool;
34+
import org.elasticsearch.transport.LinkedProjectConfigService;
3435
import org.elasticsearch.watcher.ResourceWatcherService;
3536
import org.elasticsearch.xcontent.NamedXContentRegistry;
3637

@@ -57,5 +58,6 @@ public record PluginServiceInstances(
5758
TaskManager taskManager,
5859
ProjectResolver projectResolver,
5960
SlowLogFieldProvider slowLogFieldProvider,
60-
IndexingPressure indexingPressure
61+
IndexingPressure indexingPressure,
62+
LinkedProjectConfigService linkedProjectConfigService
6163
) implements Plugin.PluginServices {}

server/src/main/java/org/elasticsearch/plugins/Plugin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.telemetry.TelemetryProvider;
4141
import org.elasticsearch.threadpool.ExecutorBuilder;
4242
import org.elasticsearch.threadpool.ThreadPool;
43+
import org.elasticsearch.transport.LinkedProjectConfigService;
4344
import org.elasticsearch.watcher.ResourceWatcherService;
4445
import org.elasticsearch.xcontent.NamedXContentRegistry;
4546
import org.elasticsearch.xcontent.XContentParser;
@@ -192,6 +193,11 @@ public interface PluginServices {
192193
* Provider for indexing pressure
193194
*/
194195
IndexingPressure indexingPressure();
196+
197+
/**
198+
* A service for registering for linked project configuration updates.
199+
*/
200+
LinkedProjectConfigService linkedProjectConfigService();
195201
}
196202

197203
/**
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
14+
import java.util.List;
15+
import java.util.concurrent.CopyOnWriteArrayList;
16+
17+
/**
18+
* Abstract base class for {@link LinkedProjectConfigService} implementations.
19+
* Provides common functionality for managing a list of registered listeners and notifying them of updates.
20+
*/
21+
public abstract class AbstractLinkedProjectConfigService implements LinkedProjectConfigService {
22+
private final List<LinkedProjectConfigListener> listeners = new CopyOnWriteArrayList<>();
23+
24+
@Override
25+
public void register(LinkedProjectConfigListener listener) {
26+
listeners.add(listener);
27+
}
28+
29+
protected void handleUpdate(LinkedProjectConfig config) {
30+
listeners.forEach(listener -> listener.updateLinkedProject(config));
31+
}
32+
33+
protected void handleSkipUnavailableChanged(
34+
ProjectId originProjectId,
35+
ProjectId linkedProjectId,
36+
String linkedProjectAlias,
37+
boolean skipUnavailable
38+
) {
39+
listeners.forEach(
40+
listener -> listener.skipUnavailableChanged(originProjectId, linkedProjectId, linkedProjectAlias, skipUnavailable)
41+
);
42+
}
43+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
import org.elasticsearch.cluster.project.ProjectResolver;
14+
import org.elasticsearch.common.settings.ClusterSettings;
15+
import org.elasticsearch.common.settings.Setting;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.core.FixForMultiProject;
18+
import org.elasticsearch.core.Nullable;
19+
20+
import java.util.Collection;
21+
import java.util.List;
22+
23+
/**
24+
* A {@link LinkedProjectConfigService} implementation that listens for {@link ClusterSettings} changes,
25+
* creating {@link LinkedProjectConfig}s from the relevant settings and notifying registered listeners of updates.
26+
*/
27+
public class ClusterSettingsLinkedProjectConfigService extends AbstractLinkedProjectConfigService {
28+
private final Settings settings;
29+
private final ProjectResolver projectResolver;
30+
31+
/**
32+
* Constructs a new {@link ClusterSettingsLinkedProjectConfigService}.
33+
*
34+
* @param settings The initial node settings available on startup, used in {@link #getInitialLinkedProjectConfigs()}.
35+
* @param clusterSettings The {@link ClusterSettings} to add setting update consumers to, if non-null.
36+
* @param projectResolver The {@link ProjectResolver} to use to resolve the origin project ID.
37+
*/
38+
@SuppressWarnings("this-escape")
39+
public ClusterSettingsLinkedProjectConfigService(
40+
Settings settings,
41+
@Nullable ClusterSettings clusterSettings,
42+
ProjectResolver projectResolver
43+
) {
44+
this.settings = settings;
45+
this.projectResolver = projectResolver;
46+
if (clusterSettings != null) {
47+
List<Setting.AffixSetting<?>> remoteClusterSettings = List.of(
48+
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS,
49+
RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE,
50+
RemoteClusterSettings.REMOTE_CONNECTION_MODE,
51+
RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY,
52+
RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS,
53+
RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS,
54+
RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS,
55+
RemoteClusterSettings.ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS,
56+
RemoteClusterSettings.ProxyConnectionStrategySettings.SERVER_NAME
57+
);
58+
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::settingsChangedCallback);
59+
clusterSettings.addAffixUpdateConsumer(
60+
RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
61+
this::skipUnavailableChangedCallback,
62+
(alias, value) -> {}
63+
);
64+
}
65+
}
66+
67+
@Override
68+
@FixForMultiProject(description = "Refactor to add the linked project IDs associated with the aliases.")
69+
public Collection<LinkedProjectConfig> getInitialLinkedProjectConfigs() {
70+
return RemoteClusterSettings.getRemoteClusters(settings)
71+
.stream()
72+
.map(alias -> RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, alias, settings))
73+
.toList();
74+
}
75+
76+
private void settingsChangedCallback(String clusterAlias, Settings newSettings) {
77+
final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build();
78+
@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
79+
final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, mergedSettings);
80+
handleUpdate(config);
81+
}
82+
83+
@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
84+
private void skipUnavailableChangedCallback(String clusterAlias, Boolean skipUnavailable) {
85+
handleSkipUnavailableChanged(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, skipUnavailable);
86+
}
87+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
14+
import java.util.Collection;
15+
16+
/**
17+
* Service for registering {@link LinkedProjectConfigListener}s to be notified of changes to linked project configurations.
18+
*/
19+
public interface LinkedProjectConfigService {
20+
21+
/**
22+
* Listener interface for receiving updates about linked project configurations.
23+
* Implementations must not throw from any of the interface methods.
24+
*/
25+
interface LinkedProjectConfigListener {
26+
/**
27+
* Called when a linked project configuration has been added or updated.
28+
*
29+
* @param config The updated {@link LinkedProjectConfig}.
30+
*/
31+
void updateLinkedProject(LinkedProjectConfig config);
32+
33+
/**
34+
* Called when the boolean skip_unavailable setting has changed for a linked project configuration.
35+
* Note that skip_unavailable may not be supported in all contexts where linked projects are used.
36+
*
37+
* @param originProjectId The {@link ProjectId} of the owning project that has the linked project configuration.
38+
* @param linkedProjectId The {@link ProjectId} of the linked project.
39+
* @param linkedProjectAlias The alias used for the linked project.
40+
* @param skipUnavailable The new value of the skip_unavailable setting.
41+
*/
42+
default void skipUnavailableChanged(
43+
ProjectId originProjectId,
44+
ProjectId linkedProjectId,
45+
String linkedProjectAlias,
46+
boolean skipUnavailable
47+
) {}
48+
}
49+
50+
/**
51+
* Registers a {@link LinkedProjectConfigListener} to receive updates about linked project configurations.
52+
*
53+
* @param listener The listener to register.
54+
*/
55+
void register(LinkedProjectConfigListener listener);
56+
57+
/**
58+
* Loads all linked project configurations known at node startup, for all origin projects.
59+
*
60+
* @return A collection of all known {@link LinkedProjectConfig}s at node startup.
61+
*/
62+
Collection<LinkedProjectConfig> getInitialLinkedProjectConfigs();
63+
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1414
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SelectorResolver;
1515
import org.elasticsearch.cluster.node.DiscoveryNode;
16-
import org.elasticsearch.common.settings.ClusterSettings;
17-
import org.elasticsearch.common.settings.Setting;
1816
import org.elasticsearch.common.settings.Settings;
1917
import org.elasticsearch.core.Strings;
2018
import org.elasticsearch.core.Tuple;
@@ -28,13 +26,10 @@
2826
import java.util.Set;
2927
import java.util.stream.Collectors;
3028

31-
import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings;
32-
import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings;
33-
3429
/**
35-
* Base class for all services and components that need up-to-date information about the registered remote clusters
30+
* Base class for services and components that utilize linked projects.
3631
*/
37-
public abstract class RemoteClusterAware {
32+
public abstract class RemoteClusterAware implements LinkedProjectConfigService.LinkedProjectConfigListener {
3833
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
3934
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
4035

@@ -56,13 +51,6 @@ protected String getNodeName() {
5651
return nodeName;
5752
}
5853

59-
/**
60-
* Returns remote clusters that are enabled in these settings
61-
*/
62-
protected static Set<String> getEnabledRemoteClusters(final Settings settings) {
63-
return RemoteClusterSettings.getRemoteClusters(settings);
64-
}
65-
6654
/**
6755
* Check whether the index expression represents remote index or not.
6856
* The index name is assumed to be individual index (no commas) but can contain `-`, wildcards,
@@ -215,36 +203,6 @@ protected Map<String, List<String>> groupClusterIndices(Set<String> remoteCluste
215203
return perClusterIndices;
216204
}
217205

218-
void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) {
219-
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
220-
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
221-
}
222-
updateRemoteCluster(clusterAlias, settings);
223-
}
224-
225-
/**
226-
* Subclasses must implement this to receive information about updated cluster aliases.
227-
*/
228-
protected abstract void updateRemoteCluster(String clusterAlias, Settings settings);
229-
230-
/**
231-
* Registers this instance to listen to updates on the cluster settings.
232-
*/
233-
public void listenForUpdates(ClusterSettings clusterSettings) {
234-
List<Setting.AffixSetting<?>> remoteClusterSettings = List.of(
235-
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS,
236-
RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE,
237-
RemoteClusterSettings.REMOTE_CONNECTION_MODE,
238-
SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY,
239-
SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS,
240-
SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS,
241-
ProxyConnectionStrategySettings.PROXY_ADDRESS,
242-
ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS,
243-
ProxyConnectionStrategySettings.SERVER_NAME
244-
);
245-
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
246-
}
247-
248206
public static String buildRemoteIndexName(String clusterAlias, String indexName) {
249207
return clusterAlias == null || LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)
250208
? indexName

0 commit comments

Comments
 (0)