|
17 | 17 | import org.elasticsearch.client.RestClient;
|
18 | 18 | import org.elasticsearch.common.Strings;
|
19 | 19 | import org.elasticsearch.common.settings.Settings;
|
| 20 | +import org.elasticsearch.common.time.DateFormatter; |
| 21 | +import org.elasticsearch.common.time.FormatNames; |
20 | 22 | import org.elasticsearch.common.xcontent.XContentHelper;
|
21 | 23 | import org.elasticsearch.core.TimeValue;
|
22 | 24 | import org.elasticsearch.rest.RestStatus;
|
|
28 | 30 | import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
|
29 | 31 | import org.elasticsearch.xpack.core.ilm.Phase;
|
30 | 32 | import org.elasticsearch.xpack.core.ilm.UnfollowAction;
|
| 33 | +import org.elasticsearch.xpack.core.ilm.WaitUntilTimeSeriesEndTimePassesStep; |
31 | 34 |
|
32 | 35 | import java.io.IOException;
|
33 | 36 | import java.io.InputStream;
|
| 37 | +import java.time.Instant; |
34 | 38 | import java.util.List;
|
35 | 39 | import java.util.Locale;
|
36 | 40 | import java.util.Map;
|
|
39 | 43 |
|
40 | 44 | import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
|
41 | 45 | import static org.elasticsearch.xpack.core.ilm.ShrinkIndexNameSupplier.SHRUNKEN_INDEX_PREFIX;
|
| 46 | +import static org.hamcrest.Matchers.containsString; |
42 | 47 | import static org.hamcrest.Matchers.equalTo;
|
| 48 | +import static org.hamcrest.Matchers.greaterThan; |
43 | 49 | import static org.hamcrest.Matchers.is;
|
44 | 50 | import static org.hamcrest.Matchers.notNullValue;
|
45 | 51 | import static org.hamcrest.Matchers.nullValue;
|
46 | 52 |
|
47 | 53 | public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
48 | 54 |
|
49 | 55 | private static final Logger LOGGER = LogManager.getLogger(CCRIndexLifecycleIT.class);
|
| 56 | + private static final String TSDB_INDEX_TEMPLATE = """ |
| 57 | + { |
| 58 | + "index_patterns": ["%s*"], |
| 59 | + "data_stream": {}, |
| 60 | + "template": { |
| 61 | + "settings":{ |
| 62 | + "index": { |
| 63 | + "number_of_replicas": 0, |
| 64 | + "number_of_shards": 1, |
| 65 | + "routing_path": ["metricset"], |
| 66 | + "mode": "time_series" |
| 67 | + }, |
| 68 | + "index.lifecycle.name": "%s" |
| 69 | + }, |
| 70 | + "mappings":{ |
| 71 | + "properties": { |
| 72 | + "@timestamp" : { |
| 73 | + "type": "date" |
| 74 | + }, |
| 75 | + "metricset": { |
| 76 | + "type": "keyword", |
| 77 | + "time_series_dimension": true |
| 78 | + }, |
| 79 | + "volume": { |
| 80 | + "type": "double", |
| 81 | + "time_series_metric": "gauge" |
| 82 | + } |
| 83 | + } |
| 84 | + } |
| 85 | + } |
| 86 | + }"""; |
50 | 87 |
|
51 | 88 | public void testBasicCCRAndILMIntegration() throws Exception {
|
52 | 89 | String indexName = "logs-1";
|
@@ -533,6 +570,91 @@ public void testILMUnfollowFailsToRemoveRetentionLeases() throws Exception {
|
533 | 570 | }
|
534 | 571 | }
|
535 | 572 |
|
| 573 | + @SuppressWarnings("unchecked") |
| 574 | + public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exception { |
| 575 | + String indexPattern = "tsdb-index-"; |
| 576 | + String dataStream = "tsdb-index-cpu"; |
| 577 | + String policyName = "tsdb-policy"; |
| 578 | + |
| 579 | + if ("leader".equals(targetCluster)) { |
| 580 | + putILMPolicy(policyName, null, 1, null); |
| 581 | + Request templateRequest = new Request("PUT", "/_index_template/tsdb_template"); |
| 582 | + templateRequest.setJsonEntity(Strings.format(TSDB_INDEX_TEMPLATE, indexPattern, policyName)); |
| 583 | + assertOK(client().performRequest(templateRequest)); |
| 584 | + } else if ("follow".equals(targetCluster)) { |
| 585 | + // Use unfollow-only policy for follower cluster instead of regular ILM policy |
| 586 | + // Follower clusters should not have their own rollover actions as they are meant |
| 587 | + // to follow the rollover behavior of the leader index, not initiate their own rollovers |
| 588 | + putUnfollowOnlyPolicy(client(), policyName); |
| 589 | + |
| 590 | + Request createAutoFollowRequest = new Request("PUT", "/_ccr/auto_follow/tsdb_index_auto_follow_pattern"); |
| 591 | + createAutoFollowRequest.setJsonEntity(""" |
| 592 | + { |
| 593 | + "leader_index_patterns": [ "tsdb-index-*" ], |
| 594 | + "remote_cluster": "leader_cluster", |
| 595 | + "read_poll_timeout": "1000ms", |
| 596 | + "follow_index_pattern": "{{leader_index}}" |
| 597 | + }"""); |
| 598 | + assertOK(client().performRequest(createAutoFollowRequest)); |
| 599 | + |
| 600 | + try (RestClient leaderClient = buildLeaderClient()) { |
| 601 | + String now = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(Instant.now()); |
| 602 | + |
| 603 | + // Index a document on the leader index, this should trigger an ILM rollover. |
| 604 | + // This will ensure that 'index.lifecycle.indexing_complete' is set. |
| 605 | + index(leaderClient, dataStream, "", "@timestamp", now, "volume", 11.0, "metricset", randomAlphaOfLength(5)); |
| 606 | + |
| 607 | + String backingIndexName = getDataStreamBackingIndexNames(leaderClient, "tsdb-index-cpu").get(0); |
| 608 | + assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + backingIndexName)))); |
| 609 | + |
| 610 | + assertBusy(() -> { |
| 611 | + Map<String, Object> indexExplanation = explainIndex(client(), backingIndexName); |
| 612 | + assertThat( |
| 613 | + "index must wait in the " + WaitUntilTimeSeriesEndTimePassesStep.NAME + " until its end time lapses", |
| 614 | + indexExplanation.get("step"), |
| 615 | + is(WaitUntilTimeSeriesEndTimePassesStep.NAME) |
| 616 | + ); |
| 617 | + |
| 618 | + assertThat(indexExplanation.get("step_info"), is(notNullValue())); |
| 619 | + assertThat( |
| 620 | + (String) ((Map<String, Object>) indexExplanation.get("step_info")).get("message"), |
| 621 | + containsString("Waiting until the index's time series end time lapses") |
| 622 | + ); |
| 623 | + }, 30, TimeUnit.SECONDS); |
| 624 | + |
| 625 | + int initialLeaderDocCount = getDocCount(leaderClient, backingIndexName); |
| 626 | + |
| 627 | + // Add more documents to the leader index while it's in WaitUntilTimeSeriesEndTimePassesStep |
| 628 | + String futureTimestamp = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()) |
| 629 | + .format(Instant.now().plusSeconds(30)); |
| 630 | + |
| 631 | + for (int i = 0; i < 5; i++) { |
| 632 | + index(leaderClient, dataStream, "", "@timestamp", futureTimestamp, "volume", 20.0 + i, "metricset", "test-sync-" + i); |
| 633 | + } |
| 634 | + |
| 635 | + // Verify that new documents are synced to follower while in WaitUntilTimeSeriesEndTimePassesStep |
| 636 | + assertBusy(() -> { |
| 637 | + int currentLeaderDocCount = getDocCount(leaderClient, backingIndexName); |
| 638 | + int currentFollowerDocCount = getDocCount(client(), backingIndexName); |
| 639 | + |
| 640 | + assertThat( |
| 641 | + "Leader should have more documents than initially", |
| 642 | + currentLeaderDocCount, |
| 643 | + greaterThan(initialLeaderDocCount) |
| 644 | + ); |
| 645 | + assertThat("Follower should sync new documents from leader", currentFollowerDocCount, equalTo(currentLeaderDocCount)); |
| 646 | + |
| 647 | + // Also verify the step is still WaitUntilTimeSeriesEndTimePassesStep |
| 648 | + assertThat( |
| 649 | + "Index should still be in WaitUntilTimeSeriesEndTimePassesStep", |
| 650 | + explainIndex(client(), backingIndexName).get("step"), |
| 651 | + is(WaitUntilTimeSeriesEndTimePassesStep.NAME) |
| 652 | + ); |
| 653 | + }, 30, TimeUnit.SECONDS); |
| 654 | + } |
| 655 | + } |
| 656 | + } |
| 657 | + |
536 | 658 | private void configureRemoteClusters(String name, String leaderRemoteClusterSeed) throws IOException {
|
537 | 659 | logger.info("Configuring leader remote cluster [{}]", leaderRemoteClusterSeed);
|
538 | 660 | Request request = new Request("PUT", "/_cluster/settings");
|
@@ -839,4 +961,24 @@ private static String getShrinkIndexName(RestClient client, String originalIndex
|
839 | 961 | : "lifecycle execution state must contain the target shrink index name for index [" + originalIndex + "]";
|
840 | 962 | return shrunkenIndexName[0];
|
841 | 963 | }
|
| 964 | + |
| 965 | + private static Map<String, Object> explainIndex(RestClient client, String indexName) throws IOException { |
| 966 | + Request explainRequest = new Request("GET", indexName + "/_ilm/explain"); |
| 967 | + Response response = client.performRequest(explainRequest); |
| 968 | + Map<String, Object> responseMap; |
| 969 | + try (InputStream is = response.getEntity().getContent()) { |
| 970 | + responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); |
| 971 | + } |
| 972 | + |
| 973 | + @SuppressWarnings("unchecked") |
| 974 | + Map<String, Map<String, Object>> indexResponse = ((Map<String, Map<String, Object>>) responseMap.get("indices")); |
| 975 | + return indexResponse.get(indexName); |
| 976 | + } |
| 977 | + |
| 978 | + private static int getDocCount(RestClient client, String indexName) throws IOException { |
| 979 | + Request countRequest = new Request("GET", "/" + indexName + "/_count"); |
| 980 | + Response response = client.performRequest(countRequest); |
| 981 | + Map<String, Object> result = entityAsMap(response); |
| 982 | + return (int) result.get("count"); |
| 983 | + } |
842 | 984 | }
|
0 commit comments