Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
5251826
Consolidate settings consumers for RemoteClusterService
JeremyDahlgren Sep 30, 2025
23d3ec6
Merge branch 'main' into es-12860-refactor-skip-unavailable-updates
JeremyDahlgren Sep 30, 2025
be18795
Merge branch 'main' into es-12860-refactor-skip-unavailable-updates
JeremyDahlgren Oct 7, 2025
ae572d6
Add UPDATED response, adjust logic per code review
JeremyDahlgren Oct 7, 2025
79ebe33
Merge branch 'main' into es-12860-refactor-skip-unavailable-updates
JeremyDahlgren Oct 7, 2025
71646cd
Merge branch 'main' into es-12860-refactor-skip-unavailable-updates
JeremyDahlgren Oct 8, 2025
69a272c
Merge branch 'main' into es-12860-refactor-skip-unavailable-updates
JeremyDahlgren Oct 8, 2025
491f669
Merge branch 'main' into es-12860-refactor-skip-unavailable-updates
JeremyDahlgren Oct 8, 2025
77400bc
Merge branch 'main' into es-12860-refactor-skip-unavailable-updates
JeremyDahlgren Oct 8, 2025
80d0d0e
enhance test to check if connection object is same instance
JeremyDahlgren Oct 9, 2025
1650973
Merge branch 'main' into es-12860-refactor-skip-unavailable-updates
JeremyDahlgren Oct 9, 2025
13dab04
Merge branch 'main' into es-12860-refactor-skip-unavailable-updates
JeremyDahlgren Oct 9, 2025
617f3b6
Merge branch 'main' into es-12860-refactor-skip-unavailable-updates
JeremyDahlgren Oct 9, 2025
f976d88
Merge branch 'main' into es-12860-refactor-skip-unavailable-updates
JeremyDahlgren Oct 9, 2025
d795a86
Merge branch 'main' into es-12860-refactor-skip-unavailable-updates
JeremyDahlgren Oct 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

package org.elasticsearch.transport;

import org.elasticsearch.cluster.metadata.ProjectId;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

Expand All @@ -29,15 +27,4 @@ public void register(LinkedProjectConfigListener listener) {
protected void handleUpdate(LinkedProjectConfig config) {
listeners.forEach(listener -> listener.updateLinkedProject(config));
}

protected void handleSkipUnavailableChanged(
ProjectId originProjectId,
ProjectId linkedProjectId,
String linkedProjectAlias,
boolean skipUnavailable
) {
listeners.forEach(
listener -> listener.skipUnavailableChanged(originProjectId, linkedProjectId, linkedProjectAlias, skipUnavailable)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public ClusterSettingsLinkedProjectConfigService(
RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS,
RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteClusterSettings.REMOTE_CONNECTION_MODE,
RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY,
RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS,
RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS,
Expand All @@ -56,11 +57,6 @@ public ClusterSettingsLinkedProjectConfigService(
RemoteClusterSettings.ProxyConnectionStrategySettings.SERVER_NAME
);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::settingsChangedCallback);
clusterSettings.addAffixUpdateConsumer(
RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
this::skipUnavailableChangedCallback,
(alias, value) -> {}
);
}
}

Expand All @@ -79,9 +75,4 @@ private void settingsChangedCallback(String clusterAlias, Settings newSettings)
final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, mergedSettings);
handleUpdate(config);
}

