Skip to content

Commit 5510b9d

Browse files
Update RemoteClusterService to default skip unavailable to true for CPS
In stateless CPS skip_unavailable will not be used and should always be true. Also DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE is rejected in CPS as a parameter value for getRemoteClusterClient(). Resolves: ES-12267
1 parent 75e1518 commit 5510b9d

File tree

3 files changed

+42
-4
lines changed

3 files changed

+42
-4
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public final class RemoteClusterConnection implements Closeable {
5252
private final ThreadPool threadPool;
5353
private volatile boolean skipUnavailable;
5454
private final TimeValue initialConnectionTimeout;
55+
private final boolean isStateless;
5556

5657
/**
5758
* Creates a new {@link RemoteClusterConnection}
@@ -83,8 +84,9 @@ public final class RemoteClusterConnection implements Closeable {
8384
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
8485
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
8586
this.remoteConnectionManager.addListener(transportService);
86-
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(clusterAlias)
87-
.get(settings);
87+
this.isStateless = DiscoveryNode.isStateless(settings);
88+
this.skipUnavailable = isStateless
89+
|| RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(clusterAlias).get(settings);
8890
this.threadPool = transportService.threadPool;
8991
initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
9092
}
@@ -93,6 +95,7 @@ public final class RemoteClusterConnection implements Closeable {
9395
* Updates the skipUnavailable flag that can be dynamically set for each remote cluster
9496
*/
9597
void setSkipUnavailable(boolean skipUnavailable) {
98+
assert isStateless == false : "Cannot set skipUnavailable in stateless environments.";
9699
this.skipUnavailable = skipUnavailable;
97100
}
98101

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ public final class RemoteClusterService extends RemoteClusterAware
149149

150150
private final boolean enabled;
151151
private final boolean remoteClusterServerEnabled;
152+
private final boolean isStateless;
152153

153154
public boolean isEnabled() {
154155
return enabled;
@@ -167,6 +168,7 @@ public boolean isRemoteClusterServerEnabled() {
167168
RemoteClusterService(Settings settings, TransportService transportService) {
168169
super(settings);
169170
this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
171+
this.isStateless = DiscoveryNode.isStateless(settings);
170172
this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
171173
this.transportService = transportService;
172174
this.projectResolver = DefaultProjectResolver.INSTANCE;
@@ -294,7 +296,7 @@ void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
294296
* Returns whether the cluster identified by the provided alias is configured to be skipped when unavailable
295297
*/
296298
public boolean isSkipUnavailable(String clusterAlias) {
297-
return getRemoteClusterConnection(clusterAlias).isSkipUnavailable();
299+
return isStateless || getRemoteClusterConnection(clusterAlias).isSkipUnavailable();
298300
}
299301

300302
public Transport.Connection getConnection(String cluster) {
@@ -351,10 +353,13 @@ public RemoteClusterConnection getRemoteClusterConnection(String cluster) {
351353
@Override
352354
public void listenForUpdates(ClusterSettings clusterSettings) {
353355
super.listenForUpdates(clusterSettings);
354-
clusterSettings.addAffixUpdateConsumer(REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, (alias, value) -> {});
356+
if (isStateless == false) {
357+
clusterSettings.addAffixUpdateConsumer(REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, (alias, value) -> {});
358+
}
355359
}
356360

357361
private synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) {
362+
assert isStateless == false : "Cannot configure setting [" + REMOTE_CLUSTER_SKIP_UNAVAILABLE + "] in stateless environments.";
358363
RemoteClusterConnection remote = getConnectionsMapForCurrentProject().get(clusterAlias);
359364
if (remote != null) {
360365
remote.setSkipUnavailable(skipUnavailable);
@@ -667,6 +672,13 @@ public RemoteClusterClient getRemoteClusterClient(
667672
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
668673
);
669674
}
675+
if (isStateless && disconnectedStrategy == DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE) {
676+
throw new IllegalArgumentException(
677+
"DisconnectedStrategy ["
678+
+ DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
679+
+ "] is not supported in stateless environments"
680+
);
681+
}
670682
if (transportService.getRemoteClusterService().getRegisteredRemoteClusterNames().contains(clusterAlias) == false) {
671683
throw new NoSuchRemoteClusterException(clusterAlias);
672684
}

server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,29 @@ public void testRemoteClusterServiceNotEnabled() {
240240
}
241241
}
242242

243+
public void testGetRemoteClusterClientRejectsSkipUnavailableInStateless() {
244+
final var settings = Settings.builder().put("stateless.enabled", true).putList("node.roles", "remote_cluster_client").build();
245+
try (
246+
MockTransportService service = MockTransportService.createNewService(
247+
settings,
248+
VersionInformation.CURRENT,
249+
TransportVersion.current(),
250+
threadPool,
251+
null
252+
)
253+
) {
254+
final var remoteClusterService = service.getRemoteClusterService();
255+
expectThrows(
256+
IllegalArgumentException.class,
257+
() -> remoteClusterService.getRemoteClusterClient(
258+
"test",
259+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
260+
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
261+
)
262+
);
263+
}
264+
}
265+
243266
public void testQuicklySkipUnavailableClusters() throws Exception {
244267
Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build();
245268
try (

0 commit comments

Comments
 (0)