17
17
import org .elasticsearch .action .support .ActionTestUtils ;
18
18
import org .elasticsearch .action .support .IndicesOptions ;
19
19
import org .elasticsearch .action .support .PlainActionFuture ;
20
+ import org .elasticsearch .cluster .metadata .ProjectId ;
20
21
import org .elasticsearch .cluster .node .DiscoveryNode ;
21
22
import org .elasticsearch .cluster .node .DiscoveryNodeRole ;
22
23
import org .elasticsearch .cluster .node .VersionInformation ;
23
24
import org .elasticsearch .cluster .project .DefaultProjectResolver ;
25
+ import org .elasticsearch .cluster .project .ProjectResolver ;
24
26
import org .elasticsearch .common .Strings ;
25
27
import org .elasticsearch .common .settings .AbstractScopedSettings ;
26
28
import org .elasticsearch .common .settings .ClusterSettings ;
27
29
import org .elasticsearch .common .settings .MockSecureSettings ;
28
30
import org .elasticsearch .common .settings .Settings ;
29
31
import org .elasticsearch .common .util .set .Sets ;
32
+ import org .elasticsearch .core .FixForMultiProject ;
30
33
import org .elasticsearch .core .IOUtils ;
31
34
import org .elasticsearch .core .TimeValue ;
32
35
import org .elasticsearch .node .Node ;
67
70
public class RemoteClusterServiceTests extends ESTestCase {
68
71
69
72
private final ThreadPool threadPool = new TestThreadPool (getClass ().getName ());
73
+ private final ProjectResolver projectResolver = DefaultProjectResolver .INSTANCE ;
70
74
private LinkedProjectConfigService linkedProjectConfigService = null ;
71
75
72
76
@ Override
@@ -84,12 +88,8 @@ private RemoteClusterService createRemoteClusterService(
84
88
ClusterSettings clusterSettings ,
85
89
MockTransportService transportService
86
90
) {
87
- linkedProjectConfigService = new ClusterSettingsLinkedProjectConfigService (
88
- settings ,
89
- clusterSettings ,
90
- DefaultProjectResolver .INSTANCE
91
- );
92
- return new RemoteClusterService (settings , transportService , DefaultProjectResolver .INSTANCE );
91
+ linkedProjectConfigService = new ClusterSettingsLinkedProjectConfigService (settings , clusterSettings , projectResolver );
92
+ return new RemoteClusterService (settings , transportService , projectResolver );
93
93
}
94
94
95
95
private MockTransportService startTransport (
@@ -776,19 +776,11 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException {
776
776
assertFalse (hasRegisteredClusters (service ));
777
777
778
778
final CountDownLatch firstLatch = new CountDownLatch (1 );
779
- service .updateRemoteCluster (
780
- "cluster_1" ,
781
- createSettings ("cluster_1" , Arrays .asList (c1N1Node .getAddress ().toString (), c1N2Node .getAddress ().toString ())),
782
- connectionListener (firstLatch )
783
- );
779
+ updateRemoteCluster (service , "cluster_1" , settings , List .of (c1N1Node , c1N2Node ), connectionListener (firstLatch ));
784
780
firstLatch .await ();
785
781
786
782
final CountDownLatch secondLatch = new CountDownLatch (1 );
787
- service .updateRemoteCluster (
788
- "cluster_2" ,
789
- createSettings ("cluster_2" , Arrays .asList (c2N1Node .getAddress ().toString (), c2N2Node .getAddress ().toString ())),
790
- connectionListener (secondLatch )
791
- );
783
+ updateRemoteCluster (service , "cluster_2" , settings , List .of (c2N1Node , c2N2Node ), connectionListener (secondLatch ));
792
784
secondLatch .await ();
793
785
794
786
assertTrue (hasRegisteredClusters (service ));
@@ -866,19 +858,11 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException {
866
858
assertFalse (hasRegisteredClusters (service ));
867
859
868
860
final CountDownLatch firstLatch = new CountDownLatch (1 );
869
- service .updateRemoteCluster (
870
- "cluster_1" ,
871
- createSettings ("cluster_1" , Arrays .asList (c1N1Node .getAddress ().toString (), c1N2Node .getAddress ().toString ())),
872
- connectionListener (firstLatch )
873
- );
861
+ updateRemoteCluster (service , "cluster_1" , settings , List .of (c1N1Node , c1N2Node ), connectionListener (firstLatch ));
874
862
firstLatch .await ();
875
863
876
864
final CountDownLatch secondLatch = new CountDownLatch (1 );
877
- service .updateRemoteCluster (
878
- "cluster_2" ,
879
- createSettings ("cluster_2" , Arrays .asList (c2N1Node .getAddress ().toString (), c2N2Node .getAddress ().toString ())),
880
- connectionListener (secondLatch )
881
- );
865
+ updateRemoteCluster (service , "cluster_2" , settings , List .of (c2N1Node , c2N2Node ), connectionListener (secondLatch ));
882
866
secondLatch .await ();
883
867
884
868
assertTrue (hasRegisteredClusters (service ));
@@ -961,20 +945,11 @@ public void testCollectNodes() throws InterruptedException, IOException {
961
945
assertFalse (hasRegisteredClusters (service ));
962
946
963
947
final CountDownLatch firstLatch = new CountDownLatch (1 );
964
-
965
- service .updateRemoteCluster (
966
- "cluster_1" ,
967
- createSettings ("cluster_1" , Arrays .asList (c1N1Node .getAddress ().toString (), c1N2Node .getAddress ().toString ())),
968
- connectionListener (firstLatch )
969
- );
948
+ updateRemoteCluster (service , "cluster_1" , settings , List .of (c1N1Node , c1N2Node ), connectionListener (firstLatch ));
970
949
firstLatch .await ();
971
950
972
951
final CountDownLatch secondLatch = new CountDownLatch (1 );
973
- service .updateRemoteCluster (
974
- "cluster_2" ,
975
- createSettings ("cluster_2" , Arrays .asList (c2N1Node .getAddress ().toString (), c2N2Node .getAddress ().toString ())),
976
- connectionListener (secondLatch )
977
- );
952
+ updateRemoteCluster (service , "cluster_2" , settings , List .of (c2N1Node , c2N2Node ), connectionListener (secondLatch ));
978
953
secondLatch .await ();
979
954
CountDownLatch latch = new CountDownLatch (1 );
980
955
service .collectNodes (
@@ -1108,8 +1083,9 @@ public void testCollectNodesConcurrentWithSettingsChanges() throws IOException {
1108
1083
final var seedList = List .of (c1N1Node .getAddress ().toString ());
1109
1084
transportService .start ();
1110
1085
transportService .acceptIncomingRequests ();
1086
+ final var initialSettings = createSettings ("cluster_1" , seedList );
1111
1087
1112
- try (RemoteClusterService service = createRemoteClusterService (createSettings ( "cluster_1" , seedList ) , transportService )) {
1088
+ try (RemoteClusterService service = createRemoteClusterService (initialSettings , transportService )) {
1113
1089
initializeRemoteClusters (service );
1114
1090
assertTrue (hasRegisteredClusters (service ));
1115
1091
final var numTasks = between (3 , 5 );
@@ -1122,7 +1098,7 @@ public void testCollectNodesConcurrentWithSettingsChanges() throws IOException {
1122
1098
while (taskLatch .getCount () != 0 ) {
1123
1099
final var future = new PlainActionFuture <RemoteClusterService .RemoteClusterConnectionStatus >();
1124
1100
final var settings = createSettings ("cluster_1" , isLinked ? Collections .emptyList () : seedList );
1125
- service . updateRemoteCluster ("cluster_1" , settings , future );
1101
+ updateRemoteCluster (service , "cluster_1" , initialSettings , settings , future );
1126
1102
safeGet (future );
1127
1103
isLinked = isLinked == false ;
1128
1104
}
@@ -1296,7 +1272,8 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception {
1296
1272
1297
1273
final Settings .Builder builder = Settings .builder ();
1298
1274
builder .putList ("cluster.remote.cluster_test.seeds" , Collections .singletonList (node0 .getAddress ().toString ()));
1299
- try (RemoteClusterService service = createRemoteClusterService (builder .build (), transportService )) {
1275
+ final var initialSettings = builder .build ();
1276
+ try (RemoteClusterService service = createRemoteClusterService (initialSettings , transportService )) {
1300
1277
assertFalse (hasRegisteredClusters (service ));
1301
1278
initializeRemoteClusters (service );
1302
1279
assertTrue (hasRegisteredClusters (service ));
@@ -1308,11 +1285,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception {
1308
1285
assertFalse (firstRemoteClusterConnection .isClosed ());
1309
1286
1310
1287
final CountDownLatch firstLatch = new CountDownLatch (1 );
1311
- service .updateRemoteCluster (
1312
- "cluster_test" ,
1313
- createSettings ("cluster_test" , Collections .singletonList (node0 .getAddress ().toString ())),
1314
- connectionListener (firstLatch )
1315
- );
1288
+ updateRemoteCluster (service , "cluster_test" , initialSettings , List .of (node0 ), connectionListener (firstLatch ));
1316
1289
firstLatch .await ();
1317
1290
1318
1291
assertTrue (hasRegisteredClusters (service ));
@@ -1322,15 +1295,15 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception {
1322
1295
assertFalse (firstRemoteClusterConnection .isClosed ());
1323
1296
assertSame (firstRemoteClusterConnection , service .getRemoteClusterConnection ("cluster_test" ));
1324
1297
1325
- final List <String > newSeeds = new ArrayList <>();
1326
- newSeeds .add (node1 . getAddress (). toString () );
1298
+ final List <DiscoveryNode > newSeeds = new ArrayList <>();
1299
+ newSeeds .add (node1 );
1327
1300
if (randomBoolean ()) {
1328
- newSeeds .add (node0 . getAddress (). toString () );
1301
+ newSeeds .add (node0 );
1329
1302
Collections .shuffle (newSeeds , random ());
1330
1303
}
1331
1304
1332
1305
final CountDownLatch secondLatch = new CountDownLatch (1 );
1333
- service . updateRemoteCluster ("cluster_test" , createSettings ( "cluster_test" , newSeeds ) , connectionListener (secondLatch ));
1306
+ updateRemoteCluster (service , "cluster_test" , initialSettings , newSeeds , connectionListener (secondLatch ));
1334
1307
secondLatch .await ();
1335
1308
1336
1309
assertTrue (hasRegisteredClusters (service ));
@@ -1575,7 +1548,8 @@ public void testUseDifferentTransportProfileForCredentialsProtectedRemoteCluster
1575
1548
} else {
1576
1549
firstRemoteClusterSettingsBuilder .put ("cluster.remote.cluster_1.seeds" , c1Node .getAddress ().toString ());
1577
1550
}
1578
- service .updateRemoteCluster ("cluster_1" , firstRemoteClusterSettingsBuilder .build (), connectionListener (firstLatch ));
1551
+ final var updatedSettings1 = firstRemoteClusterSettingsBuilder .build ();
1552
+ updateRemoteCluster (service , "cluster_1" , settings , updatedSettings1 , connectionListener (firstLatch ));
1579
1553
firstLatch .await ();
1580
1554
1581
1555
final CountDownLatch secondLatch = new CountDownLatch (1 );
@@ -1587,7 +1561,8 @@ public void testUseDifferentTransportProfileForCredentialsProtectedRemoteCluster
1587
1561
} else {
1588
1562
secondRemoteClusterSettingsBuilder .put ("cluster.remote.cluster_2.seeds" , c2Node .getAddress ().toString ());
1589
1563
}
1590
- service .updateRemoteCluster ("cluster_2" , secondRemoteClusterSettingsBuilder .build (), connectionListener (secondLatch ));
1564
+ final var updatedSettings2 = secondRemoteClusterSettingsBuilder .build ();
1565
+ updateRemoteCluster (service , "cluster_2" , settings , updatedSettings2 , connectionListener (secondLatch ));
1591
1566
secondLatch .await ();
1592
1567
1593
1568
assertTrue (hasRegisteredClusters (service ));
@@ -1642,7 +1617,7 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi
1642
1617
1643
1618
final Settings clusterSettings = buildRemoteClusterSettings ("cluster_1" , discoNode .getAddress ().toString ());
1644
1619
final CountDownLatch latch = new CountDownLatch (1 );
1645
- service . updateRemoteCluster ("cluster_1" , clusterSettings , connectionListener (latch ));
1620
+ updateRemoteCluster (service , "cluster_1" , Settings . EMPTY , clusterSettings , connectionListener (latch ));
1646
1621
latch .await ();
1647
1622
1648
1623
assertConnectionHasProfile (service .getRemoteClusterConnection ("cluster_1" ), "default" );
@@ -1731,12 +1706,12 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite
1731
1706
1732
1707
final Settings cluster1Settings = buildRemoteClusterSettings (goodCluster , c1DiscoNode .getAddress ().toString ());
1733
1708
final var latch = new CountDownLatch (1 );
1734
- service . updateRemoteCluster (goodCluster , cluster1Settings , connectionListener (latch ));
1709
+ updateRemoteCluster (service , goodCluster , Settings . EMPTY , cluster1Settings , connectionListener (latch ));
1735
1710
latch .await ();
1736
1711
1737
1712
final Settings cluster2Settings = buildRemoteClusterSettings (badCluster , c2DiscoNode .getAddress ().toString ());
1738
1713
final PlainActionFuture <RemoteClusterService .RemoteClusterConnectionStatus > future = new PlainActionFuture <>();
1739
- service . updateRemoteCluster (badCluster , cluster2Settings , future );
1714
+ updateRemoteCluster (service , badCluster , Settings . EMPTY , cluster2Settings , future );
1740
1715
final var ex = expectThrows (Exception .class , () -> future .actionGet (10 , TimeUnit .SECONDS ));
1741
1716
assertThat (ex .getMessage (), containsString ("bad cluster" ));
1742
1717
@@ -1853,6 +1828,30 @@ public void testLogsConnectionResult() throws IOException {
1853
1828
}
1854
1829
}
1855
1830
1831
+ private void updateRemoteCluster (
1832
+ RemoteClusterService service ,
1833
+ String alias ,
1834
+ Settings settings ,
1835
+ List <DiscoveryNode > seedNodes ,
1836
+ ActionListener <RemoteClusterService .RemoteClusterConnectionStatus > listener
1837
+ ) {
1838
+ final var newSettings = createSettings (alias , seedNodes .stream ().map (n -> n .getAddress ().toString ()).toList ());
1839
+ updateRemoteCluster (service , alias , settings , newSettings , listener );
1840
+ }
1841
+
1842
+ private void updateRemoteCluster (
1843
+ RemoteClusterService service ,
1844
+ String alias ,
1845
+ Settings settings ,
1846
+ Settings newSettings ,
1847
+ ActionListener <RemoteClusterService .RemoteClusterConnectionStatus > listener
1848
+ ) {
1849
+ final var mergedSettings = Settings .builder ().put (settings , false ).put (newSettings , false ).build ();
1850
+ @ FixForMultiProject (description = "Refactor to add the linked project ID associated with the alias." )
1851
+ final var config = RemoteClusterSettings .toConfig (projectResolver .getProjectId (), ProjectId .DEFAULT , alias , mergedSettings );
1852
+ service .updateRemoteCluster (config , false , listener );
1853
+ }
1854
+
1856
1855
private void initializeRemoteClusters (RemoteClusterService remoteClusterService ) {
1857
1856
remoteClusterService .initializeRemoteClusters (linkedProjectConfigService .getInitialLinkedProjectConfigs ());
1858
1857
}
0 commit comments