diff --git a/docs/changelog/128361.yaml b/docs/changelog/128361.yaml new file mode 100644 index 0000000000000..901c1141afe90 --- /dev/null +++ b/docs/changelog/128361.yaml @@ -0,0 +1,6 @@ +pr: 128361 +summary: The follower index should wait until the time series end time passes before unfollowing the leader index. +area: ILM+SLM +type: bug +issues: + - 128129 diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 0e739c74fcace..3d67555c7d692 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -2116,9 +2116,16 @@ protected static boolean aliasExists(String index, String alias) throws IOExcept /** * Returns a list of the data stream's backing index names. */ - @SuppressWarnings("unchecked") protected static List getDataStreamBackingIndexNames(String dataStreamName) throws IOException { - Map response = getAsMap(client(), "/_data_stream/" + dataStreamName); + return getDataStreamBackingIndexNames(client(), dataStreamName); + } + + /** + * Returns a list of the data stream's backing index names. + */ + @SuppressWarnings("unchecked") + protected static List getDataStreamBackingIndexNames(RestClient client, String dataStreamName) throws IOException { + Map response = getAsMap(client, "/_data_stream/" + dataStreamName); List dataStreams = (List) response.get("data_streams"); assertThat(dataStreams.size(), equalTo(1)); Map dataStream = (Map) dataStreams.get(0); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java index 6bb0178f1471e..a8f41a527a6e8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.ilm.Step.StepKey; import java.io.IOException; +import java.time.Instant; import java.util.List; import java.util.Map; @@ -44,6 +45,7 @@ private UnfollowAction() {} public List toSteps(Client client, String phase, StepKey nextStepKey) { StepKey preUnfollowKey = new StepKey(phase, NAME, CONDITIONAL_UNFOLLOW_STEP); StepKey indexingComplete = new StepKey(phase, NAME, WaitForIndexingCompleteStep.NAME); + StepKey waitUntilTimeSeriesEndTimePassesStep = new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME); StepKey waitForFollowShardTasks = new StepKey(phase, NAME, WaitForFollowShardTasksStep.NAME); StepKey pauseFollowerIndex = new StepKey(phase, NAME, PauseFollowerIndexStep.NAME); StepKey closeFollowerIndex = new StepKey(phase, NAME, CloseFollowerIndexStep.NAME); @@ -64,14 +66,19 @@ public List toSteps(Client client, String phase, StepKey nextStepKey) { return customIndexMetadata == null; } ); - WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitForFollowShardTasks); - WaitForFollowShardTasksStep step2 = new WaitForFollowShardTasksStep(waitForFollowShardTasks, pauseFollowerIndex, client); - PauseFollowerIndexStep step3 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client); - CloseFollowerIndexStep step4 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client); - UnfollowFollowerIndexStep step5 = new UnfollowFollowerIndexStep(unfollowFollowerIndex, openFollowerIndex, client); - OpenIndexStep step6 = new OpenIndexStep(openFollowerIndex, waitForYellowStep, client); - WaitForIndexColorStep step7 = new WaitForIndexColorStep(waitForYellowStep, nextStepKey, ClusterHealthStatus.YELLOW); - return List.of(conditionalSkipUnfollowStep, step1, step2, step3, step4, step5, step6, step7); + WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitUntilTimeSeriesEndTimePassesStep); + WaitUntilTimeSeriesEndTimePassesStep step2 = new WaitUntilTimeSeriesEndTimePassesStep( + waitUntilTimeSeriesEndTimePassesStep, + waitForFollowShardTasks, + Instant::now + ); + WaitForFollowShardTasksStep step3 = new WaitForFollowShardTasksStep(waitForFollowShardTasks, pauseFollowerIndex, client); + PauseFollowerIndexStep step4 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client); + CloseFollowerIndexStep step5 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client); + UnfollowFollowerIndexStep step6 = new UnfollowFollowerIndexStep(unfollowFollowerIndex, openFollowerIndex, client); + OpenIndexStep step7 = new OpenIndexStep(openFollowerIndex, waitForYellowStep, client); + WaitForIndexColorStep step8 = new WaitForIndexColorStep(waitForYellowStep, nextStepKey, ClusterHealthStatus.YELLOW); + return List.of(conditionalSkipUnfollowStep, step1, step2, step3, step4, step5, step6, step7, step8); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java index 66aacfdc68667..3820eaa19445f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java @@ -51,16 +51,17 @@ public void testToSteps() { ); List steps = action.toSteps(null, phase, nextStepKey); assertThat(steps, notNullValue()); - assertThat(steps.size(), equalTo(8)); + assertThat(steps.size(), equalTo(9)); StepKey expectedFirstStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowAction.CONDITIONAL_UNFOLLOW_STEP); StepKey expectedSecondStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexingCompleteStep.NAME); - StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME); - StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME); - StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME); - StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowerIndexStep.NAME); - StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, OPEN_FOLLOWER_INDEX_STEP_NAME); - StepKey expectedEighthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexColorStep.NAME); + StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME); + StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME); + StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME); + StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME); + StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowerIndexStep.NAME); + StepKey expectedEighthStepKey = new StepKey(phase, UnfollowAction.NAME, OPEN_FOLLOWER_INDEX_STEP_NAME); + StepKey expectedNinthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexColorStep.NAME); BranchingStep firstStep = (BranchingStep) steps.get(0); assertThat(firstStep.getKey(), equalTo(expectedFirstStepKey)); @@ -69,30 +70,34 @@ public void testToSteps() { assertThat(secondStep.getKey(), equalTo(expectedSecondStepKey)); assertThat(secondStep.getNextStepKey(), equalTo(expectedThirdStepKey)); - WaitForFollowShardTasksStep thirdStep = (WaitForFollowShardTasksStep) steps.get(2); + WaitUntilTimeSeriesEndTimePassesStep thirdStep = (WaitUntilTimeSeriesEndTimePassesStep) steps.get(2); assertThat(thirdStep.getKey(), equalTo(expectedThirdStepKey)); assertThat(thirdStep.getNextStepKey(), equalTo(expectedFourthStepKey)); - PauseFollowerIndexStep fourthStep = (PauseFollowerIndexStep) steps.get(3); + WaitForFollowShardTasksStep fourthStep = (WaitForFollowShardTasksStep) steps.get(3); assertThat(fourthStep.getKey(), equalTo(expectedFourthStepKey)); assertThat(fourthStep.getNextStepKey(), equalTo(expectedFifthStepKey)); - CloseFollowerIndexStep fifthStep = (CloseFollowerIndexStep) steps.get(4); + PauseFollowerIndexStep fifthStep = (PauseFollowerIndexStep) steps.get(4); assertThat(fifthStep.getKey(), equalTo(expectedFifthStepKey)); assertThat(fifthStep.getNextStepKey(), equalTo(expectedSixthStepKey)); - UnfollowFollowerIndexStep sixthStep = (UnfollowFollowerIndexStep) steps.get(5); + CloseFollowerIndexStep sixthStep = (CloseFollowerIndexStep) steps.get(5); assertThat(sixthStep.getKey(), equalTo(expectedSixthStepKey)); assertThat(sixthStep.getNextStepKey(), equalTo(expectedSeventhStepKey)); - OpenIndexStep seventhStep = (OpenIndexStep) steps.get(6); + UnfollowFollowerIndexStep seventhStep = (UnfollowFollowerIndexStep) steps.get(6); assertThat(seventhStep.getKey(), equalTo(expectedSeventhStepKey)); assertThat(seventhStep.getNextStepKey(), equalTo(expectedEighthStepKey)); - WaitForIndexColorStep eighthStep = (WaitForIndexColorStep) steps.get(7); - assertThat(eighthStep.getColor(), is(ClusterHealthStatus.YELLOW)); + OpenIndexStep eighthStep = (OpenIndexStep) steps.get(7); assertThat(eighthStep.getKey(), equalTo(expectedEighthStepKey)); - assertThat(eighthStep.getNextStepKey(), equalTo(nextStepKey)); + assertThat(eighthStep.getNextStepKey(), equalTo(expectedNinthStepKey)); + + WaitForIndexColorStep ninth = (WaitForIndexColorStep) steps.get(8); + assertThat(ninth.getColor(), is(ClusterHealthStatus.YELLOW)); + assertThat(ninth.getKey(), equalTo(expectedNinthStepKey)); + assertThat(ninth.getNextStepKey(), equalTo(nextStepKey)); } @Override diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java index 66bcb1b201cc2..0b26c0c323abe 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java @@ -17,6 +17,8 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.FormatNames; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; import org.elasticsearch.rest.RestStatus; @@ -28,9 +30,11 @@ import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.Phase; import org.elasticsearch.xpack.core.ilm.UnfollowAction; +import org.elasticsearch.xpack.core.ilm.WaitUntilTimeSeriesEndTimePassesStep; import java.io.IOException; import java.io.InputStream; +import java.time.Instant; import java.util.List; import java.util.Locale; import java.util.Map; @@ -39,7 +43,9 @@ import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.ilm.ShrinkIndexNameSupplier.SHRUNKEN_INDEX_PREFIX; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -47,6 +53,37 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase { private static final Logger LOGGER = LogManager.getLogger(CCRIndexLifecycleIT.class); + private static final String TSDB_INDEX_TEMPLATE = """ + { + "index_patterns": ["%s*"], + "data_stream": {}, + "template": { + "settings":{ + "index": { + "number_of_replicas": 0, + "number_of_shards": 1, + "routing_path": ["metricset"], + "mode": "time_series" + }, + "index.lifecycle.name": "%s" + }, + "mappings":{ + "properties": { + "@timestamp" : { + "type": "date" + }, + "metricset": { + "type": "keyword", + "time_series_dimension": true + }, + "volume": { + "type": "double", + "time_series_metric": "gauge" + } + } + } + } + }"""; public void testBasicCCRAndILMIntegration() throws Exception { String indexName = "logs-1"; @@ -533,6 +570,91 @@ public void testILMUnfollowFailsToRemoveRetentionLeases() throws Exception { } } + @SuppressWarnings("unchecked") + public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exception { + String indexPattern = "tsdb-index-"; + String dataStream = "tsdb-index-cpu"; + String policyName = "tsdb-policy"; + + if ("leader".equals(targetCluster)) { + putILMPolicy(policyName, null, 1, null); + Request templateRequest = new Request("PUT", "/_index_template/tsdb_template"); + templateRequest.setJsonEntity(Strings.format(TSDB_INDEX_TEMPLATE, indexPattern, policyName)); + assertOK(client().performRequest(templateRequest)); + } else if ("follow".equals(targetCluster)) { + // Use unfollow-only policy for follower cluster instead of regular ILM policy + // Follower clusters should not have their own rollover actions as they are meant + // to follow the rollover behavior of the leader index, not initiate their own rollovers + putUnfollowOnlyPolicy(client(), policyName); + + Request createAutoFollowRequest = new Request("PUT", "/_ccr/auto_follow/tsdb_index_auto_follow_pattern"); + createAutoFollowRequest.setJsonEntity(""" + { + "leader_index_patterns": [ "tsdb-index-*" ], + "remote_cluster": "leader_cluster", + "read_poll_timeout": "1000ms", + "follow_index_pattern": "{{leader_index}}" + }"""); + assertOK(client().performRequest(createAutoFollowRequest)); + + try (RestClient leaderClient = buildLeaderClient()) { + String now = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(Instant.now()); + + // Index a document on the leader index, this should trigger an ILM rollover. + // This will ensure that 'index.lifecycle.indexing_complete' is set. + index(leaderClient, dataStream, "", "@timestamp", now, "volume", 11.0, "metricset", randomAlphaOfLength(5)); + + String backingIndexName = getDataStreamBackingIndexNames(leaderClient, "tsdb-index-cpu").get(0); + assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + backingIndexName)))); + + assertBusy(() -> { + Map indexExplanation = explainIndex(client(), backingIndexName); + assertThat( + "index must wait in the " + WaitUntilTimeSeriesEndTimePassesStep.NAME + " until its end time lapses", + indexExplanation.get("step"), + is(WaitUntilTimeSeriesEndTimePassesStep.NAME) + ); + + assertThat(indexExplanation.get("step_info"), is(notNullValue())); + assertThat( + (String) ((Map) indexExplanation.get("step_info")).get("message"), + containsString("Waiting until the index's time series end time lapses") + ); + }, 30, TimeUnit.SECONDS); + + int initialLeaderDocCount = getDocCount(leaderClient, backingIndexName); + + // Add more documents to the leader index while it's in WaitUntilTimeSeriesEndTimePassesStep + String futureTimestamp = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()) + .format(Instant.now().plusSeconds(30)); + + for (int i = 0; i < 5; i++) { + index(leaderClient, dataStream, "", "@timestamp", futureTimestamp, "volume", 20.0 + i, "metricset", "test-sync-" + i); + } + + // Verify that new documents are synced to follower while in WaitUntilTimeSeriesEndTimePassesStep + assertBusy(() -> { + int currentLeaderDocCount = getDocCount(leaderClient, backingIndexName); + int currentFollowerDocCount = getDocCount(client(), backingIndexName); + + assertThat( + "Leader should have more documents than initially", + currentLeaderDocCount, + greaterThan(initialLeaderDocCount) + ); + assertThat("Follower should sync new documents from leader", currentFollowerDocCount, equalTo(currentLeaderDocCount)); + + // Also verify the step is still WaitUntilTimeSeriesEndTimePassesStep + assertThat( + "Index should still be in WaitUntilTimeSeriesEndTimePassesStep", + explainIndex(client(), backingIndexName).get("step"), + is(WaitUntilTimeSeriesEndTimePassesStep.NAME) + ); + }, 30, TimeUnit.SECONDS); + } + } + } + private void configureRemoteClusters(String name, String leaderRemoteClusterSeed) throws IOException { logger.info("Configuring leader remote cluster [{}]", leaderRemoteClusterSeed); Request request = new Request("PUT", "/_cluster/settings"); @@ -839,4 +961,24 @@ private static String getShrinkIndexName(RestClient client, String originalIndex : "lifecycle execution state must contain the target shrink index name for index [" + originalIndex + "]"; return shrunkenIndexName[0]; } + + private static Map explainIndex(RestClient client, String indexName) throws IOException { + Request explainRequest = new Request("GET", indexName + "/_ilm/explain"); + Response response = client.performRequest(explainRequest); + Map responseMap; + try (InputStream is = response.getEntity().getContent()) { + responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); + } + + @SuppressWarnings("unchecked") + Map> indexResponse = ((Map>) responseMap.get("indices")); + return indexResponse.get(indexName); + } + + private static int getDocCount(RestClient client, String indexName) throws IOException { + Request countRequest = new Request("GET", "/" + indexName + "/_count"); + Response response = client.performRequest(countRequest); + Map result = entityAsMap(response); + return (int) result.get("count"); + } }