8
8
package org .elasticsearch .search .ccs ;
9
9
10
10
import org .elasticsearch .action .DocWriteResponse ;
11
+ import org .elasticsearch .action .admin .cluster .remote .RemoteInfoRequest ;
12
+ import org .elasticsearch .action .admin .cluster .remote .RemoteInfoResponse ;
13
+ import org .elasticsearch .action .admin .cluster .remote .TransportRemoteInfoAction ;
11
14
import org .elasticsearch .action .search .OpenPointInTimeRequest ;
12
15
import org .elasticsearch .action .search .OpenPointInTimeResponse ;
13
16
import org .elasticsearch .action .search .SearchRequest ;
35
38
import org .elasticsearch .search .SearchHit ;
36
39
import org .elasticsearch .search .builder .SearchSourceBuilder ;
37
40
import org .elasticsearch .test .AbstractMultiClustersTestCase ;
41
+ import org .elasticsearch .transport .RemoteConnectionInfo ;
38
42
import org .elasticsearch .xcontent .XContentBuilder ;
39
43
import org .elasticsearch .xcontent .XContentFactory ;
40
44
import org .elasticsearch .xcontent .XContentType ;
56
60
import java .util .List ;
57
61
import java .util .Map ;
58
62
import java .util .Set ;
63
+ import java .util .concurrent .TimeUnit ;
59
64
import java .util .function .Consumer ;
60
65
import java .util .stream .Collectors ;
61
66
@@ -92,14 +97,16 @@ protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
92
97
return List .of (LocalStateInferencePlugin .class , TestInferenceServicePlugin .class , FakeMlPlugin .class );
93
98
}
94
99
95
- protected void setupTwoClusters (TestIndexInfo localIndexInfo , TestIndexInfo remoteIndexInfo ) throws IOException {
100
+ protected void setupTwoClusters (TestIndexInfo localIndexInfo , TestIndexInfo remoteIndexInfo ) throws Exception {
96
101
setupCluster (LOCAL_CLUSTER , localIndexInfo );
97
102
setupCluster (REMOTE_CLUSTER , remoteIndexInfo );
103
+ waitUntilRemoteClusterConnected (REMOTE_CLUSTER );
98
104
}
99
105
100
106
protected void setupCluster (String clusterAlias , TestIndexInfo indexInfo ) throws IOException {
101
107
final Client client = client (clusterAlias );
102
108
final String indexName = indexInfo .name ();
109
+ final int dataNodeCount = cluster (clusterAlias ).numDataNodes ();
103
110
104
111
for (var entry : indexInfo .inferenceEndpoints ().entrySet ()) {
105
112
String inferenceId = entry .getKey ();
@@ -117,13 +124,13 @@ protected void setupCluster(String clusterAlias, TestIndexInfo indexInfo) throws
117
124
createInferenceEndpoint (client , minimalServiceSettings .taskType (), inferenceId , serviceSettings );
118
125
}
119
126
120
- Settings indexSettings = indexSettings (randomIntBetween (2 , 5 ), randomIntBetween ( 0 , 1 ) ).build ();
127
+ Settings indexSettings = indexSettings (randomIntBetween (1 , dataNodeCount ), 0 ).build ();
121
128
assertAcked (client .admin ().indices ().prepareCreate (indexName ).setSettings (indexSettings ).setMapping (indexInfo .mappings ()));
122
129
assertFalse (
123
130
client .admin ()
124
131
.cluster ()
125
132
.prepareHealth (TEST_REQUEST_TIMEOUT , indexName )
126
- .setWaitForYellowStatus ()
133
+ .setWaitForGreenStatus ()
127
134
.setTimeout (TimeValue .timeValueSeconds (10 ))
128
135
.get ()
129
136
.isTimedOut ()
@@ -140,6 +147,18 @@ protected void setupCluster(String clusterAlias, TestIndexInfo indexInfo) throws
140
147
assertThat (refreshResponse .getStatus (), is (RestStatus .OK ));
141
148
}
142
149
150
+ protected void waitUntilRemoteClusterConnected (String clusterAlias ) throws Exception {
151
+ RemoteInfoRequest request = new RemoteInfoRequest ();
152
+ assertBusy (() -> {
153
+ RemoteInfoResponse response = client ().execute (TransportRemoteInfoAction .TYPE , request ).actionGet (TEST_REQUEST_TIMEOUT );
154
+ boolean connected = response .getInfos ()
155
+ .stream ()
156
+ .filter (i -> i .getClusterAlias ().equals (clusterAlias ))
157
+ .anyMatch (RemoteConnectionInfo ::isConnected );
158
+ assertThat (connected , is (true ));
159
+ }, 30 , TimeUnit .SECONDS );
160
+ }
161
+
143
162
protected BytesReference openPointInTime (String [] indices , TimeValue keepAlive ) {
144
163
OpenPointInTimeRequest request = new OpenPointInTimeRequest (indices ).keepAlive (keepAlive );
145
164
final OpenPointInTimeResponse response = client ().execute (TransportOpenPointInTimeAction .TYPE , request ).actionGet ();
0 commit comments