@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
private void skipUnavailableChangedCallback(String clusterAlias, Boolean skipUnavailable) {
handleSkipUnavailableChanged(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, skipUnavailable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

package org.elasticsearch.transport;

import org.elasticsearch.cluster.metadata.ProjectId;

import java.util.Collection;

/**
Expand All @@ -29,22 +27,6 @@ interface LinkedProjectConfigListener {
* @param config The updated {@link LinkedProjectConfig}.
*/
void updateLinkedProject(LinkedProjectConfig config);

/**
* Called when the boolean skip_unavailable setting has changed for a linked project configuration.
* Note that skip_unavailable may not be supported in all contexts where linked projects are used.
*
* @param originProjectId The {@link ProjectId} of the owning project that has the linked project configuration.
* @param linkedProjectId The {@link ProjectId} of the linked project.
* @param linkedProjectAlias The alias used for the linked project.
* @param skipUnavailable The new value of the skip_unavailable setting.
*/
default void skipUnavailableChanged(
ProjectId originProjectId,
ProjectId linkedProjectId,
String linkedProjectAlias,
boolean skipUnavailable
) {}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,21 +293,6 @@ public RemoteClusterConnection getRemoteClusterConnection(String cluster) {
return connection;
}

@Override
public void skipUnavailableChanged(
ProjectId originProjectId,
ProjectId linkedProjectId,
String linkedProjectAlias,
boolean skipUnavailable
) {
assert crossProjectEnabled == false
: "Cannot configure setting [" + RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getKey() + "] in CPS environments.";
final var remote = getConnectionsMapForProject(originProjectId).get(linkedProjectAlias);
if (remote != null) {
remote.setSkipUnavailable(skipUnavailable);
}
}

@Override
public void updateLinkedProject(LinkedProjectConfig config) {
final var projectId = config.originProjectId();
Expand Down Expand Up @@ -385,6 +370,9 @@ public synchronized void updateRemoteCluster(
remote = new RemoteClusterConnection(config, transportService, remoteClusterCredentialsManager, crossProjectEnabled);
connectionMap.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED));
} else if (remote.isSkipUnavailable() != config.skipUnavailable()) {
remote.setSkipUnavailable(config.skipUnavailable());
listener.onResponse(RemoteClusterConnectionStatus.UPDATED);
} else {
// No changes to connection configuration.
listener.onResponse(RemoteClusterConnectionStatus.UNCHANGED);
Comment on lines 377 to 378
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a little odd that we report UNCHANGED when skipUnavailable changes. It might be useful to add a new enum such as UPDATED for updating properties on existing connection.

Expand All @@ -395,7 +383,8 @@ public enum RemoteClusterConnectionStatus {
CONNECTED,
DISCONNECTED,
RECONNECTED,
UNCHANGED
UNCHANGED,
UPDATED
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.transport;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.DefaultProjectResolver;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -25,35 +24,16 @@

public class ClusterSettingsLinkedProjectConfigServiceTests extends ESTestCase {

private record SkipUnavailableUpdate(
ProjectId originProjectId,
ProjectId linkedProjectId,
String linkedProjectAlias,
boolean skipUnavailable
) {}

private static class StubLinkedProjectConfigListener implements LinkedProjectConfigListener {
LinkedProjectConfig updatedConfig;
SkipUnavailableUpdate skipUnavailableUpdate;

@Override
public void updateLinkedProject(LinkedProjectConfig config) {
updatedConfig = config;
}

@Override
public void skipUnavailableChanged(
ProjectId originProjectId,
ProjectId linkedProjectId,
String linkedProjectAlias,
boolean skipUnavailable
) {
skipUnavailableUpdate = new SkipUnavailableUpdate(originProjectId, linkedProjectId, linkedProjectAlias, skipUnavailable);
}

void reset() {
updatedConfig = null;
skipUnavailableUpdate = null;
}
}

Expand All @@ -66,17 +46,21 @@ public void testListenersReceiveUpdates() {
final var alias = randomAlphaOfLength(10);

final var initialProxyAddress = "localhost:9400";
final var initialSkipUnavailable = randomBoolean();
final var initialSettings = Settings.builder()
.put("cluster.remote." + alias + ".mode", "proxy")
.put("cluster.remote." + alias + ".proxy_address", initialProxyAddress)
.put("cluster.remote." + alias + ".skip_unavailable", initialSkipUnavailable)
.build();
final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(initialSettings);
final var service = new ClusterSettingsLinkedProjectConfigService(
initialSettings,
clusterSettings,
DefaultProjectResolver.INSTANCE
);
final var config = new ProxyLinkedProjectConfigBuilder(alias).proxyAddress(initialProxyAddress).build();
final var config = new ProxyLinkedProjectConfigBuilder(alias).proxyAddress(initialProxyAddress)
.skipUnavailable(initialSkipUnavailable)
.build();

// Verify we can get the linked projects on startup.
assertThat(service.getInitialLinkedProjectConfigs(), equalTo(List.of(config)));
Expand All @@ -92,48 +76,23 @@ public void testListenersReceiveUpdates() {
clusterSettings.applySettings(initialSettings);
for (int i = 0; i < numListeners; ++i) {
assertThat(listeners.get(i).updatedConfig, sameInstance(null));
assertThat(listeners.get(i).skipUnavailableUpdate, sameInstance(null));
listeners.get(i).reset();
}

// Change the skip_unavailable, leave the other settings alone, we should get the skip_unavailable update only.
var expectedSkipUnavailableUpdate = new SkipUnavailableUpdate(
config.originProjectId(),
config.linkedProjectId(),
config.linkedProjectAlias(),
config.skipUnavailable() == false
);
clusterSettings.applySettings(
Settings.builder()
.put(initialSettings)
.put("cluster.remote." + alias + ".skip_unavailable", expectedSkipUnavailableUpdate.skipUnavailable)
.build()
);
for (int i = 0; i < numListeners; ++i) {
assertThat(listeners.get(i).updatedConfig, sameInstance(null));
assertThat(listeners.get(i).skipUnavailableUpdate, equalTo(expectedSkipUnavailableUpdate));
listeners.get(i).reset();
}

// Change the proxy address, and set skip_unavailable back to original value, we should get both updates.
expectedSkipUnavailableUpdate = new SkipUnavailableUpdate(
config.originProjectId(),
config.linkedProjectId(),
config.linkedProjectAlias(),
config.skipUnavailable()
);
// Change the proxy address and skip_unavailable values.
final var newProxyAddress = "localhost:9401";
final var newSkipUnavailable = config.skipUnavailable() == false;
clusterSettings.applySettings(
Settings.builder()
.put(initialSettings)
.put("cluster.remote." + alias + ".proxy_address", newProxyAddress)
.put("cluster.remote." + alias + ".skip_unavailable", expectedSkipUnavailableUpdate.skipUnavailable)
.put("cluster.remote." + alias + ".skip_unavailable", newSkipUnavailable)
.build()
);
for (int i = 0; i < numListeners; ++i) {
assertNotNull("expected non-null updatedConfig for listener " + i, listeners.get(i).updatedConfig);
assertThat(listeners.get(i).updatedConfig.proxyAddress(), equalTo(newProxyAddress));
assertThat(listeners.get(i).skipUnavailableUpdate, equalTo(expectedSkipUnavailableUpdate));
assertThat(listeners.get(i).updatedConfig.skipUnavailable(), equalTo(newSkipUnavailable));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import static org.elasticsearch.test.MockLog.assertThatLogger;
import static org.elasticsearch.test.NodeRoles.masterOnlyNode;
Expand Down Expand Up @@ -112,6 +113,10 @@ private MockTransportService startTransport(
return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, transportVersion, threadPool, settings);
}

private MockTransportService startTransport(final String id) {
return startTransport(id, List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY);
}

public void testSettingsAreRegistered() {
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterSettings.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING));
Expand Down Expand Up @@ -1877,6 +1882,61 @@ public void testLogsConnectionResult() throws IOException {
}
}

public void testSetSkipUnavailable() throws IOException {
final var skipUnavailableProperty = RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("remote")
.getKey();
final var seedNodeProperty = SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("remote").getKey();
final var clusterSettings = ClusterSettings.createBuiltInClusterSettings();

try (
var remote1Transport = startTransport("remote1");
var remote2Transport = startTransport("remote2");
var local = startTransport("local");
var remoteClusterService = createRemoteClusterService(Settings.EMPTY, clusterSettings, local)
) {
linkedProjectConfigService.register(remoteClusterService);

record SkipUnavailableTestConfig(
boolean skipUnavailable,
MockTransportService seedNodeTransportService,
boolean isNewConnection,
boolean rebuildExpected
) {}

final Consumer<SkipUnavailableTestConfig> applySettingsAndVerify = (cfg) -> {
final var currentConnection = cfg.isNewConnection ? null : remoteClusterService.getRemoteClusterConnection("remote");
clusterSettings.applySettings(
Settings.builder()
.put(skipUnavailableProperty, cfg.skipUnavailable)
.put(seedNodeProperty, cfg.seedNodeTransportService.getLocalNode().getAddress().toString())
.build()
);
assertTrue(isRemoteClusterRegistered(remoteClusterService, "remote"));
assertEquals(cfg.skipUnavailable, remoteClusterService.isSkipUnavailable("remote").orElseThrow());
if (cfg.rebuildExpected) {
assertNotSame(currentConnection, remoteClusterService.getRemoteClusterConnection("remote"));
} else if (cfg.isNewConnection == false) {
assertSame(currentConnection, remoteClusterService.getRemoteClusterConnection("remote"));
}
};

// Apply the initial settings and verify the new connection is built.
var skipUnavailable = randomBoolean();
applySettingsAndVerify.accept(new SkipUnavailableTestConfig(skipUnavailable, remote1Transport, true, true));

// Change skip_unavailable value, but not seed node, connection should not be rebuilt, but skip_unavailable should be modified.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to somehow assert that the connection is the same instance, i.e. no rebuilding.

skipUnavailable = skipUnavailable == false;
applySettingsAndVerify.accept(new SkipUnavailableTestConfig(skipUnavailable, remote1Transport, false, false));

// Change the seed node but not skip_unavailable, connection should be rebuilt and skip_unavailable should stay the same.
applySettingsAndVerify.accept(new SkipUnavailableTestConfig(skipUnavailable, remote2Transport, false, true));

// Change skip_unavailable value and the seed node, connection should be rebuilt and skip_unavailable should also be modified.
skipUnavailable = skipUnavailable == false;
applySettingsAndVerify.accept(new SkipUnavailableTestConfig(skipUnavailable, remote1Transport, false, true));
}
}

@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
private LinkedProjectConfig buildLinkedProjectConfig(String alias, Settings staticSettings, Settings newSettings) {
final var mergedSettings = Settings.builder().put(staticSettings, false).put(newSettings, false).build();
Expand Down