Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add new integration test in the CCRIndexLifecycleIT test suite to cover this scenario. You can refer to this test as an example for verifying the WaitUntilTimeSeriesEndTimePassesStep. Essentially we would want to verify that after leader index rollovers, the follower index goes into the WaitUntilTimeSeriesEndTimePassesStep, and new documents to the leader index are synced to the follower index until end_time has passed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the code review.
I'll review the existing test cases and work on adding new ones.I'm going to add test code!

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -44,6 +45,7 @@ private UnfollowAction() {}
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
StepKey preUnfollowKey = new StepKey(phase, NAME, CONDITIONAL_UNFOLLOW_STEP);
StepKey indexingComplete = new StepKey(phase, NAME, WaitForIndexingCompleteStep.NAME);
StepKey waitUntilTimeSeriesEndTimePassesStep = new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME);
StepKey waitForFollowShardTasks = new StepKey(phase, NAME, WaitForFollowShardTasksStep.NAME);
StepKey pauseFollowerIndex = new StepKey(phase, NAME, PauseFollowerIndexStep.NAME);
StepKey closeFollowerIndex = new StepKey(phase, NAME, CloseFollowerIndexStep.NAME);
Expand All @@ -59,14 +61,21 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
// if the index has no CCR metadata we'll skip the unfollow action completely
return customIndexMetadata == null;
});
WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitForFollowShardTasks);
WaitForFollowShardTasksStep step2 = new WaitForFollowShardTasksStep(waitForFollowShardTasks, pauseFollowerIndex, client);
PauseFollowerIndexStep step3 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client);
CloseFollowerIndexStep step4 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client);
UnfollowFollowerIndexStep step5 = new UnfollowFollowerIndexStep(unfollowFollowerIndex, openFollowerIndex, client);
OpenIndexStep step6 = new OpenIndexStep(openFollowerIndex, waitForYellowStep, client);
WaitForIndexColorStep step7 = new WaitForIndexColorStep(waitForYellowStep, nextStepKey, ClusterHealthStatus.YELLOW);
return List.of(conditionalSkipUnfollowStep, step1, step2, step3, step4, step5, step6, step7);
WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitUntilTimeSeriesEndTimePassesStep);

WaitUntilTimeSeriesEndTimePassesStep step2 = new WaitUntilTimeSeriesEndTimePassesStep(
waitUntilTimeSeriesEndTimePassesStep,
waitForFollowShardTasks,
Instant::now
);

WaitForFollowShardTasksStep step3 = new WaitForFollowShardTasksStep(waitForFollowShardTasks, pauseFollowerIndex, client);
PauseFollowerIndexStep step4 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client);
CloseFollowerIndexStep step5 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client);
UnfollowFollowerIndexStep step6 = new UnfollowFollowerIndexStep(unfollowFollowerIndex, openFollowerIndex, client);
OpenIndexStep step7 = new OpenIndexStep(openFollowerIndex, waitForYellowStep, client);
WaitForIndexColorStep step8 = new WaitForIndexColorStep(waitForYellowStep, nextStepKey, ClusterHealthStatus.YELLOW);
return List.of(conditionalSkipUnfollowStep, step1, step2, step3, step4, step5, step6, step7, step8);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,17 @@ public void testToSteps() {
);
List<Step> steps = action.toSteps(null, phase, nextStepKey);
assertThat(steps, notNullValue());
assertThat(steps.size(), equalTo(8));
assertThat(steps.size(), equalTo(9));

StepKey expectedFirstStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowAction.CONDITIONAL_UNFOLLOW_STEP);
StepKey expectedSecondStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexingCompleteStep.NAME);
StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME);
StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME);
StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME);
StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowerIndexStep.NAME);
StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, OPEN_FOLLOWER_INDEX_STEP_NAME);
StepKey expectedEighthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexColorStep.NAME);
StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME);
StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME);
StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME);
StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME);
StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowerIndexStep.NAME);
StepKey expectedEighthStepKey = new StepKey(phase, UnfollowAction.NAME, OPEN_FOLLOWER_INDEX_STEP_NAME);
StepKey expectedNinthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexColorStep.NAME);

