Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/128361.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 128361
summary: The follower index should wait until the time series end time passes before unfollowing the leader index.
area: ILM+SLM
type: bug
issues:
- 128129
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add new integration test in the CCRIndexLifecycleIT test suite to cover this scenario. You can refer to this test as an example for verifying the WaitUntilTimeSeriesEndTimePassesStep. Essentially we would want to verify that after leader index rollovers, the follower index goes into the WaitUntilTimeSeriesEndTimePassesStep, and new documents to the leader index are synced to the follower index until end_time has passed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the code review.
I'll review the existing test cases and work on adding new ones.I'm going to add test code!

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -44,6 +45,7 @@ private UnfollowAction() {}
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
StepKey preUnfollowKey = new StepKey(phase, NAME, CONDITIONAL_UNFOLLOW_STEP);
StepKey indexingComplete = new StepKey(phase, NAME, WaitForIndexingCompleteStep.NAME);
StepKey waitUntilTimeSeriesEndTimePassesStep = new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME);
StepKey waitForFollowShardTasks = new StepKey(phase, NAME, WaitForFollowShardTasksStep.NAME);
StepKey pauseFollowerIndex = new StepKey(phase, NAME, PauseFollowerIndexStep.NAME);
StepKey closeFollowerIndex = new StepKey(phase, NAME, CloseFollowerIndexStep.NAME);
Expand All @@ -59,14 +61,21 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
// if the index has no CCR metadata we'll skip the unfollow action completely
return customIndexMetadata == null;
});
WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitForFollowShardTasks);
WaitForFollowShardTasksStep step2 = new WaitForFollowShardTasksStep(waitForFollowShardTasks, pauseFollowerIndex, client);
PauseFollowerIndexStep step3 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client);
CloseFollowerIndexStep step4 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client);
UnfollowFollowerIndexStep step5 = new UnfollowFollowerIndexStep(unfollowFollowerIndex, openFollowerIndex, client);
OpenIndexStep step6 = new OpenIndexStep(openFollowerIndex, waitForYellowStep, client);
WaitForIndexColorStep step7 = new WaitForIndexColorStep(waitForYellowStep, nextStepKey, ClusterHealthStatus.YELLOW);
return List.of(conditionalSkipUnfollowStep, step1, step2, step3, step4, step5, step6, step7);
WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitUntilTimeSeriesEndTimePassesStep);

WaitUntilTimeSeriesEndTimePassesStep step2 = new WaitUntilTimeSeriesEndTimePassesStep(
waitUntilTimeSeriesEndTimePassesStep,
waitForFollowShardTasks,
Instant::now
);

WaitForFollowShardTasksStep step3 = new WaitForFollowShardTasksStep(waitForFollowShardTasks, pauseFollowerIndex, client);
PauseFollowerIndexStep step4 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client);
CloseFollowerIndexStep step5 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client);
UnfollowFollowerIndexStep step6 = new UnfollowFollowerIndexStep(unfollowFollowerIndex, openFollowerIndex, client);
OpenIndexStep step7 = new OpenIndexStep(openFollowerIndex, waitForYellowStep, client);
WaitForIndexColorStep step8 = new WaitForIndexColorStep(waitForYellowStep, nextStepKey, ClusterHealthStatus.YELLOW);
return List.of(conditionalSkipUnfollowStep, step1, step2, step3, step4, step5, step6, step7, step8);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,17 @@ public void testToSteps() {
);
List<Step> steps = action.toSteps(null, phase, nextStepKey);
assertThat(steps, notNullValue());
assertThat(steps.size(), equalTo(8));
assertThat(steps.size(), equalTo(9));

StepKey expectedFirstStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowAction.CONDITIONAL_UNFOLLOW_STEP);
StepKey expectedSecondStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexingCompleteStep.NAME);
StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME);
StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME);
StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME);
StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowerIndexStep.NAME);
StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, OPEN_FOLLOWER_INDEX_STEP_NAME);
StepKey expectedEighthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexColorStep.NAME);
StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME);
StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME);
StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME);
StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME);
StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowerIndexStep.NAME);
StepKey expectedEighthStepKey = new StepKey(phase, UnfollowAction.NAME, OPEN_FOLLOWER_INDEX_STEP_NAME);
StepKey expectedNinthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexColorStep.NAME);

