Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
616359b
Add LinkedProjectConfigService with ClusterSettings implementation
JeremyDahlgren Aug 29, 2025
2cc57b4
Merge branch 'main' into es-12730-linked-proj-cfg-svc-cluster-settings
JeremyDahlgren Sep 3, 2025
033fbbb
Add test for ClusterSettings impl, add javadoc
JeremyDahlgren Sep 3, 2025
26615d9
[CI] Auto commit changes from spotless
Sep 3, 2025
bc41d9a
Merge branch 'main' into es-12730-linked-proj-cfg-svc-cluster-settings
JeremyDahlgren Sep 4, 2025
ef7621a
Merge branch 'main' into es-12730-linked-proj-cfg-svc-cluster-settings
JeremyDahlgren Sep 4, 2025
1dfc156
Merge branch 'main' into es-12730-linked-proj-cfg-svc-cluster-settings
JeremyDahlgren Sep 4, 2025
b24de49
Eliminate using LinkedProjectConfigService in RemoteClusterAware
JeremyDahlgren Sep 5, 2025
b5d40ff
Add FixForMultiProject annotations in ClusterSettingsLinkedProjectCon…
JeremyDahlgren Sep 5, 2025
d461715
Move LinkedProjectConfigService.NOOP to test code
JeremyDahlgren Sep 6, 2025
0450941
Rename loadAllLinkedProjectConfigs() to getInitialLinkedProjectConfigs()
JeremyDahlgren Sep 6, 2025
20861a3
Remove redundant legacy alias check
JeremyDahlgren Sep 6, 2025
716d6b6
Add optional support for registering ClusterSettings update consumers
JeremyDahlgren Sep 6, 2025
d38a203
Merge branch 'main' into es-12730-linked-proj-cfg-svc-cluster-settings
JeremyDahlgren Sep 6, 2025
af28735
RemoteClusterAware implements LinkedProjectConfigListener
JeremyDahlgren Sep 9, 2025
e32fa12
Merge branch 'main' into es-12730-linked-proj-cfg-svc-cluster-settings
JeremyDahlgren Sep 9, 2025
68fa0fd
Remove conditional creation of linkedProjectConfigService
JeremyDahlgren Sep 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.internal.BuiltInExecutorBuilders;
import org.elasticsearch.transport.ClusterSettingsLinkedProjectConfigService;
import org.elasticsearch.transport.LinkedProjectConfigService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;
Expand Down Expand Up @@ -969,6 +971,11 @@ public Map<String, String> queryFields() {

final IndexingPressure indexingLimits = new IndexingPressure(settings);

final var linkedProjectConfigService = pluginsService.loadSingletonServiceProvider(
LinkedProjectConfigService.class,
() -> new ClusterSettingsLinkedProjectConfigService(settings, clusterService.getClusterSettings(), projectResolver)
);

PluginServiceInstances pluginServices = new PluginServiceInstances(
client,
clusterService,
Expand All @@ -992,7 +999,8 @@ public Map<String, String> queryFields() {
taskManager,
projectResolver,
slowLogFieldProvider,
indexingLimits
indexingLimits,
linkedProjectConfigService
);

Collection<?> pluginComponents = pluginsService.flatMap(plugin -> {
Expand Down Expand Up @@ -1122,6 +1130,7 @@ public Map<String, String> queryFields() {
taskManager,
telemetryProvider.getTracer(),
nodeEnvironment.nodeId(),
linkedProjectConfigService,
projectResolver
);
final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ClusterConnectionManager;
import org.elasticsearch.transport.LinkedProjectConfigService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -121,6 +122,7 @@ TransportService newTransportService(
TaskManager taskManager,
Tracer tracer,
String nodeId,
LinkedProjectConfigService linkedProjectConfigService,
ProjectResolver projectResolver
) {
return new TransportService(
Expand All @@ -132,6 +134,7 @@ TransportService newTransportService(
clusterSettings,
new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()),
taskManager,
linkedProjectConfigService,
projectResolver
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.LinkedProjectConfigService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;

Expand All @@ -57,5 +58,6 @@ public record PluginServiceInstances(
TaskManager taskManager,
ProjectResolver projectResolver,
SlowLogFieldProvider slowLogFieldProvider,
IndexingPressure indexingPressure
IndexingPressure indexingPressure,
LinkedProjectConfigService linkedProjectConfigService
) implements Plugin.PluginServices {}
6 changes: 6 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.LinkedProjectConfigService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentParser;
Expand Down Expand Up @@ -192,6 +193,11 @@ public interface PluginServices {
* Provider for indexing pressure
*/
IndexingPressure indexingPressure();

/**
* A service for registering for linked project configuration updates.
*/
LinkedProjectConfigService linkedProjectConfigService();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.cluster.metadata.ProjectId;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* Abstract base class for {@link LinkedProjectConfigService} implementations.
* Provides common functionality for managing a list of registered listeners and notifying them of updates.
*/
public abstract class AbstractLinkedProjectConfigService implements LinkedProjectConfigService {
private final List<LinkedProjectConfigListener> listeners = new CopyOnWriteArrayList<>();

@Override
public void register(LinkedProjectConfigListener listener) {
listeners.add(listener);
}

protected void handleUpdate(LinkedProjectConfig config) {
listeners.forEach(listener -> listener.updateLinkedProject(config));
}

protected void handleSkipUnavailableChanged(
ProjectId originProjectId,
ProjectId linkedProjectId,
String linkedProjectAlias,
boolean skipUnavailable
) {
listeners.forEach(
listener -> listener.skipUnavailableChanged(originProjectId, linkedProjectId, linkedProjectAlias, skipUnavailable)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;

import java.util.Collection;
import java.util.List;

/**
* A {@link LinkedProjectConfigService} implementation that listens for {@link ClusterSettings} changes,
* creating {@link LinkedProjectConfig}s from the relevant settings and notifying registered listeners of updates.
*/
public class ClusterSettingsLinkedProjectConfigService extends AbstractLinkedProjectConfigService {
private final Settings settings;
private final ProjectResolver projectResolver;

/**
* Constructs a new {@link ClusterSettingsLinkedProjectConfigService}.
*
* @param settings The initial node settings available on startup, used in {@link #getInitialLinkedProjectConfigs()}.
* @param clusterSettings The {@link ClusterSettings} to add setting update consumers to, if non-null.
* @param projectResolver The {@link ProjectResolver} to use to resolve the origin project ID.
*/
@SuppressWarnings("this-escape")
public ClusterSettingsLinkedProjectConfigService(
Settings settings,
@Nullable ClusterSettings clusterSettings,
ProjectResolver projectResolver
) {
this.settings = settings;
this.projectResolver = projectResolver;
if (clusterSettings != null) {
List<Setting.AffixSetting<?>> remoteClusterSettings = List.of(
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS,
RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteClusterSettings.REMOTE_CONNECTION_MODE,
RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY,
RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS,
RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS,
RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS,
RemoteClusterSettings.ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS,
RemoteClusterSettings.ProxyConnectionStrategySettings.SERVER_NAME
);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::settingsChangedCallback);
clusterSettings.addAffixUpdateConsumer(
RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
this::skipUnavailableChangedCallback,
(alias, value) -> {}
);
}
}

@Override
@FixForMultiProject(description = "Refactor to add the linked project IDs associated with the aliases.")
public Collection<LinkedProjectConfig> getInitialLinkedProjectConfigs() {
return RemoteClusterSettings.getRemoteClusters(settings)
.stream()
.map(alias -> RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, alias, settings))
.toList();
}

private void settingsChangedCallback(String clusterAlias, Settings newSettings) {
final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build();
@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, mergedSettings);
handleUpdate(config);
}

@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
private void skipUnavailableChangedCallback(String clusterAlias, Boolean skipUnavailable) {
handleSkipUnavailableChanged(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, skipUnavailable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.cluster.metadata.ProjectId;

import java.util.Collection;

/**
* Service for registering {@link LinkedProjectConfigListener}s to be notified of changes to linked project configurations.
*/
public interface LinkedProjectConfigService {

/**
* Listener interface for receiving updates about linked project configurations.
* Implementations must not throw from any of the interface methods.
*/
interface LinkedProjectConfigListener {
/**
* Called when a linked project configuration has been added or updated.
*
* @param config The updated {@link LinkedProjectConfig}.
*/
void updateLinkedProject(LinkedProjectConfig config);

/**
* Called when the boolean skip_unavailable setting has changed for a linked project configuration.
* Note that skip_unavailable may not be supported in all contexts where linked projects are used.
*
* @param originProjectId The {@link ProjectId} of the owning project that has the linked project configuration.
* @param linkedProjectId The {@link ProjectId} of the linked project.
* @param linkedProjectAlias The alias used for the linked project.
* @param skipUnavailable The new value of the skip_unavailable setting.
*/
default void skipUnavailableChanged(
ProjectId originProjectId,
ProjectId linkedProjectId,
String linkedProjectAlias,
boolean skipUnavailable
) {}
}

/**
* Registers a {@link LinkedProjectConfigListener} to receive updates about linked project configurations.
*
* @param listener The listener to register.
*/
void register(LinkedProjectConfigListener listener);

/**
* Loads all linked project configurations known at node startup, for all origin projects.
*
* @return A collection of all known {@link LinkedProjectConfig}s at node startup.
*/
Collection<LinkedProjectConfig> getInitialLinkedProjectConfigs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SelectorResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.Tuple;
Expand All @@ -28,11 +26,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings;
import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings;

/**
* Base class for all services and components that need up-to-date information about the registered remote clusters
* Base class for services and components that utilize linked projects.
*/
public abstract class RemoteClusterAware {
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
Expand All @@ -56,13 +51,6 @@ protected String getNodeName() {
return nodeName;
}

/**
* Returns remote clusters that are enabled in these settings
*/
protected static Set<String> getEnabledRemoteClusters(final Settings settings) {
return RemoteClusterSettings.getRemoteClusters(settings);
}

/**
* Check whether the index expression represents remote index or not.
* The index name is assumed to be individual index (no commas) but can contain `-`, wildcards,
Expand Down Expand Up @@ -215,36 +203,6 @@ protected Map<String, List<String>> groupClusterIndices(Set<String> remoteCluste
return perClusterIndices;
}

void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) {
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}
updateRemoteCluster(clusterAlias, settings);
}

/**
* Subclasses must implement this to receive information about updated cluster aliases.
*/
protected abstract void updateRemoteCluster(String clusterAlias, Settings settings);

/**
* Registers this instance to listen to updates on the cluster settings.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
List<Setting.AffixSetting<?>> remoteClusterSettings = List.of(
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS,
RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteClusterSettings.REMOTE_CONNECTION_MODE,
SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS,
ProxyConnectionStrategySettings.PROXY_ADDRESS,
ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategySettings.SERVER_NAME
);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
}

public static String buildRemoteIndexName(String clusterAlias, String indexName) {
return clusterAlias == null || LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)
? indexName
Expand Down
Loading