Skip to content

Commit f044d97

Browse files
Add lifecycle setting to destination index after adding to data stream (#123071)
When reindexing a data stream, remove the lifecycle name setting when creating the destination index, so that ILM does not process it. Add the setting back after adding the destination index to the data stream, at which point ILM can safely process it.
1 parent 5013eee commit f044d97

File tree

4 files changed

+196
-6
lines changed

4 files changed

+196
-6
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
1919
import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
2020
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
21+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
2122
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
2223
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
2324
import org.elasticsearch.action.bulk.TransportBulkAction;
@@ -213,6 +214,7 @@ public class InternalUsers {
213214
TransportCloseIndexAction.NAME,
214215
TransportCreateIndexAction.TYPE.name(),
215216
TransportClusterSearchShardsAction.TYPE.name(),
217+
GetSettingsAction.NAME,
216218
TransportUpdateSettingsAction.TYPE.name(),
217219
RefreshAction.NAME,
218220
ReindexAction.NAME,

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 161 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,13 @@
2323
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
2424
import org.elasticsearch.action.bulk.BulkRequest;
2525
import org.elasticsearch.action.bulk.BulkResponse;
26+
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
27+
import org.elasticsearch.action.datastreams.GetDataStreamAction;
2628
import org.elasticsearch.action.index.IndexRequest;
2729
import org.elasticsearch.action.ingest.PutPipelineRequest;
2830
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
31+
import org.elasticsearch.action.support.IndicesOptions;
32+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
2933
import org.elasticsearch.cluster.block.ClusterBlockException;
3034
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
3135
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -38,7 +42,9 @@
3842
import org.elasticsearch.common.time.DateFormatter;
3943
import org.elasticsearch.common.time.FormatNames;
4044
import org.elasticsearch.common.xcontent.support.XContentMapValues;
45+
import org.elasticsearch.core.TimeValue;
4146
import org.elasticsearch.datastreams.DataStreamsPlugin;
47+
import org.elasticsearch.index.Index;
4248
import org.elasticsearch.index.IndexSettings;
4349
import org.elasticsearch.index.mapper.DateFieldMapper;
4450
import org.elasticsearch.ingest.common.IngestCommonPlugin;
@@ -47,6 +53,18 @@
4753
import org.elasticsearch.test.ESIntegTestCase;
4854
import org.elasticsearch.test.transport.MockTransportService;
4955
import org.elasticsearch.xcontent.XContentType;
56+
import org.elasticsearch.xcontent.json.JsonXContent;
57+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
58+
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
59+
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
60+
import org.elasticsearch.xpack.core.ilm.OperationMode;
61+
import org.elasticsearch.xpack.core.ilm.Phase;
62+
import org.elasticsearch.xpack.core.ilm.StartILMRequest;
63+
import org.elasticsearch.xpack.core.ilm.StopILMRequest;
64+
import org.elasticsearch.xpack.core.ilm.action.GetStatusAction;
65+
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
66+
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleRequest;
67+
import org.elasticsearch.xpack.ilm.IndexLifecycle;
5068
import org.elasticsearch.xpack.migrate.MigratePlugin;
5169
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
5270
import org.junit.Before;
@@ -57,6 +75,7 @@
5775
import java.util.List;
5876
import java.util.Locale;
5977
import java.util.Map;
78+
import java.util.concurrent.TimeUnit;
6079

6180
import static java.lang.Boolean.parseBoolean;
6281
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
@@ -93,10 +112,22 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
93112
ReindexPlugin.class,
94113
MockTransportService.TestPlugin.class,
95114
DataStreamsPlugin.class,
96-
IngestCommonPlugin.class
115+
IngestCommonPlugin.class,
116+
IndexLifecycle.class,
117+
LocalStateCompositeXPackPlugin.class
97118
);
98119
}
99120

