Skip to content

Commit 99d6ecf

Browse files
committed
test: Add integration test for TSDB rollover and CCR sync during ILM wait step
1 parent 4634388 commit 99d6ecf

File tree

1 file changed

+155
-0
lines changed

1 file changed

+155
-0
lines changed

x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@
1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
1414
import org.elasticsearch.client.Request;
15+
import org.elasticsearch.client.RequestOptions;
1516
import org.elasticsearch.client.Response;
1617
import org.elasticsearch.client.ResponseException;
1718
import org.elasticsearch.client.RestClient;
1819
import org.elasticsearch.common.Strings;
1920
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.common.time.DateFormatter;
22+
import org.elasticsearch.common.time.FormatNames;
2023
import org.elasticsearch.common.xcontent.XContentHelper;
2124
import org.elasticsearch.core.TimeValue;
2225
import org.elasticsearch.rest.RestStatus;
@@ -28,9 +31,11 @@
2831
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
2932
import org.elasticsearch.xpack.core.ilm.Phase;
3033
import org.elasticsearch.xpack.core.ilm.UnfollowAction;
34+
import org.elasticsearch.xpack.core.ilm.WaitUntilTimeSeriesEndTimePassesStep;
3135

3236
import java.io.IOException;
3337
import java.io.InputStream;
38+
import java.time.Instant;
3439
import java.util.List;
3540
import java.util.Locale;
3641
import java.util.Map;
@@ -39,14 +44,47 @@
3944

4045
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
4146
import static org.elasticsearch.xpack.core.ilm.ShrinkIndexNameSupplier.SHRUNKEN_INDEX_PREFIX;
47+
import static org.hamcrest.Matchers.containsString;
4248
import static org.hamcrest.Matchers.equalTo;
49+
import static org.hamcrest.Matchers.greaterThan;
4350
import static org.hamcrest.Matchers.is;
4451
import static org.hamcrest.Matchers.notNullValue;
4552
import static org.hamcrest.Matchers.nullValue;
4653

4754
public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
4855

4956
private static final Logger LOGGER = LogManager.getLogger(CCRIndexLifecycleIT.class);
57+
private static final String TSDB_INDEX_TEMPLATE = """
58+
{
59+
"index_patterns": ["%s*"],
60+
"data_stream": {},
61+
"template": {
62+
"settings":{
63+
"index": {
64+
"number_of_replicas": 0,
65+
"number_of_shards": 1,
66+
"routing_path": ["metricset"],
67+
"mode": "time_series"
68+
},
69+
"index.lifecycle.name": "%s"
70+
},
71+
"mappings":{
72+
"properties": {
73+
"@timestamp" : {
74+
"type": "date"
75+
},
76+
"metricset": {
77+
"type": "keyword",
78+
"time_series_dimension": true
79+
},
80+
"volume": {
81+
"type": "double",
82+
"time_series_metric": "gauge"
83+
}
84+
}
85+
}
86+
}
87+
}""";
5088

5189
public void testBasicCCRAndILMIntegration() throws Exception {
5290
String indexName = "logs-1";
@@ -533,6 +571,94 @@ public void testILMUnfollowFailsToRemoveRetentionLeases() throws Exception {
533571
}
534572
}
535573

