diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java
index 04f1d73c09b44..0802846721691 100644
--- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java
+++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java
@@ -89,7 +89,6 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
@@ -160,22 +159,12 @@ public void testRolloverLifecycle() throws Exception {
 
         indexDocs(dataStreamName, 1);
 
-        assertBusy(() -> {
-            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-                TEST_REQUEST_TIMEOUT,
-                new String[] { dataStreamName }
-            );
-            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-                .actionGet();
-            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
-            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
-            assertThat(backingIndices.size(), equalTo(2));
-            String backingIndex = backingIndices.get(0).getName();
-            assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
-            String writeIndex = backingIndices.get(1).getName();
-            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
-        });
+        List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
+        assertThat(backingIndices.size(), equalTo(2));
+        String backingIndex = backingIndices.get(0);
+        assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
+        String writeIndex = backingIndices.get(1);
+        assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
     }
 
     public void testRolloverAndRetention() throws Exception {
@@ -194,19 +183,11 @@ public void testRolloverAndRetention() throws Exception {
         indexDocs(dataStreamName, 1);
 
         assertBusy(() -> {
-            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-                TEST_REQUEST_TIMEOUT,
-                new String[] { dataStreamName }
-            );
-            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-                .actionGet();
-            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
-            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
+            List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
             assertThat(backingIndices.size(), equalTo(1));
             // we expect the data stream to have only one backing index, the write one, with generation 2
             // as generation 1 would've been deleted by the data stream lifecycle given the configuration
-            String writeIndex = backingIndices.get(0).getName();
+            String writeIndex = backingIndices.get(0);
             assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
         });
     }
@@ -232,23 +213,10 @@ public void testSystemDataStreamRetention() throws Exception {
         client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet();
         indexDocs(SYSTEM_DATA_STREAM_NAME, 1);
         now.addAndGet(TimeValue.timeValueSeconds(30).millis());
-        assertBusy(() -> {
-            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-                TEST_REQUEST_TIMEOUT,
-                new String[] { SYSTEM_DATA_STREAM_NAME }
-            );
-            GetDataStreamAction.Response getDataStreamResponse = client().execute(
-                GetDataStreamAction.INSTANCE,
-                getDataStreamRequest
-            ).actionGet();
-            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(SYSTEM_DATA_STREAM_NAME));
-            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
-            assertThat(backingIndices.size(), equalTo(2)); // global retention is ignored
-            // we expect the data stream to have two backing indices since the effective retention is 100 days
-            String writeIndex = backingIndices.get(1).getName();
-            assertThat(writeIndex, backingIndexEqualTo(SYSTEM_DATA_STREAM_NAME, 2));
-        });
+        List<String> backingIndices = waitForDataStreamBackingIndices(SYSTEM_DATA_STREAM_NAME, 2);
+        // we expect the data stream to have two backing indices since the effective retention is 100 days
+        String writeIndex = backingIndices.get(1);
+        assertThat(writeIndex, backingIndexEqualTo(SYSTEM_DATA_STREAM_NAME, 2));
 
         // Now we advance the time to well beyond the configured retention. We expect that the older index will have been deleted.
         now.addAndGet(TimeValue.timeValueDays(3 * TestSystemDataStreamPlugin.SYSTEM_DATA_STREAM_RETENTION_DAYS).millis());
@@ -263,12 +231,12 @@ public void testSystemDataStreamRetention() throws Exception {
             ).actionGet();
             assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
             assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(SYSTEM_DATA_STREAM_NAME));
-            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
-            assertThat(backingIndices.size(), equalTo(1)); // global retention is ignored
+            List<Index> currentBackingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
+            assertThat(currentBackingIndices.size(), equalTo(1)); // global retention is ignored
             // we expect the data stream to have only one backing index, the write one, with generation 2
             // as generation 1 would've been deleted by the data stream lifecycle given the configuration
-            String writeIndex = backingIndices.get(0).getName();
-            assertThat(writeIndex, backingIndexEqualTo(SYSTEM_DATA_STREAM_NAME, 2));
+            String currentWriteIndex = currentBackingIndices.get(0).getName();
+            assertThat(currentWriteIndex, backingIndexEqualTo(SYSTEM_DATA_STREAM_NAME, 2));
             try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
                 builder.humanReadable(true);
                 ToXContent.Params withEffectiveRetention = new ToXContent.MapParams(
@@ -378,18 +346,9 @@ public void testOriginationDate() throws Exception {
         ).get();
 
         assertBusy(() -> {
-            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-                TEST_REQUEST_TIMEOUT,
-                new String[] { dataStreamName }
-            );
-            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-                .actionGet();
-            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
-            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
-            Set<String> indexNames = backingIndices.stream().map(Index::getName).collect(Collectors.toSet());
-            assertTrue(indexNames.contains("index_new"));
-            assertFalse(indexNames.contains("index_old"));
+            List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
+            assertTrue(backingIndices.contains("index_new"));
+            assertFalse(backingIndices.contains("index_old"));
         });
     }
@@ -412,21 +371,8 @@ public void testUpdatingLifecycleAppliesToAllBackingIndices() throws Exception {
         // Update the lifecycle of the data stream
         updateLifecycle(dataStreamName, TimeValue.timeValueMillis(1));
         // Verify that the retention has changed for all backing indices
-        assertBusy(() -> {
-            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-                TEST_REQUEST_TIMEOUT,
-                new String[] { dataStreamName }
-            );
-            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-                .actionGet();
-            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-            DataStream dataStream = getDataStreamResponse.getDataStreams().get(0).getDataStream();
-            assertThat(dataStream.getName(), equalTo(dataStreamName));
-            List<Index> backingIndices = dataStream.getIndices();
-            assertThat(backingIndices.size(), equalTo(1));
-            String writeIndex = dataStream.getWriteIndex().getName();
-            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, finalGeneration));
-        });
+        List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 1);
+        assertThat(backingIndices.getFirst(), backingIndexEqualTo(dataStreamName, finalGeneration));
     }
 
     public void testAutomaticForceMerge() throws Exception {
@@ -476,7 +422,8 @@ public void testAutomaticForceMerge() throws Exception {
         int finalGeneration = randomIntBetween(3, 4);
         for (int currentGeneration = 1; currentGeneration < finalGeneration; currentGeneration++) {
             // This is currently the write index, but it will be rolled over as soon as data stream lifecycle runs:
-            final String toBeRolledOverIndex = getBackingIndices(dataStreamName).get(currentGeneration - 1);
+            final var backingIndexNames = waitForDataStreamBackingIndices(dataStreamName, currentGeneration);
+            final String toBeRolledOverIndex = backingIndexNames.get(currentGeneration - 1);
             for (int i = 0; i < randomIntBetween(10, 50); i++) {
                 indexDocs(dataStreamName, randomIntBetween(1, 300));
                 // Make sure the segments get written:
@@ -488,7 +435,7 @@
             if (currentGeneration == 1) {
                 toBeForceMergedIndex = null; // Not going to be used
             } else {
-                toBeForceMergedIndex = getBackingIndices(dataStreamName).get(currentGeneration - 2);
+                toBeForceMergedIndex = backingIndexNames.get(currentGeneration - 2);
             }
             int currentBackingIndexCount = currentGeneration;
             DataStreamLifecycleService dataStreamLifecycleService = internalCluster().getInstance(
@@ -499,19 +446,9 @@
             // run data stream lifecycle once
             dataStreamLifecycleService.run(clusterService.state());
             assertBusy(() -> {
-                GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-                    TEST_REQUEST_TIMEOUT,
-                    new String[] { dataStreamName }
-                );
-                GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-                    .actionGet();
-                assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-                DataStream dataStream = getDataStreamResponse.getDataStreams().get(0).getDataStream();
-                assertThat(dataStream.getName(), equalTo(dataStreamName));
-                List<Index> backingIndices = dataStream.getIndices();
+                List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
                 assertThat(backingIndices.size(), equalTo(currentBackingIndexCount + 1));
-                String writeIndex = dataStream.getWriteIndex().getName();
-                assertThat(writeIndex, backingIndexEqualTo(dataStreamName, currentBackingIndexCount + 1));
+                assertThat(backingIndices.getLast(), backingIndexEqualTo(dataStreamName, currentBackingIndexCount + 1));
                 /*
                  * We only expect forcemerge to happen on the 2nd data stream lifecycle run and later, since on the first there's only the
                  * single write index to be rolled over.
@@ -567,40 +504,18 @@ public void testErrorRecordingOnRollover() throws Exception {
         indexDocs(dataStreamName, 1);
 
         // let's allow one rollover to go through
-        assertBusy(() -> {
-            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-                TEST_REQUEST_TIMEOUT,
-                new String[] { dataStreamName }
-            );
-            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-                .actionGet();
-            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
-            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
-            assertThat(backingIndices.size(), equalTo(2));
-            String backingIndex = backingIndices.get(0).getName();
-            assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
-            String writeIndex = backingIndices.get(1).getName();
-            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
-        });
+        var backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
 
         // prevent new indices from being created (ie. future rollovers)
        updateClusterSettings(Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 1));
 
         indexDocs(dataStreamName, 1);
 
-        String writeIndexName = getBackingIndices(dataStreamName).get(1);
+        String writeIndexName = backingIndices.get(1);
         assertBusy(() -> {
-            ErrorEntry writeIndexRolloverError = null;
-            Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
-
-            for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
-                writeIndexRolloverError = lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, writeIndexName);
-                if (writeIndexRolloverError != null) {
-                    break;
-                }
-            }
+            DataStreamLifecycleService lifecycleService = internalCluster().getCurrentMasterNodeInstance(DataStreamLifecycleService.class);
+            ErrorEntry writeIndexRolloverError = lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, writeIndexName);
 
             assertThat(writeIndexRolloverError, is(notNullValue()));
             assertThat(writeIndexRolloverError.error(), containsString("maximum normal shards open"));
@@ -620,6 +535,9 @@ public void testErrorRecordingOnRollover() throws Exception {
             assertTrue(found);
         }, 30, TimeUnit.SECONDS);
 
+        // Ensure data stream did not roll over yet.
+        assertEquals(2, getDataStreamBackingIndexNames(dataStreamName).size());
+
         // DSL should signal to the health node that there's an error in the store that's been retried at least 3 times
         assertBusy(() -> {
             FetchHealthInfoCacheAction.Response healthNodeResponse = client().execute(
@@ -660,20 +578,17 @@ public void testErrorRecordingOnRollover() throws Exception {
         updateClusterSettings(Settings.builder().putNull("*"));
 
         assertBusy(() -> {
-            List<String> backingIndices = getBackingIndices(dataStreamName);
-            assertThat(backingIndices.size(), equalTo(3));
-            String writeIndex = backingIndices.get(2);
+            List<String> currentBackingIndices = getDataStreamBackingIndexNames(dataStreamName);
+            assertThat(currentBackingIndices.size(), equalTo(3));
+            String writeIndex = currentBackingIndices.get(2);
             // rollover was successful and we got to generation 3
             assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 3));
 
             // we recorded the error against the previous write index (generation 2)
             // let's check there's no error recorded against it anymore
-            String previousWriteInddex = backingIndices.get(1);
-            Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
-
-            for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
-                assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, previousWriteInddex), nullValue());
-            }
+            String previousWriteInddex = currentBackingIndices.get(1);
+            DataStreamLifecycleService lifecycleService = internalCluster().getCurrentMasterNodeInstance(DataStreamLifecycleService.class);
+            assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, previousWriteInddex), nullValue());
         });
 
         // the error has been fixed so the health information shouldn't be reported anymore
@@ -725,24 +640,7 @@ public void testErrorRecordingOnRetention() throws Exception {
         indexDocs(dataStreamName, 1);
 
         // let's allow one rollover to go through
-        assertBusy(() -> {
-            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-                TEST_REQUEST_TIMEOUT,
-                new String[] { dataStreamName }
-            );
-            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-                .actionGet();
-            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
-            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
-            assertThat(backingIndices.size(), equalTo(2));
-            String backingIndex = backingIndices.get(0).getName();
-            assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
-            String writeIndex = backingIndices.get(1).getName();
-            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
-        });
-
-        List<String> dsBackingIndices = getBackingIndices(dataStreamName);
+        List<String> dsBackingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
         String firstGenerationIndex = dsBackingIndices.get(0);
         String secondGenerationIndex = dsBackingIndices.get(1);
@@ -752,31 +650,18 @@ public void testErrorRecordingOnRetention() throws Exception {
         updateLifecycle(dataStreamName, TimeValue.timeValueSeconds(1));
 
         assertBusy(() -> {
-            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-                TEST_REQUEST_TIMEOUT,
-                new String[] { dataStreamName }
-            );
-            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-                .actionGet();
-            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
-            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
+            List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
             assertThat(backingIndices.size(), equalTo(2));
-            String writeIndex = backingIndices.get(1).getName();
-            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
+            assertThat(backingIndices.getLast(), backingIndexEqualTo(dataStreamName, 2));
 
-            ErrorEntry recordedRetentionExecutionError = null;
-            Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
-
-            for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
-                recordedRetentionExecutionError = lifecycleService.getErrorStore()
-                    .getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex);
-                if (recordedRetentionExecutionError != null && recordedRetentionExecutionError.retryCount() > 3) {
-                    break;
-                }
-            }
+            DataStreamLifecycleService lifecycleService = internalCluster().getCurrentMasterNodeInstance(
+                DataStreamLifecycleService.class
+            );
+            ErrorEntry recordedRetentionExecutionError = lifecycleService.getErrorStore()
+                .getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex);
             assertThat(recordedRetentionExecutionError, is(notNullValue()));
+            assertThat(recordedRetentionExecutionError.retryCount(), greaterThanOrEqualTo(3));
             assertThat(recordedRetentionExecutionError.error(), containsString("blocked by: [FORBIDDEN/5/index read-only (api)"));
         });
@@ -819,23 +704,15 @@ public void testErrorRecordingOnRetention() throws Exception {
         updateIndexSettings(Settings.builder().put(READ_ONLY.settingName(), false), firstGenerationIndex);
 
         assertBusy(() -> {
-            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-                TEST_REQUEST_TIMEOUT,
-                new String[] { dataStreamName }
-            );
-            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-                .actionGet();
-            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
-            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
+            List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
             // data stream only has one index now
             assertThat(backingIndices.size(), equalTo(1));
 
-            // error stores don't contain anything for the first generation index anymore
-            Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
-            for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
-                assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex), nullValue());
-            }
+            // error store doesn't contain anything for the first generation index anymore
+            DataStreamLifecycleService lifecycleService = internalCluster().getCurrentMasterNodeInstance(
+                DataStreamLifecycleService.class
+            );
+            assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex), nullValue());
         });
 
         // health info for DSL should be EMPTY as everything's healthy
@@ -895,24 +772,12 @@ public void testDataLifecycleServiceConfiguresTheMergePolicy() throws Exception
         indexDocs(dataStreamName, 1);
 
         // let's allow one rollover to go through
-        assertBusy(() -> {
-            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-                TEST_REQUEST_TIMEOUT,
-                new String[] { dataStreamName }
-            );
-            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-                .actionGet();
-            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
-            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
-            assertThat(backingIndices.size(), equalTo(2));
-            String backingIndex = backingIndices.get(0).getName();
-            assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
-            String writeIndex = backingIndices.get(1).getName();
-            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
-        });
+        List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
+        String firstGenerationIndex = backingIndices.get(0);
+        assertThat(firstGenerationIndex, backingIndexEqualTo(dataStreamName, 1));
+        String writeIndex = backingIndices.get(1);
+        assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
 
-        String firstGenerationIndex = getBackingIndices(dataStreamName).get(0);
         ClusterGetSettingsAction.Response response = client().execute(
             ClusterGetSettingsAction.INSTANCE,
             new ClusterGetSettingsAction.Request(TEST_REQUEST_TIMEOUT)
         )
@@ -950,12 +815,9 @@ public void testDataLifecycleServiceConfiguresTheMergePolicy() throws Exception
         indexDocs(dataStreamName, 1);
 
         // let's allow one rollover to go through
-        assertBusy(() -> {
-            List<String> backingIndices = getBackingIndices(dataStreamName);
-            assertThat(backingIndices.size(), equalTo(3));
-        });
+        backingIndices = waitForDataStreamBackingIndices(dataStreamName, 3);
+        String secondGenerationIndex = backingIndices.get(1);
 
-        String secondGenerationIndex = getBackingIndices(dataStreamName).get(1);
         // check the 2nd generation index picked up the new setting values
         assertBusy(() -> {
             GetSettingsRequest getSettingsRequest = new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(secondGenerationIndex)
@@ -986,7 +848,7 @@ public void testReenableDataStreamLifecycle() throws Exception {
         client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
         indexDocs(dataStreamName, 10);
 
-        List<String> backingIndices = getBackingIndices(dataStreamName);
+        List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
         {
             // backing index should not be managed
             String writeIndex = backingIndices.get(0);
@@ -1021,14 +883,11 @@ public void testReenableDataStreamLifecycle() throws Exception {
             )
         );
 
-        assertBusy(() -> {
-            List<String> currentBackingIndices = getBackingIndices(dataStreamName);
-            assertThat(currentBackingIndices.size(), equalTo(2));
-            String backingIndex = currentBackingIndices.get(0);
-            assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
-            String writeIndex = currentBackingIndices.get(1);
-            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
-        });
+        List<String> currentBackingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
+        String backingIndex = currentBackingIndices.get(0);
+        assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
+        String writeIndex = currentBackingIndices.get(1);
+        assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
     }
 
     public void testLifecycleAppliedToFailureStore() throws Exception {
@@ -1069,24 +928,9 @@ public void testLifecycleAppliedToFailureStore() throws Exception {
         indexInvalidFlagDocs(dataStreamName, 1);
 
         // Let's verify the rollover
-        assertBusy(() -> {
-            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-                TEST_REQUEST_TIMEOUT,
-                new String[] { dataStreamName }
-            );
-            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-                .actionGet();
-            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
-            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
-            assertThat(backingIndices.size(), equalTo(1));
-            List<Index> failureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices();
-            assertThat(failureIndices.size(), equalTo(2));
-        });
-
-        List<String> indices = getFailureIndices(dataStreamName);
-        String firstGenerationIndex = indices.get(0);
-        String secondGenerationIndex = indices.get(1);
+        List<String> failureIndices = waitForDataStreamIndices(dataStreamName, 2, true);
+        String firstGenerationIndex = failureIndices.get(0);
+        String secondGenerationIndex = failureIndices.get(1);
 
         // Let's verify the merge settings
         ClusterGetSettingsAction.Response response = client().execute(
@@ -1126,36 +970,12 @@ public void testLifecycleAppliedToFailureStore() throws Exception {
             assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
             List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
             assertThat(backingIndices.size(), equalTo(1));
-            List<Index> failureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices();
-            assertThat(failureIndices.size(), equalTo(1));
-            assertThat(failureIndices.get(0).getName(), equalTo(secondGenerationIndex));
+            List<Index> retrievedFailureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices();
+            assertThat(retrievedFailureIndices.size(), equalTo(1));
+            assertThat(retrievedFailureIndices.get(0).getName(), equalTo(secondGenerationIndex));
         });
     }
 
-    private static List<String> getBackingIndices(String dataStreamName) {
-        GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-            TEST_REQUEST_TIMEOUT,
-            new String[] { dataStreamName }
-        );
-        GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-            .actionGet();
-        assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-        assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
-        return getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().stream().map(Index::getName).toList();
-    }
-
-    private static List<String> getFailureIndices(String dataStreamName) {
-        GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
-            TEST_REQUEST_TIMEOUT,
-            new String[] { dataStreamName }
-        );
-        GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
-            .actionGet();
-        assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
-        assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
-        return getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices().stream().map(Index::getName).toList();
-    }
-
     static void indexDocs(String dataStream, int numDocs) {
         BulkRequest bulkRequest = new BulkRequest();
         for (int i = 0; i < numDocs; i++) {
diff --git a/muted-tests.yml b/muted-tests.yml
index 2cb96c3c42d4e..ee2f7704726fd 100644
--- a/muted-tests.yml
+++ b/muted-tests.yml
@@ -330,9 +330,6 @@ tests:
 - class: org.elasticsearch.xpack.ilm.DataStreamAndIndexLifecycleMixingTests
   method: testUpdateIndexTemplateToDataStreamLifecyclePreference
   issue: https://github.com/elastic/elasticsearch/issues/124837
-- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
-  method: testAutomaticForceMerge
-  issue: https://github.com/elastic/elasticsearch/issues/124846
 - class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
   method: test {p0=search.vectors/41_knn_search_bbq_hnsw/Test knn search}
   issue: https://github.com/elastic/elasticsearch/issues/124848
@@ -354,9 +351,6 @@ tests:
 - class: org.elasticsearch.packaging.test.BootstrapCheckTests
   method: test20RunWithBootstrapChecks
   issue: https://github.com/elastic/elasticsearch/issues/124940
-- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
-  method: testErrorRecordingOnRetention
-  issue: https://github.com/elastic/elasticsearch/issues/124950
 - class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT
   method: testStopQueryLocalNoRemotes
   issue: https://github.com/elastic/elasticsearch/issues/124959
@@ -378,9 +372,6 @@ tests:
 - class: org.elasticsearch.packaging.test.DockerTests
   method: test011SecurityEnabledStatus
   issue: https://github.com/elastic/elasticsearch/issues/124990
-- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
-  method: testLifecycleAppliedToFailureStore
-  issue: https://github.com/elastic/elasticsearch/issues/124999
 - class: org.elasticsearch.xpack.ilm.DataStreamAndIndexLifecycleMixingTests
   method: testGetDataStreamResponse
   issue: https://github.com/elastic/elasticsearch/issues/125083
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 33ad36e22b45c..3b648e5f8da06 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -854,10 +854,53 @@ private static Settings.Builder getExcludeSettings(int num, Settings.Builder bui
         return builder;
     }
 
+    /**
+     * Waits for the specified data stream to have the expected number of backing indices.
+     */
+    public static List<String> waitForDataStreamBackingIndices(String dataStreamName, int expectedSize) {
+        return waitForDataStreamIndices(dataStreamName, expectedSize, false);
+    }
+
+    /**
+     * Waits for the specified data stream to have the expected number of backing or failure indices.
+     */
+    public static List<String> waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) {
+        // We listen to the cluster state on the master node to ensure all other nodes have already acked the new cluster state.
+        // This avoids inconsistencies in subsequent API calls which might hit a non-master node.
+        final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
+        final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
+            final var dataStream = clusterState.metadata().getProject().dataStreams().get(dataStreamName);
+            if (dataStream == null) {
+                return false;
+            }
+            return dataStream.getDataStreamIndices(failureStore).getIndices().size() == expectedSize;
+        });
+        safeAwait(listener);
+        final var backingIndexNames = getDataStreamBackingIndexNames(dataStreamName, failureStore);
+        assertEquals(
+            Strings.format(
+                "Retrieved number of data stream indices doesn't match expectation for data stream [%s]. Expected %d but got %s",
+                dataStreamName,
+                expectedSize,
+                backingIndexNames
+            ),
+            expectedSize,
+            backingIndexNames.size()
+        );
+        return backingIndexNames;
+    }
+
     /**
      * Returns a list of the data stream's backing index names.
      */
-    public List<String> getDataStreamBackingIndexNames(String dataStreamName) {
+    public static List<String> getDataStreamBackingIndexNames(String dataStreamName) {
+        return getDataStreamBackingIndexNames(dataStreamName, false);
+    }
+
+    /**
+     * Returns a list of the data stream's backing or failure index names.
+     */
+    public static List<String> getDataStreamBackingIndexNames(String dataStreamName, boolean failureStore) {
         GetDataStreamAction.Response response = safeGet(
             client().execute(
                 GetDataStreamAction.INSTANCE,
@@ -867,7 +910,7 @@ public List<String> getDataStreamBackingIndexNames(String dataStreamName) {
         assertThat(response.getDataStreams().size(), equalTo(1));
         DataStream dataStream = response.getDataStreams().getFirst().getDataStream();
         assertThat(dataStream.getName(), equalTo(dataStreamName));
-        return dataStream.getIndices().stream().map(Index::getName).toList();
+        return dataStream.getDataStreamIndices(failureStore).getIndices().stream().map(Index::getName).toList();
     }
 
     /**
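
Illustrative usage sketch (not part of the patch): the snippet below shows how a test extending ESIntegTestCase might call the helpers introduced above. The test method, data stream name, and the indexDocs helper are hypothetical placeholders; only waitForDataStreamBackingIndices, waitForDataStreamIndices, and getDataStreamBackingIndexNames come from this change.

    // Hypothetical example; assumed to live in a subclass of ESIntegTestCase and to reuse
    // an indexDocs(String, int) helper like the one in DataStreamLifecycleServiceIT.
    public void testRolloverWithNewHelpers() throws Exception {
        String dataStreamName = "my-data-stream"; // hypothetical data stream created earlier in the test
        indexDocs(dataStreamName, 1);

        // Blocks until the data stream has exactly two backing indices (i.e. one rollover happened) and returns their names.
        List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
        assertThat(backingIndices.get(1), backingIndexEqualTo(dataStreamName, 2));

        // The failure-store variant waits on failure indices instead of backing indices.
        List<String> failureIndices = waitForDataStreamIndices(dataStreamName, 1, true);
        assertThat(failureIndices.size(), equalTo(1));

        // Non-blocking lookup of the current backing index names.
        List<String> currentIndices = getDataStreamBackingIndexNames(dataStreamName);
        assertThat(currentIndices.size(), equalTo(2));
    }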