BranchingStep firstStep = (BranchingStep) steps.get(0);
assertThat(firstStep.getKey(), equalTo(expectedFirstStepKey));
Expand All @@ -69,30 +70,34 @@ public void testToSteps() {
assertThat(secondStep.getKey(), equalTo(expectedSecondStepKey));
assertThat(secondStep.getNextStepKey(), equalTo(expectedThirdStepKey));

WaitForFollowShardTasksStep thirdStep = (WaitForFollowShardTasksStep) steps.get(2);
WaitUntilTimeSeriesEndTimePassesStep thirdStep = (WaitUntilTimeSeriesEndTimePassesStep) steps.get(2);
assertThat(thirdStep.getKey(), equalTo(expectedThirdStepKey));
assertThat(thirdStep.getNextStepKey(), equalTo(expectedFourthStepKey));

PauseFollowerIndexStep fourthStep = (PauseFollowerIndexStep) steps.get(3);
WaitForFollowShardTasksStep fourthStep = (WaitForFollowShardTasksStep) steps.get(3);
assertThat(fourthStep.getKey(), equalTo(expectedFourthStepKey));
assertThat(fourthStep.getNextStepKey(), equalTo(expectedFifthStepKey));

CloseFollowerIndexStep fifthStep = (CloseFollowerIndexStep) steps.get(4);
PauseFollowerIndexStep fifthStep = (PauseFollowerIndexStep) steps.get(4);
assertThat(fifthStep.getKey(), equalTo(expectedFifthStepKey));
assertThat(fifthStep.getNextStepKey(), equalTo(expectedSixthStepKey));

UnfollowFollowerIndexStep sixthStep = (UnfollowFollowerIndexStep) steps.get(5);
CloseFollowerIndexStep sixthStep = (CloseFollowerIndexStep) steps.get(5);
assertThat(sixthStep.getKey(), equalTo(expectedSixthStepKey));
assertThat(sixthStep.getNextStepKey(), equalTo(expectedSeventhStepKey));

OpenIndexStep seventhStep = (OpenIndexStep) steps.get(6);
UnfollowFollowerIndexStep seventhStep = (UnfollowFollowerIndexStep) steps.get(6);
assertThat(seventhStep.getKey(), equalTo(expectedSeventhStepKey));
assertThat(seventhStep.getNextStepKey(), equalTo(expectedEighthStepKey));

WaitForIndexColorStep eighthStep = (WaitForIndexColorStep) steps.get(7);
assertThat(eighthStep.getColor(), is(ClusterHealthStatus.YELLOW));
OpenIndexStep eighthStep = (OpenIndexStep) steps.get(7);
assertThat(eighthStep.getKey(), equalTo(expectedEighthStepKey));
assertThat(eighthStep.getNextStepKey(), equalTo(nextStepKey));
assertThat(eighthStep.getNextStepKey(), equalTo(expectedNinthStepKey));

WaitForIndexColorStep ninth = (WaitForIndexColorStep) steps.get(8);
assertThat(ninth.getColor(), is(ClusterHealthStatus.YELLOW));
assertThat(ninth.getKey(), equalTo(expectedNinthStepKey));
assertThat(ninth.getNextStepKey(), equalTo(nextStepKey));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -28,9 +31,11 @@
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.UnfollowAction;
import org.elasticsearch.xpack.core.ilm.WaitUntilTimeSeriesEndTimePassesStep;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -39,14 +44,47 @@

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ilm.ShrinkIndexNameSupplier.SHRUNKEN_INDEX_PREFIX;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class CCRIndexLifecycleIT extends ESCCRRestTestCase {

private static final Logger LOGGER = LogManager.getLogger(CCRIndexLifecycleIT.class);
private static final String TSDB_INDEX_TEMPLATE = """
{
"index_patterns": ["%s*"],
"data_stream": {},
"template": {
"settings":{
"index": {
"number_of_replicas": 0,
"number_of_shards": 1,
"routing_path": ["metricset"],
"mode": "time_series"
},
"index.lifecycle.name": "%s"
},
"mappings":{
"properties": {
"@timestamp" : {
"type": "date"
},
"metricset": {
"type": "keyword",
"time_series_dimension": true
},
"volume": {
"type": "double",
"time_series_metric": "gauge"
}
}
}
}
}""";

public void testBasicCCRAndILMIntegration() throws Exception {
String indexName = "logs-1";
Expand Down Expand Up @@ -533,6 +571,94 @@ public void testILMUnfollowFailsToRemoveRetentionLeases() throws Exception {
}
}

@SuppressWarnings({ "checkstyle:LineLength", "unchecked" })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the line length warning supress, you can run ./gradlew spotlessApply to auto-format.

Also please take a look at the contribution guide for more details and other tips: https://github.com/elastic/elasticsearch/blob/main/CONTRIBUTING.md#java-language-formatting-guidelines

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your code review. I've reflected your suggestions!

public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exception {
String indexPattern = "tsdb-index-";
String dataStream = "tsdb-index-cpu";
String policyName = "tsdb-policy";

if ("leader".equals(targetCluster)) {
putILMPolicy(policyName, null, 1, null);
Request templateRequest = new Request("PUT", "/_index_template/tsdb_template");
templateRequest.setJsonEntity(Strings.format(TSDB_INDEX_TEMPLATE, indexPattern, policyName));
assertOK(client().performRequest(templateRequest));
} else if ("follow".equals(targetCluster)) {
putILMPolicy(policyName, null, 1, null);

Request createAutoFollowRequest = new Request("PUT", "/_ccr/auto_follow/tsdb_index_auto_follow_pattern");
createAutoFollowRequest.setJsonEntity("""
{
"leader_index_patterns": [ ".ds-tsdb-index-*" ],
"remote_cluster": "leader_cluster",
"read_poll_timeout": "1000ms",
"follow_index_pattern": "{{leader_index}}"
}""");
assertOK(client().performRequest(createAutoFollowRequest));

try (RestClient leaderClient = buildLeaderClient()) {
String now = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(Instant.now());
index(leaderClient, dataStream, "", "@timestamp", now, "volume", 11.0, "metricset", randomAlphaOfLength(5));

String backingIndexName = getDataStreamBackingIndexNames(leaderClient, "tsdb-index-cpu").get(0);
assertBusy(() -> { assertOK(client().performRequest(new Request("HEAD", "/" + backingIndexName))); });

// rollover
Request rolloverRequest = new Request("POST", "/" + dataStream + "/_rollover");
rolloverRequest.setJsonEntity("""
{
"conditions": {
"max_docs": "1"
}
}""");
leaderClient.performRequest(rolloverRequest);

assertBusy(() -> {
assertThat(
"index must wait in the " + WaitUntilTimeSeriesEndTimePassesStep.NAME + " until its end time lapses",
explainIndex(client(), backingIndexName).get("step"),
is(WaitUntilTimeSeriesEndTimePassesStep.NAME)
);

assertThat(explainIndex(client(), backingIndexName).get("step_info"), is(notNullValue()));
assertThat(
(String) ((Map<String, Object>) explainIndex(client(), backingIndexName).get("step_info")).get("message"),
containsString("Waiting until the index's time series end time lapses")
);
}, 30, TimeUnit.SECONDS);

int initialLeaderDocCount = getDocCount(leaderClient, backingIndexName);

// Add more documents to the leader index while it's in WaitUntilTimeSeriesEndTimePassesStep
String futureTimestamp = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName())
.format(Instant.now().plusSeconds(30));

for (int i = 0; i < 5; i++) {
index(leaderClient, dataStream, "", "@timestamp", futureTimestamp, "volume", 20.0 + i, "metricset", "test-sync-" + i);
}

// Verify that new documents are synced to follower while in WaitUntilTimeSeriesEndTimePassesStep
assertBusy(() -> {
int currentLeaderDocCount = getDocCount(leaderClient, backingIndexName);
int currentFollowerDocCount = getDocCount(client(), backingIndexName);

assertThat(
"Leader should have more documents than initially",
currentLeaderDocCount,
greaterThan(initialLeaderDocCount)
);
assertThat("Follower should sync new documents from leader", currentFollowerDocCount, equalTo(currentLeaderDocCount));

// Also verify the step is still WaitUntilTimeSeriesEndTimePassesStep
assertThat(
"Index should still be in WaitUntilTimeSeriesEndTimePassesStep",
explainIndex(client(), backingIndexName).get("step"),
is(WaitUntilTimeSeriesEndTimePassesStep.NAME)
);
}, 30, TimeUnit.SECONDS);
}
}
}

private void configureRemoteClusters(String name, String leaderRemoteClusterSeed) throws IOException {
logger.info("Configuring leader remote cluster [{}]", leaderRemoteClusterSeed);
Request request = new Request("PUT", "/_cluster/settings");
Expand Down Expand Up @@ -839,4 +965,33 @@ private static String getShrinkIndexName(RestClient client, String originalIndex
: "lifecycle execution state must contain the target shrink index name for index [" + originalIndex + "]";
return shrunkenIndexName[0];
}

private static Map<String, Object> explainIndex(RestClient client, String indexName) throws IOException {
RequestOptions consumeWarningsOptions = RequestOptions.DEFAULT.toBuilder()
.setWarningsHandler(warnings -> warnings.isEmpty() == false && List.of("""
[indices.lifecycle.rollover.only_if_has_documents] setting was deprecated in Elasticsearch \
and will be removed in a future release. \
See the deprecation documentation for the next major version.""").equals(warnings) == false)
.build();

Request explainRequest = new Request("GET", indexName + "/_ilm/explain");
explainRequest.setOptions(consumeWarningsOptions);
Response response = client.performRequest(explainRequest);
Map<String, Object> responseMap;
try (InputStream is = response.getEntity().getContent()) {
responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
}

@SuppressWarnings("unchecked")
Map<String, Map<String, Object>> indexResponse = ((Map<String, Map<String, Object>>) responseMap.get("indices"));
return indexResponse.get(indexName);
}

private static int getDocCount(RestClient client, String indexName) throws IOException {
Request countRequest = new Request("GET", "/" + indexName + "/_count");
Response response = client.performRequest(countRequest);
Map<String, Object> result = entityAsMap(response);
System.out.println("result = " + result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the println

return (int) result.get("count");
}
}