BranchingStep firstStep = (BranchingStep) steps.get(0);
assertThat(firstStep.getKey(), equalTo(expectedFirstStepKey));
Expand All @@ -69,30 +70,34 @@ public void testToSteps() {
assertThat(secondStep.getKey(), equalTo(expectedSecondStepKey));
assertThat(secondStep.getNextStepKey(), equalTo(expectedThirdStepKey));

WaitForFollowShardTasksStep thirdStep = (WaitForFollowShardTasksStep) steps.get(2);
WaitUntilTimeSeriesEndTimePassesStep thirdStep = (WaitUntilTimeSeriesEndTimePassesStep) steps.get(2);
assertThat(thirdStep.getKey(), equalTo(expectedThirdStepKey));
assertThat(thirdStep.getNextStepKey(), equalTo(expectedFourthStepKey));

PauseFollowerIndexStep fourthStep = (PauseFollowerIndexStep) steps.get(3);
WaitForFollowShardTasksStep fourthStep = (WaitForFollowShardTasksStep) steps.get(3);
assertThat(fourthStep.getKey(), equalTo(expectedFourthStepKey));
assertThat(fourthStep.getNextStepKey(), equalTo(expectedFifthStepKey));

CloseFollowerIndexStep fifthStep = (CloseFollowerIndexStep) steps.get(4);
PauseFollowerIndexStep fifthStep = (PauseFollowerIndexStep) steps.get(4);
assertThat(fifthStep.getKey(), equalTo(expectedFifthStepKey));
assertThat(fifthStep.getNextStepKey(), equalTo(expectedSixthStepKey));

UnfollowFollowerIndexStep sixthStep = (UnfollowFollowerIndexStep) steps.get(5);
CloseFollowerIndexStep sixthStep = (CloseFollowerIndexStep) steps.get(5);
assertThat(sixthStep.getKey(), equalTo(expectedSixthStepKey));
assertThat(sixthStep.getNextStepKey(), equalTo(expectedSeventhStepKey));

OpenIndexStep seventhStep = (OpenIndexStep) steps.get(6);
UnfollowFollowerIndexStep seventhStep = (UnfollowFollowerIndexStep) steps.get(6);
assertThat(seventhStep.getKey(), equalTo(expectedSeventhStepKey));
assertThat(seventhStep.getNextStepKey(), equalTo(expectedEighthStepKey));

WaitForIndexColorStep eighthStep = (WaitForIndexColorStep) steps.get(7);
assertThat(eighthStep.getColor(), is(ClusterHealthStatus.YELLOW));
OpenIndexStep eighthStep = (OpenIndexStep) steps.get(7);
assertThat(eighthStep.getKey(), equalTo(expectedEighthStepKey));
assertThat(eighthStep.getNextStepKey(), equalTo(nextStepKey));
assertThat(eighthStep.getNextStepKey(), equalTo(expectedNinthStepKey));

WaitForIndexColorStep ninth = (WaitForIndexColorStep) steps.get(8);
assertThat(ninth.getColor(), is(ClusterHealthStatus.YELLOW));
assertThat(ninth.getKey(), equalTo(expectedNinthStepKey));
assertThat(ninth.getNextStepKey(), equalTo(nextStepKey));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -28,9 +30,11 @@
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.UnfollowAction;
import org.elasticsearch.xpack.core.ilm.WaitUntilTimeSeriesEndTimePassesStep;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -39,14 +43,47 @@

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ilm.ShrinkIndexNameSupplier.SHRUNKEN_INDEX_PREFIX;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class CCRIndexLifecycleIT extends ESCCRRestTestCase {

private static final Logger LOGGER = LogManager.getLogger(CCRIndexLifecycleIT.class);
private static final String TSDB_INDEX_TEMPLATE = """
{
"index_patterns": ["%s*"],
"data_stream": {},
"template": {
"settings":{
"index": {
"number_of_replicas": 0,
"number_of_shards": 1,
"routing_path": ["metricset"],
"mode": "time_series"
},
"index.lifecycle.name": "%s"
},
"mappings":{
"properties": {
"@timestamp" : {
"type": "date"
},
"metricset": {
"type": "keyword",
"time_series_dimension": true
},
"volume": {
"type": "double",
"time_series_metric": "gauge"
}
}
}
}
}""";

public void testBasicCCRAndILMIntegration() throws Exception {
String indexName = "logs-1";
Expand Down Expand Up @@ -533,6 +570,91 @@ public void testILMUnfollowFailsToRemoveRetentionLeases() throws Exception {
}
}

@SuppressWarnings("unchecked")
public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exception {
String indexPattern = "tsdb-index-";
String dataStream = "tsdb-index-cpu";
String policyName = "tsdb-policy";

if ("leader".equals(targetCluster)) {
putILMPolicy(policyName, null, 1, null);
Request templateRequest = new Request("PUT", "/_index_template/tsdb_template");
templateRequest.setJsonEntity(Strings.format(TSDB_INDEX_TEMPLATE, indexPattern, policyName));
assertOK(client().performRequest(templateRequest));
} else if ("follow".equals(targetCluster)) {
// Use unfollow-only policy for follower cluster instead of regular ILM policy
// Follower clusters should not have their own rollover actions as they are meant
// to follow the rollover behavior of the leader index, not initiate their own rollovers
putUnfollowOnlyPolicy(client(), policyName);

Request createAutoFollowRequest = new Request("PUT", "/_ccr/auto_follow/tsdb_index_auto_follow_pattern");
createAutoFollowRequest.setJsonEntity("""
{
"leader_index_patterns": [ "tsdb-index-*" ],
"remote_cluster": "leader_cluster",
"read_poll_timeout": "1000ms",
"follow_index_pattern": "{{leader_index}}"
}""");
assertOK(client().performRequest(createAutoFollowRequest));

try (RestClient leaderClient = buildLeaderClient()) {
String now = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(Instant.now());

// Index a document on the leader index, this should trigger an ILM rollover.
// This will ensure that 'index.lifecycle.indexing_complete' is set.
index(leaderClient, dataStream, "", "@timestamp", now, "volume", 11.0, "metricset", randomAlphaOfLength(5));

String backingIndexName = getDataStreamBackingIndexNames(leaderClient, "tsdb-index-cpu").get(0);
assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + backingIndexName))));

assertBusy(() -> {
Map<String, Object> indexExplanation = explainIndex(client(), backingIndexName);
assertThat(
"index must wait in the " + WaitUntilTimeSeriesEndTimePassesStep.NAME + " until its end time lapses",
indexExplanation.get("step"),
is(WaitUntilTimeSeriesEndTimePassesStep.NAME)
);

assertThat(indexExplanation.get("step_info"), is(notNullValue()));
assertThat(
(String) ((Map<String, Object>) indexExplanation.get("step_info")).get("message"),
containsString("Waiting until the index's time series end time lapses")
);
}, 30, TimeUnit.SECONDS);

int initialLeaderDocCount = getDocCount(leaderClient, backingIndexName);

// Add more documents to the leader index while it's in WaitUntilTimeSeriesEndTimePassesStep
String futureTimestamp = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName())
.format(Instant.now().plusSeconds(30));

