Skip to content

Commit 1a76203

Browse files
Add lifecycle setting to destination index after adding to data stream (#123071) (#123159)
When reindexing a data stream, remove the lifecycle name setting when creating the destination index, so that ILM does not process it. Add the setting back after adding the destination index to the data stream, at which point ILM can safely process it. (cherry picked from commit f044d97) # Conflicts: # x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java
1 parent b81add7 commit 1a76203

File tree

4 files changed

+194
-6
lines changed

4 files changed

+194
-6
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
1919
import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
2020
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
21+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
2122
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
2223
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
2324
import org.elasticsearch.action.bulk.TransportBulkAction;
@@ -214,6 +215,7 @@ public class InternalUsers {
214215
TransportCloseIndexAction.NAME,
215216
TransportCreateIndexAction.TYPE.name(),
216217
TransportClusterSearchShardsAction.TYPE.name(),
218+
GetSettingsAction.NAME,
217219
TransportUpdateSettingsAction.TYPE.name(),
218220
RefreshAction.NAME,
219221
ReindexAction.NAME,

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 159 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@
2525
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
2626
import org.elasticsearch.action.bulk.BulkRequest;
2727
import org.elasticsearch.action.bulk.BulkResponse;
28+
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
29+
import org.elasticsearch.action.datastreams.GetDataStreamAction;
2830
import org.elasticsearch.action.index.IndexRequest;
2931
import org.elasticsearch.action.ingest.DeletePipelineRequest;
3032
import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
3133
import org.elasticsearch.action.ingest.GetPipelineAction;
3234
import org.elasticsearch.action.ingest.GetPipelineRequest;
3335
import org.elasticsearch.action.ingest.PutPipelineRequest;
3436
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
37+
import org.elasticsearch.action.support.IndicesOptions;
38+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
3539
import org.elasticsearch.cluster.block.ClusterBlockException;
3640
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
3741
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -45,7 +49,9 @@
4549
import org.elasticsearch.common.time.DateFormatter;
4650
import org.elasticsearch.common.time.FormatNames;
4751
import org.elasticsearch.common.xcontent.support.XContentMapValues;
52+
import org.elasticsearch.core.TimeValue;
4853
import org.elasticsearch.datastreams.DataStreamsPlugin;
54+
import org.elasticsearch.index.Index;
4955
import org.elasticsearch.index.IndexSettings;
5056
import org.elasticsearch.index.engine.frozen.FrozenEngine;
5157
import org.elasticsearch.index.mapper.DateFieldMapper;
@@ -58,8 +64,20 @@
5864
import org.elasticsearch.test.ESIntegTestCase;
5965
import org.elasticsearch.test.transport.MockTransportService;
6066
import org.elasticsearch.xcontent.XContentType;
67+
import org.elasticsearch.xcontent.json.JsonXContent;
68+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
6169
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
70+
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
71+
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
72+
import org.elasticsearch.xpack.core.ilm.OperationMode;
73+
import org.elasticsearch.xpack.core.ilm.Phase;
74+
import org.elasticsearch.xpack.core.ilm.StartILMRequest;
75+
import org.elasticsearch.xpack.core.ilm.StopILMRequest;
76+
import org.elasticsearch.xpack.core.ilm.action.GetStatusAction;
77+
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
78+
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleRequest;
6279
import org.elasticsearch.xpack.frozen.action.TransportFreezeIndexAction;
80+
import org.elasticsearch.xpack.ilm.IndexLifecycle;
6381
import org.elasticsearch.xpack.migrate.MigratePlugin;
6482
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
6583
import org.junit.Before;
@@ -72,6 +90,7 @@
7290
import java.util.List;
7391
import java.util.Locale;
7492
import java.util.Map;
93+
import java.util.concurrent.TimeUnit;
7594

7695
import static java.lang.Boolean.parseBoolean;
7796
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
@@ -125,10 +144,22 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
125144
MockTransportService.TestPlugin.class,
126145
DataStreamsPlugin.class,
127146
IngestCommonPlugin.class,
128-
TestFrozenIndicesPlugin.class
147+
TestFrozenIndicesPlugin.class,
148+
IndexLifecycle.class,
149+
LocalStateCompositeXPackPlugin.class
129150
);
130151
}
131152

