6
6
*/
7
7
package org .elasticsearch .xpack .ccr ;
8
8
9
+ import org .apache .logging .log4j .Logger ;
9
10
import org .elasticsearch .client .Request ;
11
+ import org .elasticsearch .client .RequestOptions ;
10
12
import org .elasticsearch .client .Response ;
11
13
import org .elasticsearch .client .ResponseException ;
12
14
import org .elasticsearch .client .RestClient ;
15
+ import org .elasticsearch .client .WarningsHandler ;
13
16
import org .elasticsearch .common .settings .SecureString ;
14
17
import org .elasticsearch .common .settings .Settings ;
15
18
import org .elasticsearch .common .util .concurrent .ThreadContext ;
16
19
import org .elasticsearch .common .xcontent .support .XContentMapValues ;
20
+ import org .elasticsearch .core .CheckedRunnable ;
17
21
import org .elasticsearch .index .seqno .ReplicationTracker ;
18
22
import org .elasticsearch .test .rest .yaml .ObjectPath ;
19
23
@@ -69,7 +73,9 @@ public void testFollowIndex() throws Exception {
69
73
followIndex ("leader_cluster" , allowedIndex , allowedIndex );
70
74
assertBusy (() -> verifyDocuments (allowedIndex , numDocs , "*:*" ));
71
75
assertThat (getCcrNodeTasks (), contains (new CcrNodeTask ("leader_cluster" , allowedIndex , allowedIndex , 0 )));
72
- assertBusy (() -> verifyCcrMonitoring (allowedIndex , allowedIndex ), 120L , TimeUnit .SECONDS );
76
+
77
+ withMonitoring (logger , () -> { assertBusy (() -> verifyCcrMonitoring (allowedIndex , allowedIndex ), 120L , TimeUnit .SECONDS ); });
78
+
73
79
pauseFollow (allowedIndex );
74
80
// Make sure that there are no other ccr relates operations running:
75
81
assertBusy (() -> {
@@ -143,18 +149,20 @@ public void testFollowIndex() throws Exception {
143
149
public void testAutoFollowPatterns () throws Exception {
144
150
assumeTrue ("Test should only run with target_cluster=follow" , "follow" .equals (targetCluster ));
145
151
146
- String allowedIndex = "logs-eu_20190101" ;
147
- String disallowedIndex = "logs-us_20190101" ;
152
+ final String prefix = getTestName ().toLowerCase (Locale .ROOT );
153
+ String allowedIndex = prefix + "-eu_20190101" ;
154
+ String disallowedIndex = prefix + "-us_20190101" ;
148
155
156
+ final String pattern = "pattern_" + prefix ;
149
157
{
150
- Request request = new Request ("PUT" , "/_ccr/auto_follow/test_pattern" );
151
- request .setJsonEntity ("{\" leader_index_patterns\" : [\" logs -*\" ], \" remote_cluster\" : \" leader_cluster\" }" );
158
+ Request request = new Request ("PUT" , "/_ccr/auto_follow/" + pattern );
159
+ request .setJsonEntity ("{\" leader_index_patterns\" : [\" testautofollowpatterns -*\" ], \" remote_cluster\" : \" leader_cluster\" }" );
152
160
Exception e = expectThrows (ResponseException .class , () -> assertOK (client ().performRequest (request )));
153
- assertThat (e .getMessage (), containsString ("insufficient privileges to follow index [logs -*]" ));
161
+ assertThat (e .getMessage (), containsString ("insufficient privileges to follow index [testautofollowpatterns -*]" ));
154
162
}
155
163
156
- Request request = new Request ("PUT" , "/_ccr/auto_follow/test_pattern" );
157
- request .setJsonEntity ("{\" leader_index_patterns\" : [\" logs -eu*\" ], \" remote_cluster\" : \" leader_cluster\" }" );
164
+ Request request = new Request ("PUT" , "/_ccr/auto_follow/" + pattern );
165
+ request .setJsonEntity ("{\" leader_index_patterns\" : [\" testautofollowpatterns -eu*\" ], \" remote_cluster\" : \" leader_cluster\" }" );
158
166
assertOK (client ().performRequest (request ));
159
167
160
168
try (RestClient leaderClient = buildLeaderClient ()) {
@@ -175,12 +183,14 @@ public void testAutoFollowPatterns() throws Exception {
175
183
assertBusy (() -> ensureYellow (allowedIndex ), 30 , TimeUnit .SECONDS );
176
184
assertBusy (() -> verifyDocuments (allowedIndex , 5 , "*:*" ), 30 , TimeUnit .SECONDS );
177
185
assertThat (indexExists (disallowedIndex ), is (false ));
178
- assertBusy (() -> verifyCcrMonitoring (allowedIndex , allowedIndex ), 120L , TimeUnit .SECONDS );
179
- assertBusy (ESCCRRestTestCase ::verifyAutoFollowMonitoring , 120L , TimeUnit .SECONDS );
186
+ withMonitoring (logger , () -> {
187
+ assertBusy (() -> verifyCcrMonitoring (allowedIndex , allowedIndex ), 120L , TimeUnit .SECONDS );
188
+ assertBusy (ESCCRRestTestCase ::verifyAutoFollowMonitoring , 120L , TimeUnit .SECONDS );
189
+ });
180
190
} finally {
181
191
// Cleanup by deleting auto follow pattern and pause following:
182
192
try {
183
- deleteAutoFollowPattern ("test_pattern" );
193
+ deleteAutoFollowPattern (pattern );
184
194
pauseFollow (allowedIndex );
185
195
} catch (Throwable e ) {
186
196
logger .warn ("Failed to cleanup after the test" , e );
@@ -306,6 +316,30 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
306
316
}
307
317
}
308
318
319
+ private static void withMonitoring (Logger logger , CheckedRunnable <Exception > runnable ) throws Exception {
320
+ Request enableMonitoring = new Request ("PUT" , "/_cluster/settings" );
321
+ enableMonitoring .setOptions (RequestOptions .DEFAULT .toBuilder ().setWarningsHandler (WarningsHandler .PERMISSIVE ).build ());
322
+ enableMonitoring .setJsonEntity (
323
+ "{\" persistent\" :{" + "\" xpack.monitoring.collection.enabled\" :true," + "\" xpack.monitoring.collection.interval\" :\" 1s\" " + "}}"
324
+ );
325
+ assertOK (adminClient ().performRequest (enableMonitoring ));
326
+ logger .info ("monitoring collection enabled" );
327
+ try {
328
+ runnable .run ();
329
+ } finally {
330
+ Request disableMonitoring = new Request ("PUT" , "/_cluster/settings" );
331
+ disableMonitoring .setOptions (RequestOptions .DEFAULT .toBuilder ().setWarningsHandler (WarningsHandler .PERMISSIVE ).build ());
332
+ disableMonitoring .setJsonEntity (
333
+ "{\" persistent\" :{"
334
+ + "\" xpack.monitoring.collection.enabled\" :null,"
335
+ + "\" xpack.monitoring.collection.interval\" :null"
336
+ + "}}"
337
+ );
338
+ assertOK (adminClient ().performRequest (disableMonitoring ));
339
+ logger .info ("monitoring collection disabled" );
340
+ }
341
+ }
342
+
309
343
private static void assertNoPersistentTasks () throws IOException {
310
344
Map <String , Object > clusterState = toMap (adminClient ().performRequest (new Request ("GET" , "/_cluster/state" )));
311
345
List <?> tasks = (List <?>) XContentMapValues .extractValue ("metadata.persistent_tasks.tasks" , clusterState );
0 commit comments