for (int i = 0; i < 5; i++) {
index(leaderClient, dataStream, "", "@timestamp", futureTimestamp, "volume", 20.0 + i, "metricset", "test-sync-" + i);
}

// Verify that new documents are synced to follower while in WaitUntilTimeSeriesEndTimePassesStep
assertBusy(() -> {
int currentLeaderDocCount = getDocCount(leaderClient, backingIndexName);
int currentFollowerDocCount = getDocCount(client(), backingIndexName);

assertThat(
"Leader should have more documents than initially",
currentLeaderDocCount,
greaterThan(initialLeaderDocCount)
);
assertThat("Follower should sync new documents from leader", currentFollowerDocCount, equalTo(currentLeaderDocCount));

// Also verify the step is still WaitUntilTimeSeriesEndTimePassesStep
assertThat(
"Index should still be in WaitUntilTimeSeriesEndTimePassesStep",
explainIndex(client(), backingIndexName).get("step"),
is(WaitUntilTimeSeriesEndTimePassesStep.NAME)
);
}, 30, TimeUnit.SECONDS);
}
}
}

private void configureRemoteClusters(String name, String leaderRemoteClusterSeed) throws IOException {
logger.info("Configuring leader remote cluster [{}]", leaderRemoteClusterSeed);
Request request = new Request("PUT", "/_cluster/settings");
Expand Down Expand Up @@ -839,4 +961,24 @@ private static String getShrinkIndexName(RestClient client, String originalIndex
: "lifecycle execution state must contain the target shrink index name for index [" + originalIndex + "]";
return shrunkenIndexName[0];
}

private static Map<String, Object> explainIndex(RestClient client, String indexName) throws IOException {
Request explainRequest = new Request("GET", indexName + "/_ilm/explain");
Response response = client.performRequest(explainRequest);
Map<String, Object> responseMap;
try (InputStream is = response.getEntity().getContent()) {
responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
}

@SuppressWarnings("unchecked")
Map<String, Map<String, Object>> indexResponse = ((Map<String, Map<String, Object>>) responseMap.get("indices"));
return indexResponse.get(indexName);
}

private static int getDocCount(RestClient client, String indexName) throws IOException {
Request countRequest = new Request("GET", "/" + indexName + "/_count");
Response response = client.performRequest(countRequest);
Map<String, Object> result = entityAsMap(response);
return (int) result.get("count");
}
}
Loading