From 8cecdaae3e0a9c3c9fd09e54ea1b472f01739142 Mon Sep 17 00:00:00 2001 From: Parker Timmins Date: Thu, 20 Feb 2025 13:39:09 -0600 Subject: [PATCH 1/2] Remove lifecycle from dest index and add after adding to data stream --- .../core/security/user/InternalUsers.java | 2 + ...indexDatastreamIndexTransportActionIT.java | 162 +++++++++++++++++- ...ReindexDataStreamIndexTransportAction.java | 1 + ...indexDataStreamPersistentTaskExecutor.java | 31 +++- 4 files changed, 190 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java index 9a0b17b22369c..a34c17cfee42b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction; import org.elasticsearch.action.admin.indices.rollover.RolloverAction; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.bulk.TransportBulkAction; @@ -213,6 +214,7 @@ public class InternalUsers { TransportCloseIndexAction.NAME, TransportCreateIndexAction.TYPE.name(), TransportClusterSearchShardsAction.TYPE.name(), + GetSettingsAction.NAME, TransportUpdateSettingsAction.TYPE.name(), RefreshAction.NAME, ReindexAction.NAME, diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java index 03839b7e05d71..bfb539a514e1e 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java @@ -23,9 +23,13 @@ import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.datastreams.CreateDataStreamAction; +import org.elasticsearch.action.datastreams.GetDataStreamAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineTransportAction; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -38,7 +42,9 @@ import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.time.FormatNames; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.ingest.common.IngestCommonPlugin; @@ -47,6 +53,18 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.OperationMode; +import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.StartILMRequest; +import org.elasticsearch.xpack.core.ilm.StopILMRequest; +import org.elasticsearch.xpack.core.ilm.action.GetStatusAction; +import org.elasticsearch.xpack.core.ilm.action.ILMActions; +import org.elasticsearch.xpack.core.ilm.action.PutLifecycleRequest; +import org.elasticsearch.xpack.ilm.IndexLifecycle; import org.elasticsearch.xpack.migrate.MigratePlugin; import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry; import org.junit.Before; @@ -57,6 +75,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.TimeUnit; import static java.lang.Boolean.parseBoolean; import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD; @@ -93,10 +112,22 @@ protected Collection> nodePlugins() { ReindexPlugin.class, MockTransportService.TestPlugin.class, DataStreamsPlugin.class, - IngestCommonPlugin.class + IngestCommonPlugin.class, + IndexLifecycle.class, + LocalStateCompositeXPackPlugin.class ); } + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s") + // This just generates less churn and makes it easier to read the log file if needed + .put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false) + .build(); + } + private static String DATA_STREAM_MAPPING = """ { "dynamic": true, @@ -588,6 +619,135 @@ public void testTsdbStartEndSet() throws Exception { assertHitCount(prepareSearch(destIndex).setSize(0), 1); } + public void testIndexLifecycleSettingNotCopied() throws Exception { + Map phases = Map.of( + "hot", + new Phase( + "hot", + TimeValue.ZERO, + Map.of( + "rollover", + new org.elasticsearch.xpack.core.ilm.RolloverAction(null, null, null, 1L, null, null, null, null, null, null) + ) + ) + ); + + var policyName = "my-policy"; + LifecyclePolicy policy = new LifecyclePolicy(policyName, phases); + PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policy); + assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet()); + + // create data stream with a document and wait for ILM to roll it over + var dataStream = createDataStream(policyName); + createDocument(dataStream); + + assertAcked(safeGet(client().execute(ILMActions.START, new StartILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)))); + assertBusy(() -> { + var getIndexResponse = safeGet(indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(dataStream))); + assertTrue(getIndexResponse.indices().length >= 2); + }); + stopILM(); + + var dataStreams = safeGet( + indicesAdmin().execute( + GetDataStreamAction.INSTANCE, + new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStream }) + ) + ).getDataStreams(); + + assertFalse(dataStreams.isEmpty()); + String writeIndex = dataStreams.get(0).getDataStream().getWriteIndex().getName(); + List indices = dataStreams.get(0).getDataStream().getIndices().stream().map(Index::getName).toList(); + assertTrue(indices.size() >= 2); + + for (var backingIndex : indices) { + if (backingIndex.equals(writeIndex) == false) { + var destIndex = safeGet( + client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(backingIndex)) + ).getDestIndex(); + var settingsResponse = safeGet( + indicesAdmin().getSettings(new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(backingIndex, destIndex)) + ); + assertEquals(policyName, settingsResponse.getSetting(backingIndex, IndexMetadata.LIFECYCLE_NAME)); + assertNull(settingsResponse.getSetting(destIndex, IndexMetadata.LIFECYCLE_NAME)); + } + } + } + + private void stopILM() throws Exception { + assertAcked(safeGet(client().execute(ILMActions.STOP, new StopILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)))); + assertBusy(() -> { + var statusResponse = safeGet( + client().execute(GetStatusAction.INSTANCE, new AcknowledgedRequest.Plain(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)) + ); + assertEquals(OperationMode.STOPPED, statusResponse.getMode()); + }); + } + + private String createDataStream(String ilmPolicy) throws Exception { + String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault()); + + Settings settings = ilmPolicy != null ? Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, ilmPolicy).build() : null; + + String mapping = """ + { + "properties": { + "@timestamp": { + "type":"date" + }, + "data":{ + "type":"keyword" + } + } + } + """; + Template idxTemplate = new Template(settings, new CompressedXContent(mapping), null); + + ComposableIndexTemplate template = ComposableIndexTemplate.builder() + .indexPatterns(List.of(dataStreamName + "*")) + .template(idxTemplate) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false)) + .build(); + + assertAcked( + client().execute( + TransportPutComposableIndexTemplateAction.TYPE, + new TransportPutComposableIndexTemplateAction.Request(dataStreamName + "_template").indexTemplate(template) + ) + ); + assertAcked( + client().execute( + CreateDataStreamAction.INSTANCE, + new CreateDataStreamAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, dataStreamName) + ) + ); + return dataStreamName; + } + + private long createDocument(String dataStreamName) throws Exception { + // Get some randomized but reasonable timestamps on the data since not all of it is guaranteed to arrive in order. + long timeSeed = System.currentTimeMillis(); + long timestamp = randomLongBetween(timeSeed - TimeUnit.HOURS.toMillis(5), timeSeed); + safeGet( + client().index( + new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE) + .source( + JsonXContent.contentBuilder() + .startObject() + .field("@timestamp", timestamp) + .field("data", randomAlphaOfLength(25)) + .endObject() + ) + ) + ); + safeGet( + indicesAdmin().refresh( + new RefreshRequest(".ds-" + dataStreamName + "*").indicesOptions(IndicesOptions.lenientExpandOpenHidden()) + ) + ); + return timestamp; + } + private static void cleanupMetadataBlocks(String index) { var settings = Settings.builder() .putNull(IndexMetadata.SETTING_READ_ONLY) diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 31fdcbe074c13..4167d9a7e9773 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -254,6 +254,7 @@ private void createIndex( var settingsOverride = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .putNull(IndexMetadata.LIFECYCLE_NAME) .build(); var request = new CreateIndexFromSourceAction.Request( diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index 38ab0275f62c1..acf1416d2f6fb 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -15,6 +15,10 @@ import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.datastreams.GetDataStreamAction; import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.support.CountDownActionListener; @@ -23,8 +27,10 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamAction; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; @@ -215,9 +221,8 @@ private void maybeProcessNextIndex( SubscribableListener.newForked( l -> client.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l) ) - .andThen( - (l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId) - ) + .andThen((l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId)) + .andThen((l, newIndex) -> copySettings(index.getName(), newIndex, l, parentTaskId)) .andThen(l -> deleteIndex(index.getName(), parentTaskId, l)) .addListener(ActionListener.wrap(unused -> { reindexDataStreamTask.reindexSucceeded(index.getName()); @@ -234,7 +239,7 @@ private void updateDataStream( String dataStream, String oldIndex, String newIndex, - ActionListener listener, + ActionListener listener, TaskId parentTaskId ) { ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request( @@ -243,7 +248,23 @@ private void updateDataStream( List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex)) ); modifyDataStreamRequest.setParentTask(parentTaskId); - client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener); + client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener.map(ingored -> newIndex)); + } + + private void copySettings(String oldIndex, String newIndex, ActionListener listener, TaskId parentTaskId) { + var getSettingsRequest = new GetSettingsRequest(TimeValue.MAX_VALUE).indices(oldIndex); + getSettingsRequest.setParentTask(parentTaskId); + client.execute(GetSettingsAction.INSTANCE, getSettingsRequest, listener.delegateFailure((delegate, response) -> { + String lifecycleName = response.getSetting(oldIndex, IndexMetadata.LIFECYCLE_NAME); + if (lifecycleName != null) { + var settings = Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, lifecycleName).build(); + var updateSettingsRequest = new UpdateSettingsRequest(settings, newIndex); + updateSettingsRequest.setParentTask(parentTaskId); + client.execute(TransportUpdateSettingsAction.TYPE, updateSettingsRequest, delegate); + } else { + delegate.onResponse(null); + } + })); } private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener listener) { From 1a088ef59470e1687e69cca497df863008e583f2 Mon Sep 17 00:00:00 2001 From: Parker Timmins Date: Thu, 20 Feb 2025 16:21:00 -0600 Subject: [PATCH 2/2] comment --- .../action/ReindexDataStreamIndexTransportAction.java | 1 + .../task/ReindexDataStreamPersistentTaskExecutor.java | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 4167d9a7e9773..2f20290d58048 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -254,6 +254,7 @@ private void createIndex( var settingsOverride = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + // remove lifecycle so that ILM does not start processing before the index is added to data stream .putNull(IndexMetadata.LIFECYCLE_NAME) .build(); diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index acf1416d2f6fb..f901e811f70b9 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -251,6 +251,11 @@ private void updateDataStream( client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener.map(ingored -> newIndex)); } + /** + * Copy lifecycle name from the old index to the new index, so that ILM can now process the new index. + * If the new index has a lifecycle name before it is swapped into the data stream, ILM will try, and fail, to process + * the new index. For this reason, lifecycle is not set until after the new index has been added to the data stream. + */ private void copySettings(String oldIndex, String newIndex, ActionListener listener, TaskId parentTaskId) { var getSettingsRequest = new GetSettingsRequest(TimeValue.MAX_VALUE).indices(oldIndex); getSettingsRequest.setParentTask(parentTaskId);