Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
616359b
Add LinkedProjectConfigService with ClusterSettings implementation
JeremyDahlgren Aug 29, 2025
2cc57b4
Merge branch 'main' into es-12730-linked-proj-cfg-svc-cluster-settings
JeremyDahlgren Sep 3, 2025
033fbbb
Add test for ClusterSettings impl, add javadoc
JeremyDahlgren Sep 3, 2025
26615d9
[CI] Auto commit changes from spotless
Sep 3, 2025
bc41d9a
Merge branch 'main' into es-12730-linked-proj-cfg-svc-cluster-settings
JeremyDahlgren Sep 4, 2025
ef7621a
Merge branch 'main' into es-12730-linked-proj-cfg-svc-cluster-settings
JeremyDahlgren Sep 4, 2025
1dfc156
Merge branch 'main' into es-12730-linked-proj-cfg-svc-cluster-settings
JeremyDahlgren Sep 4, 2025
b24de49
Eliminate using LinkedProjectConfigService in RemoteClusterAware
JeremyDahlgren Sep 5, 2025
b5d40ff
Add FixForMultiProject annotations in ClusterSettingsLinkedProjectCon…
JeremyDahlgren Sep 5, 2025
d461715
Move LinkedProjectConfigService.NOOP to test code
JeremyDahlgren Sep 6, 2025
0450941
Rename loadAllLinkedProjectConfigs() to getInitialLinkedProjectConfigs()
JeremyDahlgren Sep 6, 2025
20861a3
Remove redundant legacy alias check
JeremyDahlgren Sep 6, 2025
716d6b6
Add optional support for registering ClusterSettings update consumers
JeremyDahlgren Sep 6, 2025
d38a203
Merge branch 'main' into es-12730-linked-proj-cfg-svc-cluster-settings
JeremyDahlgren Sep 6, 2025
af28735
RemoteClusterAware implements LinkedProjectConfigListener
JeremyDahlgren Sep 9, 2025
e32fa12
Merge branch 'main' into es-12730-linked-proj-cfg-svc-cluster-settings
JeremyDahlgren Sep 9, 2025
68fa0fd
Remove conditional creation of linkedProjectConfigService
JeremyDahlgren Sep 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.internal.BuiltInExecutorBuilders;
import org.elasticsearch.transport.ClusterSettingsLinkedProjectConfigService;
import org.elasticsearch.transport.LinkedProjectConfigService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;
Expand Down Expand Up @@ -969,6 +971,11 @@ public Map<String, String> queryFields() {

final IndexingPressure indexingLimits = new IndexingPressure(settings);

final var linkedProjectConfigService = pluginsService.loadSingletonServiceProvider(
LinkedProjectConfigService.class,
() -> new ClusterSettingsLinkedProjectConfigService(settings, clusterService.getClusterSettings(), projectResolver)
);

PluginServiceInstances pluginServices = new PluginServiceInstances(
client,
clusterService,
Expand All @@ -992,7 +999,8 @@ public Map<String, String> queryFields() {
taskManager,
projectResolver,
slowLogFieldProvider,
indexingLimits
indexingLimits,
linkedProjectConfigService
);

Collection<?> pluginComponents = pluginsService.flatMap(plugin -> {
Expand Down Expand Up @@ -1122,6 +1130,7 @@ public Map<String, String> queryFields() {
taskManager,
telemetryProvider.getTracer(),
nodeEnvironment.nodeId(),
linkedProjectConfigService,
projectResolver
);
final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ClusterConnectionManager;
import org.elasticsearch.transport.LinkedProjectConfigService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -121,6 +122,7 @@ TransportService newTransportService(
TaskManager taskManager,
Tracer tracer,
String nodeId,
LinkedProjectConfigService linkedProjectConfigService,
ProjectResolver projectResolver
) {
return new TransportService(
Expand All @@ -132,6 +134,7 @@ TransportService newTransportService(
clusterSettings,
new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()),
taskManager,
linkedProjectConfigService,
projectResolver
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.LinkedProjectConfigService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;

Expand All @@ -57,5 +58,6 @@ public record PluginServiceInstances(
TaskManager taskManager,
ProjectResolver projectResolver,
SlowLogFieldProvider slowLogFieldProvider,
IndexingPressure indexingPressure
IndexingPressure indexingPressure,
LinkedProjectConfigService linkedProjectConfigService
) implements Plugin.PluginServices {}
6 changes: 6 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.LinkedProjectConfigService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentParser;
Expand Down Expand Up @@ -192,6 +193,11 @@ public interface PluginServices {
* Provider for indexing pressure
*/
IndexingPressure indexingPressure();

/**
* A service for registering for linked project configuration updates.
*/
LinkedProjectConfigService linkedProjectConfigService();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.cluster.metadata.ProjectId;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* Abstract base class for {@link LinkedProjectConfigService} implementations.
* Provides common functionality for managing a list of registered listeners and notifying them of updates.
*/
public abstract class AbstractLinkedProjectConfigService implements LinkedProjectConfigService {
private final List<LinkedProjectConfigListener> listeners = new CopyOnWriteArrayList<>();

@Override
public void register(LinkedProjectConfigListener listener) {
listeners.add(listener);
}

protected void handleUpdate(LinkedProjectConfig config) {
listeners.forEach(listener -> listener.updateLinkedProject(config));
}

protected void handleSkipUnavailableChanged(
ProjectId originProjectId,
ProjectId linkedProjectId,
String linkedProjectAlias,
boolean skipUnavailable
) {
listeners.forEach(
listener -> listener.skipUnavailableChanged(originProjectId, linkedProjectId, linkedProjectAlias, skipUnavailable)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

import java.util.Collection;
import java.util.List;

/**
* A {@link LinkedProjectConfigService} implementation that listens for {@link ClusterSettings} changes,
* creating {@link LinkedProjectConfig}s from the relevant settings and notifying registered listeners of updates.
*/
public class ClusterSettingsLinkedProjectConfigService extends AbstractLinkedProjectConfigService {
private final Settings settings;
private final ProjectResolver projectResolver;

@SuppressWarnings("this-escape")
public ClusterSettingsLinkedProjectConfigService(Settings settings, ClusterSettings clusterSettings, ProjectResolver projectResolver) {
this.settings = settings;
this.projectResolver = projectResolver;
List<Setting.AffixSetting<?>> remoteClusterSettings = List.of(
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS,
RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteClusterSettings.REMOTE_CONNECTION_MODE,
RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY,
RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS,
RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS,
RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS,
RemoteClusterSettings.ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS,
RemoteClusterSettings.ProxyConnectionStrategySettings.SERVER_NAME
);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::settingsChangedCallback);
clusterSettings.addAffixUpdateConsumer(
RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
this::skipUnavailableChangedCallback,
(alias, value) -> {}
);
}

@Override
public Collection<LinkedProjectConfig> loadAllLinkedProjectConfigs() {
return RemoteClusterSettings.getRemoteClusters(settings)
.stream()
.map(alias -> RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, alias, settings))
.toList();
}

private void settingsChangedCallback(String clusterAlias, Settings newSettings) {
final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build();
final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, mergedSettings);
handleUpdate(config);
}

private void skipUnavailableChangedCallback(String clusterAlias, Boolean skipUnavailable) {
handleSkipUnavailableChanged(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, skipUnavailable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.cluster.metadata.ProjectId;

import java.util.Collection;
import java.util.Collections;

/**
* Service for registering {@link LinkedProjectConfigListener}s to be notified of changes to linked project configurations.
*/
public interface LinkedProjectConfigService {

/**
* Listener interface for receiving updates about linked project configurations.
* Implementations must not throw from any of the interface methods.
*/
interface LinkedProjectConfigListener {
/**
* Called when a linked project configuration has been added or updated.
*
* @param config The updated {@link LinkedProjectConfig}.
*/
void updateLinkedProject(LinkedProjectConfig config);

/**
* Called when the boolean skip_unavailable setting has changed for a linked project configuration.
* Note that skip_unavailable may not be supported in all contexts where linked projects are used.
*
* @param originProjectId The {@link ProjectId} of the owning project that has the linked project configuration.
* @param linkedProjectId The {@link ProjectId} of the linked project.
* @param linkedProjectAlias The alias used for the linked project.
* @param skipUnavailable The new value of the skip_unavailable setting.
*/
default void skipUnavailableChanged(
ProjectId originProjectId,
ProjectId linkedProjectId,
String linkedProjectAlias,
boolean skipUnavailable
) {}
}

/**
* Registers a {@link LinkedProjectConfigListener} to receive updates about linked project configurations.
*
* @param listener The listener to register.
*/
void register(LinkedProjectConfigListener listener);

/**
* Loads all existing linked project configurations for all origin projects.
*
* @return A collection of all existing {@link LinkedProjectConfig}s.
*/
Collection<LinkedProjectConfig> loadAllLinkedProjectConfigs();

/**
* A no-op stub implementation of {@link LinkedProjectConfigService} intended for use in test scenarios where linked project
* configuration updates are not needed.
*/
LinkedProjectConfigService NOOP = new LinkedProjectConfigService() {
@Override
public void register(LinkedProjectConfigListener listener) {}

@Override
public Collection<LinkedProjectConfig> loadAllLinkedProjectConfigs() {
return Collections.emptyList();
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,39 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SelectorResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.node.Node;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings;
import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings;

/**
* Base class for all services and components that need up-to-date information about the registered remote clusters
*/
public abstract class RemoteClusterAware {
public abstract class RemoteClusterAware implements LinkedProjectConfigService.LinkedProjectConfigListener {
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";

protected final Settings settings;
private final LinkedProjectConfigService linkedProjectConfigService;
private final String nodeName;
private final boolean isRemoteClusterClientEnabled;

/**
* Creates a new {@link RemoteClusterAware} instance
* @param settings the nodes level settings
*/
protected RemoteClusterAware(Settings settings) {
protected RemoteClusterAware(Settings settings, LinkedProjectConfigService linkedProjectConfigService) {
this.settings = settings;
this.linkedProjectConfigService = linkedProjectConfigService;
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.isRemoteClusterClientEnabled = DiscoveryNode.isRemoteClusterClient(settings);
}
Expand All @@ -57,10 +55,10 @@ protected String getNodeName() {
}

/**
* Returns remote clusters that are enabled in these settings
* Returns all known {@link LinkedProjectConfig}s.
*/
protected static Set<String> getEnabledRemoteClusters(final Settings settings) {
return RemoteClusterSettings.getRemoteClusters(settings);
protected Collection<LinkedProjectConfig> loadAllLinkedProjectConfigs() {
return linkedProjectConfigService.loadAllLinkedProjectConfigs();
}

/**
Expand Down Expand Up @@ -215,34 +213,11 @@ protected Map<String, List<String>> groupClusterIndices(Set<String> remoteCluste
return perClusterIndices;
}

void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) {
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}
updateRemoteCluster(clusterAlias, settings);
}

/**
* Subclasses must implement this to receive information about updated cluster aliases.
*/
protected abstract void updateRemoteCluster(String clusterAlias, Settings settings);

/**
* Registers this instance to listen to updates on the cluster settings.
* Registers this instance to listen for linked project updates.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
List<Setting.AffixSetting<?>> remoteClusterSettings = List.of(
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS,
RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteClusterSettings.REMOTE_CONNECTION_MODE,
SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS,
ProxyConnectionStrategySettings.PROXY_ADDRESS,
ProxyConnectionStrategySettings.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategySettings.SERVER_NAME
);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
public void listenForUpdates() {
linkedProjectConfigService.register(this);
}

public static String buildRemoteIndexName(String clusterAlias, String indexName) {
Expand Down
Loading