28
28
import org .elasticsearch .cluster .project .ProjectResolver ;
29
29
import org .elasticsearch .common .Strings ;
30
30
import org .elasticsearch .common .settings .ClusterSettings ;
31
- import org .elasticsearch .common .settings .SecureSetting ;
32
- import org .elasticsearch .common .settings .SecureString ;
33
- import org .elasticsearch .common .settings .Setting ;
34
31
import org .elasticsearch .common .settings .Settings ;
35
32
import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
36
33
import org .elasticsearch .common .util .concurrent .EsExecutors ;
43
40
44
41
import java .io .Closeable ;
45
42
import java .io .IOException ;
46
- import java .util .Arrays ;
47
43
import java .util .HashMap ;
48
- import java .util .Iterator ;
49
44
import java .util .List ;
50
45
import java .util .Map ;
51
46
import java .util .Set ;
59
54
import java .util .stream .Collectors ;
60
55
import java .util .stream .Stream ;
61
56
62
- import static org .elasticsearch .common .settings .Setting .boolSetting ;
63
- import static org .elasticsearch .common .settings .Setting .enumSetting ;
64
- import static org .elasticsearch .common .settings .Setting .timeSetting ;
65
57
import static org .elasticsearch .transport .RemoteClusterPortSettings .REMOTE_CLUSTER_SERVER_ENABLED ;
66
58
67
59
/**
@@ -75,76 +67,6 @@ public final class RemoteClusterService extends RemoteClusterAware
75
67
76
68
private static final Logger logger = LogManager .getLogger (RemoteClusterService .class );
77
69
78
- /**
79
- * The initial connect timeout for remote cluster connections
80
- */
81
- public static final Setting <TimeValue > REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING = Setting .positiveTimeSetting (
82
- "cluster.remote.initial_connect_timeout" ,
83
- TimeValue .timeValueSeconds (30 ),
84
- Setting .Property .NodeScope
85
- );
86
-
87
- /**
88
- * The name of a node attribute to select nodes that should be connected to in the remote cluster.
89
- * For instance a node can be configured with {@code node.attr.gateway: true} in order to be eligible as a gateway node between
90
- * clusters. In that case {@code cluster.remote.node.attr: gateway} can be used to filter out other nodes in the remote cluster.
91
- * The value of the setting is expected to be a boolean, {@code true} for nodes that can become gateways, {@code false} otherwise.
92
- */
93
- public static final Setting <String > REMOTE_NODE_ATTRIBUTE = Setting .simpleString (
94
- "cluster.remote.node.attr" ,
95
- Setting .Property .NodeScope
96
- );
97
-
98
- public static final Setting .AffixSetting <Boolean > REMOTE_CLUSTER_SKIP_UNAVAILABLE = Setting .affixKeySetting (
99
- "cluster.remote." ,
100
- "skip_unavailable" ,
101
- (ns , key ) -> boolSetting (key , true , new RemoteConnectionEnabled <>(ns , key ), Setting .Property .Dynamic , Setting .Property .NodeScope )
102
- );
103
-
104
- public static final Setting .AffixSetting <TimeValue > REMOTE_CLUSTER_PING_SCHEDULE = Setting .affixKeySetting (
105
- "cluster.remote." ,
106
- "transport.ping_schedule" ,
107
- (ns , key ) -> timeSetting (
108
- key ,
109
- TransportSettings .PING_SCHEDULE ,
110
- new RemoteConnectionEnabled <>(ns , key ),
111
- Setting .Property .Dynamic ,
112
- Setting .Property .NodeScope
113
- )
114
- );
115
-
116
- public static final Setting .AffixSetting <Compression .Enabled > REMOTE_CLUSTER_COMPRESS = Setting .affixKeySetting (
117
- "cluster.remote." ,
118
- "transport.compress" ,
119
- (ns , key ) -> enumSetting (
120
- Compression .Enabled .class ,
121
- key ,
122
- TransportSettings .TRANSPORT_COMPRESS ,
123
- new RemoteConnectionEnabled <>(ns , key ),
124
- Setting .Property .Dynamic ,
125
- Setting .Property .NodeScope
126
- )
127
- );
128
-
129
- public static final Setting .AffixSetting <Compression .Scheme > REMOTE_CLUSTER_COMPRESSION_SCHEME = Setting .affixKeySetting (
130
- "cluster.remote." ,
131
- "transport.compression_scheme" ,
132
- (ns , key ) -> enumSetting (
133
- Compression .Scheme .class ,
134
- key ,
135
- TransportSettings .TRANSPORT_COMPRESSION_SCHEME ,
136
- new RemoteConnectionEnabled <>(ns , key ),
137
- Setting .Property .Dynamic ,
138
- Setting .Property .NodeScope
139
- )
140
- );
141
-
142
- public static final Setting .AffixSetting <SecureString > REMOTE_CLUSTER_CREDENTIALS = Setting .affixKeySetting (
143
- "cluster.remote." ,
144
- "credentials" ,
145
- key -> SecureSetting .secureString (key , null )
146
- );
147
-
148
70
public static final String REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME = "cluster:internal/remote_cluster/handshake" ;
149
71
150
72
private final boolean isRemoteClusterClient ;
@@ -343,7 +265,11 @@ public RemoteClusterConnection getRemoteClusterConnection(String cluster) {
343
265
@ Override
344
266
public void listenForUpdates (ClusterSettings clusterSettings ) {
345
267
super .listenForUpdates (clusterSettings );
346
- clusterSettings .addAffixUpdateConsumer (REMOTE_CLUSTER_SKIP_UNAVAILABLE , this ::updateSkipUnavailable , (alias , value ) -> {});
268
+ clusterSettings .addAffixUpdateConsumer (
269
+ RemoteClusterSettings .REMOTE_CLUSTER_SKIP_UNAVAILABLE ,
270
+ this ::updateSkipUnavailable ,
271
+ (alias , value ) -> {}
272
+ );
347
273
}
348
274
349
275
private synchronized void updateSkipUnavailable (String clusterAlias , Boolean skipUnavailable ) {
@@ -535,7 +461,7 @@ enum RemoteClusterConnectionStatus {
535
461
void initializeRemoteClusters () {
536
462
@ FixForMultiProject (description = "Refactor for initializing connections to linked projects for each origin project supported." )
537
463
final var projectId = projectResolver .getProjectId ();
538
- final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING .get (settings );
464
+ final TimeValue timeValue = RemoteClusterSettings . REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING .get (settings );
539
465
final PlainActionFuture <Void > future = new PlainActionFuture <>();
540
466
Set <String > enabledClusters = RemoteClusterAware .getEnabledRemoteClusters (settings );
541
467
@@ -630,9 +556,9 @@ public enum DisconnectedStrategy {
630
556
FAIL_IF_DISCONNECTED ,
631
557
632
558
/**
633
- * Behave according to the {@link #REMOTE_CLUSTER_SKIP_UNAVAILABLE} setting for this remote cluster: if this setting is
634
- * {@code false} (the default) then behave like {@link #RECONNECT_IF_DISCONNECTED}, but if it is {@code true} then behave like
635
- * {@link #FAIL_IF_DISCONNECTED}.
559
+ * Behave according to the {@link RemoteClusterSettings #REMOTE_CLUSTER_SKIP_UNAVAILABLE} setting for this remote cluster: if this
560
+ * setting is {@code false} (the default) then behave like {@link #RECONNECT_IF_DISCONNECTED}, but if it is {@code true} then behave
561
+ * like {@link #FAIL_IF_DISCONNECTED}.
636
562
*/
637
563
RECONNECT_UNLESS_SKIP_UNAVAILABLE
638
564
}
@@ -737,39 +663,4 @@ private Map<String, RemoteClusterConnection> getConnectionsMapForProject(Project
737
663
assert ProjectId .DEFAULT .equals (projectId ) : "Only the default project ID should be used when multiple projects are not supported" ;
738
664
return remoteClusters .get (projectId );
739
665
}
740
-
741
- private static class RemoteConnectionEnabled <T > implements Setting .Validator <T > {
742
-
743
- private final String clusterAlias ;
744
- private final String key ;
745
-
746
- private RemoteConnectionEnabled (String clusterAlias , String key ) {
747
- this .clusterAlias = clusterAlias ;
748
- this .key = key ;
749
- }
750
-
751
- @ Override
752
- public void validate (T value ) {}
753
-
754
- @ Override
755
- public void validate (T value , Map <Setting <?>, Object > settings , boolean isPresent ) {
756
- if (isPresent && RemoteConnectionStrategy .isConnectionEnabled (clusterAlias , settings ) == false ) {
757
- throw new IllegalArgumentException ("Cannot configure setting [" + key + "] if remote cluster is not enabled." );
758
- }
759
- }
760
-
761
- @ Override
762
- public Iterator <Setting <?>> settings () {
763
- return Stream .concat (
764
- Stream .of (RemoteConnectionStrategy .REMOTE_CONNECTION_MODE .getConcreteSettingForNamespace (clusterAlias )),
765
- settingsStream ()
766
- ).iterator ();
767
- }
768
-
769
- private Stream <Setting <?>> settingsStream () {
770
- return Arrays .stream (RemoteConnectionStrategy .ConnectionStrategy .values ())
771
- .flatMap (strategy -> strategy .getEnablementSettings ().get ())
772
- .map (as -> as .getConcreteSettingForNamespace (clusterAlias ));
773
- }
774
- };
775
666
}
0 commit comments