574+
@SuppressWarnings({ "checkstyle:LineLength", "unchecked" })
575+
public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exception {
576+
String indexPattern = "tsdb-index-";
577+
String dataStream = "tsdb-index-cpu";
578+
String policyName = "tsdb-policy";
579+
580+
if ("leader".equals(targetCluster)) {
581+
putILMPolicy(policyName, null, 1, null);
582+
Request templateRequest = new Request("PUT", "/_index_template/tsdb_template");
583+
templateRequest.setJsonEntity(Strings.format(TSDB_INDEX_TEMPLATE, indexPattern, policyName));
584+
assertOK(client().performRequest(templateRequest));
585+
} else if ("follow".equals(targetCluster)) {
586+
putILMPolicy(policyName, null, 1, null);
587+
588+
Request createAutoFollowRequest = new Request("PUT", "/_ccr/auto_follow/tsdb_index_auto_follow_pattern");
589+
createAutoFollowRequest.setJsonEntity("""
590+
{
591+
"leader_index_patterns": [ ".ds-tsdb-index-*" ],
592+
"remote_cluster": "leader_cluster",
593+
"read_poll_timeout": "1000ms",
594+
"follow_index_pattern": "{{leader_index}}"
595+
}""");
596+
assertOK(client().performRequest(createAutoFollowRequest));
597+
598+
try (RestClient leaderClient = buildLeaderClient()) {
599+
String now = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(Instant.now());
600+
index(leaderClient, dataStream, "", "@timestamp", now, "volume", 11.0, "metricset", randomAlphaOfLength(5));
601+
602+
String backingIndexName = getDataStreamBackingIndexNames(leaderClient, "tsdb-index-cpu").get(0);
603+
assertBusy(() -> { assertOK(client().performRequest(new Request("HEAD", "/" + backingIndexName))); });
604+
605+
// rollover
606+
Request rolloverRequest = new Request("POST", "/" + dataStream + "/_rollover");
607+
rolloverRequest.setJsonEntity("""
608+
{
609+
"conditions": {
610+
"max_docs": "1"
611+
}
612+
}""");
613+
leaderClient.performRequest(rolloverRequest);
614+
615+
assertBusy(() -> {
616+
assertThat(
617+
"index must wait in the " + WaitUntilTimeSeriesEndTimePassesStep.NAME + " until its end time lapses",
618+
explainIndex(client(), backingIndexName).get("step"),
619+
is(WaitUntilTimeSeriesEndTimePassesStep.NAME)
620+
);
621+
622+
assertThat(explainIndex(client(), backingIndexName).get("step_info"), is(notNullValue()));
623+
assertThat(
624+
(String) ((Map<String, Object>) explainIndex(client(), backingIndexName).get("step_info")).get("message"),
625+
containsString("Waiting until the index's time series end time lapses")
626+
);
627+
}, 30, TimeUnit.SECONDS);
628+
629+
int initialLeaderDocCount = getDocCount(leaderClient, backingIndexName);
630+
631+
// Add more documents to the leader index while it's in WaitUntilTimeSeriesEndTimePassesStep
632+
String futureTimestamp = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName())
633+
.format(Instant.now().plusSeconds(30));
634+
635+
for (int i = 0; i < 5; i++) {
636+
index(leaderClient, dataStream, "", "@timestamp", futureTimestamp, "volume", 20.0 + i, "metricset", "test-sync-" + i);
637+
}
638+
639+
// Verify that new documents are synced to follower while in WaitUntilTimeSeriesEndTimePassesStep
640+
assertBusy(() -> {
641+
int currentLeaderDocCount = getDocCount(leaderClient, backingIndexName);
642+
int currentFollowerDocCount = getDocCount(client(), backingIndexName);
643+
644+
assertThat(
645+
"Leader should have more documents than initially",
646+
currentLeaderDocCount,
647+
greaterThan(initialLeaderDocCount)
648+
);
649+
assertThat("Follower should sync new documents from leader", currentFollowerDocCount, equalTo(currentLeaderDocCount));
650+
651+
// Also verify the step is still WaitUntilTimeSeriesEndTimePassesStep
652+
assertThat(
653+
"Index should still be in WaitUntilTimeSeriesEndTimePassesStep",
654+
explainIndex(client(), backingIndexName).get("step"),
655+
is(WaitUntilTimeSeriesEndTimePassesStep.NAME)
656+
);
657+
}, 30, TimeUnit.SECONDS);
658+
}
659+
}
660+
}
661+
536662
private void configureRemoteClusters(String name, String leaderRemoteClusterSeed) throws IOException {
537663
logger.info("Configuring leader remote cluster [{}]", leaderRemoteClusterSeed);
538664
Request request = new Request("PUT", "/_cluster/settings");
@@ -839,4 +965,33 @@ private static String getShrinkIndexName(RestClient client, String originalIndex
839965
: "lifecycle execution state must contain the target shrink index name for index [" + originalIndex + "]";
840966
return shrunkenIndexName[0];
841967
}
968+
969+
private static Map<String, Object> explainIndex(RestClient client, String indexName) throws IOException {
970+
RequestOptions consumeWarningsOptions = RequestOptions.DEFAULT.toBuilder()
971+
.setWarningsHandler(warnings -> warnings.isEmpty() == false && List.of("""
972+
[indices.lifecycle.rollover.only_if_has_documents] setting was deprecated in Elasticsearch \
973+
and will be removed in a future release. \
974+
See the deprecation documentation for the next major version.""").equals(warnings) == false)
975+
.build();
976+
977+
Request explainRequest = new Request("GET", indexName + "/_ilm/explain");
978+
explainRequest.setOptions(consumeWarningsOptions);
979+
Response response = client.performRequest(explainRequest);
980+
Map<String, Object> responseMap;
981+
try (InputStream is = response.getEntity().getContent()) {
982+
responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
983+
}
984+
985+
@SuppressWarnings("unchecked")
986+
Map<String, Map<String, Object>> indexResponse = ((Map<String, Map<String, Object>>) responseMap.get("indices"));
987+
return indexResponse.get(indexName);
988+
}
989+
990+
private static int getDocCount(RestClient client, String indexName) throws IOException {
991+
Request countRequest = new Request("GET", "/" + indexName + "/_count");
992+
Response response = client.performRequest(countRequest);
993+
Map<String, Object> result = entityAsMap(response);
994+
System.out.println("result = " + result);
995+
return (int) result.get("count");
996+
}
842997
}

0 commit comments

Comments
 (0)