121+
@Override
122+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
123+
return Settings.builder()
124+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
125+
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s")
126+
// This just generates less churn and makes it easier to read the log file if needed
127+
.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false)
128+
.build();
129+
}
130+
100131
private static String DATA_STREAM_MAPPING = """
101132
{
102133
"dynamic": true,
@@ -588,6 +619,135 @@ public void testTsdbStartEndSet() throws Exception {
588619
assertHitCount(prepareSearch(destIndex).setSize(0), 1);
589620
}
590621

622+
public void testIndexLifecycleSettingNotCopied() throws Exception {
623+
Map<String, Phase> phases = Map.of(
624+
"hot",
625+
new Phase(
626+
"hot",
627+
TimeValue.ZERO,
628+
Map.of(
629+
"rollover",
630+
new org.elasticsearch.xpack.core.ilm.RolloverAction(null, null, null, 1L, null, null, null, null, null, null)
631+
)
632+
)
633+
);
634+
635+
var policyName = "my-policy";
636+
LifecyclePolicy policy = new LifecyclePolicy(policyName, phases);
637+
PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policy);
638+
assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet());
639+
640+
// create data stream with a document and wait for ILM to roll it over
641+
var dataStream = createDataStream(policyName);
642+
createDocument(dataStream);
643+
644+
assertAcked(safeGet(client().execute(ILMActions.START, new StartILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
645+
assertBusy(() -> {
646+
var getIndexResponse = safeGet(indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(dataStream)));
647+
assertTrue(getIndexResponse.indices().length >= 2);
648+
});
649+
stopILM();
650+
651+
var dataStreams = safeGet(
652+
indicesAdmin().execute(
653+
GetDataStreamAction.INSTANCE,
654+
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStream })
655+
)
656+
).getDataStreams();
657+
658+
assertFalse(dataStreams.isEmpty());
659+
String writeIndex = dataStreams.get(0).getDataStream().getWriteIndex().getName();
660+
List<String> indices = dataStreams.get(0).getDataStream().getIndices().stream().map(Index::getName).toList();
661+
assertTrue(indices.size() >= 2);
662+
663+
for (var backingIndex : indices) {
664+
if (backingIndex.equals(writeIndex) == false) {
665+
var destIndex = safeGet(
666+
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(backingIndex))
667+
).getDestIndex();
668+
var settingsResponse = safeGet(
669+
indicesAdmin().getSettings(new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(backingIndex, destIndex))
670+
);
671+
assertEquals(policyName, settingsResponse.getSetting(backingIndex, IndexMetadata.LIFECYCLE_NAME));
672+
assertNull(settingsResponse.getSetting(destIndex, IndexMetadata.LIFECYCLE_NAME));
673+
}
674+
}
675+
}
676+
677+
private void stopILM() throws Exception {
678+
assertAcked(safeGet(client().execute(ILMActions.STOP, new StopILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
679+
assertBusy(() -> {
680+
var statusResponse = safeGet(
681+
client().execute(GetStatusAction.INSTANCE, new AcknowledgedRequest.Plain(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))
682+
);
683+
assertEquals(OperationMode.STOPPED, statusResponse.getMode());
684+
});
685+
}
686+
687+
private String createDataStream(String ilmPolicy) throws Exception {
688+
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault());
689+
690+
Settings settings = ilmPolicy != null ? Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, ilmPolicy).build() : null;
691+
692+
String mapping = """
693+
{
694+
"properties": {
695+
"@timestamp": {
696+
"type":"date"
697+
},
698+
"data":{
699+
"type":"keyword"
700+
}
701+
}
702+
}
703+
""";
704+
Template idxTemplate = new Template(settings, new CompressedXContent(mapping), null);
705+
706+
ComposableIndexTemplate template = ComposableIndexTemplate.builder()
707+
.indexPatterns(List.of(dataStreamName + "*"))
708+
.template(idxTemplate)
709+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
710+
.build();
711+
712+
assertAcked(
713+
client().execute(
714+
TransportPutComposableIndexTemplateAction.TYPE,
715+
new TransportPutComposableIndexTemplateAction.Request(dataStreamName + "_template").indexTemplate(template)
716+
)
717+
);
718+
assertAcked(
719+
client().execute(
720+
CreateDataStreamAction.INSTANCE,
721+
new CreateDataStreamAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, dataStreamName)
722+
)
723+
);
724+
return dataStreamName;
725+
}
726+
727+
private long createDocument(String dataStreamName) throws Exception {
728+
// Get some randomized but reasonable timestamps on the data since not all of it is guaranteed to arrive in order.
729+
long timeSeed = System.currentTimeMillis();
730+
long timestamp = randomLongBetween(timeSeed - TimeUnit.HOURS.toMillis(5), timeSeed);
731+
safeGet(
732+
client().index(
733+
new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE)
734+
.source(
735+
JsonXContent.contentBuilder()
736+
.startObject()
737+
.field("@timestamp", timestamp)
738+
.field("data", randomAlphaOfLength(25))
739+
.endObject()
740+
)
741+
)
742+
);
743+
safeGet(
744+
indicesAdmin().refresh(
745+
new RefreshRequest(".ds-" + dataStreamName + "*").indicesOptions(IndicesOptions.lenientExpandOpenHidden())
746+
)
747+
);
748+
return timestamp;
749+
}
750+
591751
private static void cleanupMetadataBlocks(String index) {
592752
var settings = Settings.builder()
593753
.putNull(IndexMetadata.SETTING_READ_ONLY)

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ private void createIndex(
254254
var settingsOverride = Settings.builder()
255255
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
256256
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
257+
// remove lifecycle so that ILM does not start processing before the index is added to data stream
258+
.putNull(IndexMetadata.LIFECYCLE_NAME)
257259
.build();
258260

259261
var request = new CreateIndexFromSourceAction.Request(

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
1616
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
1717
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
18+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
19+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
20+
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
21+
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
1822
import org.elasticsearch.action.datastreams.GetDataStreamAction;
1923
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
2024
import org.elasticsearch.action.support.CountDownActionListener;
@@ -23,8 +27,10 @@
2327
import org.elasticsearch.client.internal.Client;
2428
import org.elasticsearch.cluster.metadata.DataStream;
2529
import org.elasticsearch.cluster.metadata.DataStreamAction;
30+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2631
import org.elasticsearch.cluster.service.ClusterService;
2732
import org.elasticsearch.common.settings.Setting;
33+
import org.elasticsearch.common.settings.Settings;
2834
import org.elasticsearch.core.Nullable;
2935
import org.elasticsearch.core.TimeValue;
3036
import org.elasticsearch.index.Index;
@@ -215,9 +221,8 @@ private void maybeProcessNextIndex(
215221
SubscribableListener.<ReindexDataStreamIndexAction.Response>newForked(
216222
l -> client.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
217223
)
218-
.<AcknowledgedResponse>andThen(
219-
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId)
220-
)
224+
.<String>andThen((l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId))
225+
.<AcknowledgedResponse>andThen((l, newIndex) -> copySettings(index.getName(), newIndex, l, parentTaskId))
221226
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), parentTaskId, l))
222227
.addListener(ActionListener.wrap(unused -> {
223228
reindexDataStreamTask.reindexSucceeded(index.getName());
@@ -234,7 +239,7 @@ private void updateDataStream(
234239
String dataStream,
235240
String oldIndex,
236241
String newIndex,
237-
ActionListener<AcknowledgedResponse> listener,
242+
ActionListener<String> listener,
238243
TaskId parentTaskId
239244
) {
240245
ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request(
@@ -243,7 +248,28 @@ private void updateDataStream(
243248
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
244249
);
245250
modifyDataStreamRequest.setParentTask(parentTaskId);
246-
client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
251+
client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener.map(ingored -> newIndex));
252+
}
253+
254+
/**
255+
* Copy lifecycle name from the old index to the new index, so that ILM can now process the new index.
256+
* If the new index has a lifecycle name before it is swapped into the data stream, ILM will try, and fail, to process
257+
* the new index. For this reason, lifecycle is not set until after the new index has been added to the data stream.
258+
*/
259+
private void copySettings(String oldIndex, String newIndex, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
260+
var getSettingsRequest = new GetSettingsRequest(TimeValue.MAX_VALUE).indices(oldIndex);
261+
getSettingsRequest.setParentTask(parentTaskId);
262+
client.execute(GetSettingsAction.INSTANCE, getSettingsRequest, listener.delegateFailure((delegate, response) -> {
263+
String lifecycleName = response.getSetting(oldIndex, IndexMetadata.LIFECYCLE_NAME);
264+
if (lifecycleName != null) {
265+
var settings = Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, lifecycleName).build();
266+
var updateSettingsRequest = new UpdateSettingsRequest(settings, newIndex);
267+
updateSettingsRequest.setParentTask(parentTaskId);
268+
client.execute(TransportUpdateSettingsAction.TYPE, updateSettingsRequest, delegate);
269+
} else {
270+
delegate.onResponse(null);
271+
}
272+
}));
247273
}
248274

249275
private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {

0 commit comments

Comments
 (0)