27
27
import org .elasticsearch .cluster .project .DefaultProjectResolver ;
28
28
import org .elasticsearch .cluster .project .ProjectResolver ;
29
29
import org .elasticsearch .common .Strings ;
30
+ import org .elasticsearch .common .collect .Iterators ;
30
31
import org .elasticsearch .common .settings .ClusterSettings ;
31
32
import org .elasticsearch .common .settings .SecureSetting ;
32
33
import org .elasticsearch .common .settings .SecureString ;
48
49
import java .util .Iterator ;
49
50
import java .util .List ;
50
51
import java .util .Map ;
52
+ import java .util .Objects ;
51
53
import java .util .Set ;
52
54
import java .util .concurrent .CountDownLatch ;
53
55
import java .util .concurrent .Executor ;
@@ -98,7 +100,13 @@ public final class RemoteClusterService extends RemoteClusterAware
98
100
public static final Setting .AffixSetting <Boolean > REMOTE_CLUSTER_SKIP_UNAVAILABLE = Setting .affixKeySetting (
99
101
"cluster.remote." ,
100
102
"skip_unavailable" ,
101
- (ns , key ) -> boolSetting (key , true , new RemoteConnectionEnabled <>(ns , key ), Setting .Property .Dynamic , Setting .Property .NodeScope )
103
+ (ns , key ) -> boolSetting (
104
+ key ,
105
+ true ,
106
+ new FixedValueIfStatelessEnabledValidator <>(ns , key , true ),
107
+ Setting .Property .Dynamic ,
108
+ Setting .Property .NodeScope
109
+ )
102
110
);
103
111
104
112
public static final Setting .AffixSetting <TimeValue > REMOTE_CLUSTER_PING_SCHEDULE = Setting .affixKeySetting (
@@ -149,6 +157,7 @@ public final class RemoteClusterService extends RemoteClusterAware
149
157
150
158
private final boolean enabled ;
151
159
private final boolean remoteClusterServerEnabled ;
160
+ private final boolean isStateless ;
152
161
153
162
public boolean isEnabled () {
154
163
return enabled ;
@@ -167,6 +176,7 @@ public boolean isRemoteClusterServerEnabled() {
167
176
RemoteClusterService (Settings settings , TransportService transportService ) {
168
177
super (settings );
169
178
this .enabled = DiscoveryNode .isRemoteClusterClient (settings );
179
+ this .isStateless = DiscoveryNode .isStateless (settings );
170
180
this .remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED .get (settings );
171
181
this .transportService = transportService ;
172
182
this .projectResolver = DefaultProjectResolver .INSTANCE ;
@@ -294,7 +304,7 @@ void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
294
304
* Returns whether the cluster identified by the provided alias is configured to be skipped when unavailable
295
305
*/
296
306
public boolean isSkipUnavailable (String clusterAlias ) {
297
- return getRemoteClusterConnection (clusterAlias ).isSkipUnavailable ();
307
+ return isStateless || getRemoteClusterConnection (clusterAlias ).isSkipUnavailable ();
298
308
}
299
309
300
310
public Transport .Connection getConnection (String cluster ) {
@@ -351,10 +361,13 @@ public RemoteClusterConnection getRemoteClusterConnection(String cluster) {
351
361
@ Override
352
362
public void listenForUpdates (ClusterSettings clusterSettings ) {
353
363
super .listenForUpdates (clusterSettings );
354
- clusterSettings .addAffixUpdateConsumer (REMOTE_CLUSTER_SKIP_UNAVAILABLE , this ::updateSkipUnavailable , (alias , value ) -> {});
364
+ if (isStateless == false ) {
365
+ clusterSettings .addAffixUpdateConsumer (REMOTE_CLUSTER_SKIP_UNAVAILABLE , this ::updateSkipUnavailable , (alias , value ) -> {});
366
+ }
355
367
}
356
368
357
369
private synchronized void updateSkipUnavailable (String clusterAlias , Boolean skipUnavailable ) {
370
+ assert isStateless == false : "Cannot configure setting [" + REMOTE_CLUSTER_SKIP_UNAVAILABLE + "] in stateless environments." ;
358
371
RemoteClusterConnection remote = getConnectionsMapForCurrentProject ().get (clusterAlias );
359
372
if (remote != null ) {
360
373
remote .setSkipUnavailable (skipUnavailable );
@@ -667,6 +680,13 @@ public RemoteClusterClient getRemoteClusterClient(
667
680
"this node does not have the " + DiscoveryNodeRole .REMOTE_CLUSTER_CLIENT_ROLE .roleName () + " role"
668
681
);
669
682
}
683
+ if (isStateless && disconnectedStrategy == DisconnectedStrategy .RECONNECT_UNLESS_SKIP_UNAVAILABLE ) {
684
+ throw new IllegalArgumentException (
685
+ "DisconnectedStrategy ["
686
+ + DisconnectedStrategy .RECONNECT_UNLESS_SKIP_UNAVAILABLE
687
+ + "] is not supported in stateless environments"
688
+ );
689
+ }
670
690
if (transportService .getRemoteClusterService ().getRegisteredRemoteClusterNames ().contains (clusterAlias ) == false ) {
671
691
throw new NoSuchRemoteClusterException (clusterAlias );
672
692
}
@@ -736,6 +756,10 @@ private RemoteConnectionEnabled(String clusterAlias, String key) {
736
756
this .key = key ;
737
757
}
738
758
759
+ protected String getKey () {
760
+ return key ;
761
+ }
762
+
739
763
@ Override
740
764
public void validate (T value ) {}
741
765
@@ -760,4 +784,29 @@ private Stream<Setting<?>> settingsStream() {
760
784
.map (as -> as .getConcreteSettingForNamespace (clusterAlias ));
761
785
}
762
786
};
787
+
788
+ private static class FixedValueIfStatelessEnabledValidator <T > extends RemoteConnectionEnabled <T > {
789
+ private final Setting <Boolean > statelessSetting = Setting .boolSetting (DiscoveryNode .STATELESS_ENABLED_SETTING_NAME , false );
790
+ private final T requiredValue ;
791
+
792
+ private FixedValueIfStatelessEnabledValidator (String clusterAlias , String key , T requiredValue ) {
793
+ super (clusterAlias , key );
794
+ this .requiredValue = Objects .requireNonNull (requiredValue );
795
+ }
796
+
797
+ @ Override
798
+ public void validate (T value , Map <Setting <?>, Object > settings , boolean isPresent ) {
799
+ if (isPresent && ((Boolean ) settings .get (statelessSetting )) && requiredValue .equals (value ) == false ) {
800
+ throw new IllegalArgumentException (
801
+ "setting [" + getKey () + "] must be set to [" + requiredValue + "] when stateless is enabled"
802
+ );
803
+ }
804
+ super .validate (value , settings , isPresent );
805
+ }
806
+
807
+ @ Override
808
+ public Iterator <Setting <?>> settings () {
809
+ return Iterators .concat (super .settings (), List .of (statelessSetting ).iterator ());
810
+ }
811
+ }
763
812
}
0 commit comments