153+
@Override
154+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
155+
return Settings.builder()
156+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
157+
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s")
158+
// This just generates less churn and makes it easier to read the log file if needed
159+
.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false)
160+
.build();
161+
}
162+
132163
private static String DATA_STREAM_MAPPING = """
133164
{
134165
"dynamic": true,
@@ -645,6 +676,133 @@ public void testIndexUnfrozen() {
645676
);
646677
}
647678

679+
public void testIndexLifecycleSettingNotCopied() throws Exception {
680+
Map<String, Phase> phases = Map.of(
681+
"hot",
682+
new Phase(
683+
"hot",
684+
TimeValue.ZERO,
685+
Map.of(
686+
"rollover",
687+
new org.elasticsearch.xpack.core.ilm.RolloverAction(null, null, null, 1L, null, null, null, null, null, null)
688+
)
689+
)
690+
);
691+
692+
var policyName = "my-policy";
693+
LifecyclePolicy policy = new LifecyclePolicy(policyName, phases);
694+
PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policy);
695+
assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet());
696+
697+
// create data stream with a document and wait for ILM to roll it over
698+
var dataStream = createDataStream(policyName);
699+
createDocument(dataStream);
700+
701+
assertAcked(safeGet(client().execute(ILMActions.START, new StartILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
702+
assertBusy(() -> {
703+
var getIndexResponse = safeGet(indicesAdmin().getIndex(new GetIndexRequest().indices(dataStream)));
704+
assertTrue(getIndexResponse.indices().length >= 2);
705+
});
706+
stopILM();
707+
708+
var dataStreams = safeGet(
709+
indicesAdmin().execute(
710+
GetDataStreamAction.INSTANCE,
711+
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStream })
712+
)
713+
).getDataStreams();
714+
715+
assertFalse(dataStreams.isEmpty());
716+
String writeIndex = dataStreams.get(0).getDataStream().getWriteIndex().getName();
717+
List<String> indices = dataStreams.get(0).getDataStream().getIndices().stream().map(Index::getName).toList();
718+
assertTrue(indices.size() >= 2);
719+
720+
for (var backingIndex : indices) {
721+
if (backingIndex.equals(writeIndex) == false) {
722+
var destIndex = safeGet(
723+
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(backingIndex))
724+
).getDestIndex();
725+
var settingsResponse = safeGet(indicesAdmin().getSettings(new GetSettingsRequest().indices(backingIndex, destIndex)));
726+
assertEquals(policyName, settingsResponse.getSetting(backingIndex, IndexMetadata.LIFECYCLE_NAME));
727+
assertNull(settingsResponse.getSetting(destIndex, IndexMetadata.LIFECYCLE_NAME));
728+
}
729+
}
730+
}
731+
732+
private void stopILM() throws Exception {
733+
assertAcked(safeGet(client().execute(ILMActions.STOP, new StopILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
734+
assertBusy(() -> {
735+
var statusResponse = safeGet(
736+
client().execute(GetStatusAction.INSTANCE, new AcknowledgedRequest.Plain(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))
737+
);
738+
assertEquals(OperationMode.STOPPED, statusResponse.getMode());
739+
});
740+
}
741+
742+
private String createDataStream(String ilmPolicy) throws Exception {
743+
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault());
744+
745+
Settings settings = ilmPolicy != null ? Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, ilmPolicy).build() : null;
746+
747+
String mapping = """
748+
{
749+
"properties": {
750+
"@timestamp": {
751+
"type":"date"
752+
},
753+
"data":{
754+
"type":"keyword"
755+
}
756+
}
757+
}
758+
""";
759+
Template idxTemplate = new Template(settings, new CompressedXContent(mapping), null);
760+
761+
ComposableIndexTemplate template = ComposableIndexTemplate.builder()
762+
.indexPatterns(List.of(dataStreamName + "*"))
763+
.template(idxTemplate)
764+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
765+
.build();
766+
767+
assertAcked(
768+
client().execute(
769+
TransportPutComposableIndexTemplateAction.TYPE,
770+
new TransportPutComposableIndexTemplateAction.Request(dataStreamName + "_template").indexTemplate(template)
771+
)
772+
);
773+
assertAcked(
774+
client().execute(
775+
CreateDataStreamAction.INSTANCE,
776+
new CreateDataStreamAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, dataStreamName)
777+
)
778+
);
779+
return dataStreamName;
780+
}
781+
782+
private long createDocument(String dataStreamName) throws Exception {
783+
// Get some randomized but reasonable timestamps on the data since not all of it is guaranteed to arrive in order.
784+
long timeSeed = System.currentTimeMillis();
785+
long timestamp = randomLongBetween(timeSeed - TimeUnit.HOURS.toMillis(5), timeSeed);
786+
safeGet(
787+
client().index(
788+
new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE)
789+
.source(
790+
JsonXContent.contentBuilder()
791+
.startObject()
792+
.field("@timestamp", timestamp)
793+
.field("data", randomAlphaOfLength(25))
794+
.endObject()
795+
)
796+
)
797+
);
798+
safeGet(
799+
indicesAdmin().refresh(
800+
new RefreshRequest(".ds-" + dataStreamName + "*").indicesOptions(IndicesOptions.lenientExpandOpenHidden())
801+
)
802+
);
803+
return timestamp;
804+
}
805+
648806
private static void cleanupMetadataBlocks(String index) {
649807
var settings = Settings.builder()
650808
.putNull(IndexMetadata.SETTING_READ_ONLY)

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,8 @@ private void createIndex(
275275
var settingsOverride = Settings.builder()
276276
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
277277
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
278+
// remove lifecycle so that ILM does not start processing before the index is added to data stream
279+
.putNull(IndexMetadata.LIFECYCLE_NAME)
278280
.build();
279281

280282
var request = new CreateIndexFromSourceAction.Request(

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
1616
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
1717
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
18+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
19+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
20+
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
21+
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
1822
import org.elasticsearch.action.datastreams.GetDataStreamAction;
1923
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
2024
import org.elasticsearch.action.support.CountDownActionListener;
@@ -23,8 +27,10 @@
2327
import org.elasticsearch.client.internal.Client;
2428
import org.elasticsearch.cluster.metadata.DataStream;
2529
import org.elasticsearch.cluster.metadata.DataStreamAction;
30+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2631
import org.elasticsearch.cluster.service.ClusterService;
2732
import org.elasticsearch.common.settings.Setting;
33+
import org.elasticsearch.common.settings.Settings;
2834
import org.elasticsearch.core.Nullable;
2935
import org.elasticsearch.core.TimeValue;
3036
import org.elasticsearch.index.Index;
@@ -214,9 +220,8 @@ private void maybeProcessNextIndex(
214220
SubscribableListener.<ReindexDataStreamIndexAction.Response>newForked(
215221
l -> client.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
216222
)
217-
.<AcknowledgedResponse>andThen(
218-
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId)
219-
)
223+
.<String>andThen((l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId))
224+
.<AcknowledgedResponse>andThen((l, newIndex) -> copySettings(index.getName(), newIndex, l, parentTaskId))
220225
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), parentTaskId, l))
221226
.addListener(ActionListener.wrap(unused -> {
222227
reindexDataStreamTask.reindexSucceeded(index.getName());
@@ -233,7 +238,7 @@ private void updateDataStream(
233238
String dataStream,
234239
String oldIndex,
235240
String newIndex,
236-
ActionListener<AcknowledgedResponse> listener,
241+
ActionListener<String> listener,
237242
TaskId parentTaskId
238243
) {
239244
ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request(
@@ -242,7 +247,28 @@ private void updateDataStream(
242247
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
243248
);
244249
modifyDataStreamRequest.setParentTask(parentTaskId);
245-
client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
250+
client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener.map(ingored -> newIndex));
251+
}
252+
253+
/**
254+
* Copy lifecycle name from the old index to the new index, so that ILM can now process the new index.
255+
* If the new index has a lifecycle name before it is swapped into the data stream, ILM will try, and fail, to process
256+
* the new index. For this reason, lifecycle is not set until after the new index has been added to the data stream.
257+
*/
258+
private void copySettings(String oldIndex, String newIndex, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
259+
var getSettingsRequest = new GetSettingsRequest().indices(oldIndex);
260+
getSettingsRequest.setParentTask(parentTaskId);
261+
client.execute(GetSettingsAction.INSTANCE, getSettingsRequest, listener.delegateFailure((delegate, response) -> {
262+
String lifecycleName = response.getSetting(oldIndex, IndexMetadata.LIFECYCLE_NAME);
263+
if (lifecycleName != null) {
264+
var settings = Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, lifecycleName).build();
265+
var updateSettingsRequest = new UpdateSettingsRequest(settings, newIndex);
266+
updateSettingsRequest.setParentTask(parentTaskId);
267+
client.execute(TransportUpdateSettingsAction.TYPE, updateSettingsRequest, delegate);
268+
} else {
269+
delegate.onResponse(null);
270+
}
271+
}));
246272
}
247273

248274
private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {

0 commit comments

Comments
 (0)