Skip to content

Commit 198ea74

Browse files
parkertimminselasticsearchmachine
andauthored
[9.0] Add lifecycle setting to destination index after adding to data stream (#123071) (#123151)
* Add lifecycle setting to destination index after adding to data stream (#123071) When reindexing a data stream, remove the lifecycle name setting when creating the destination index, so that ILM does not process it. Add the setting back after adding the destination index to the data stream, at which point ILM can safely process it. * [CI] Auto commit changes from spotless * timeout now in request in 9.0 * another silly compilation error * [CI] Auto commit changes from spotless --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent f47b945 commit 198ea74

File tree

4 files changed

+194
-6
lines changed

4 files changed

+194
-6
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
1919
import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
2020
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
21+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
2122
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
2223
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
2324
import org.elasticsearch.action.bulk.TransportBulkAction;
@@ -213,6 +214,7 @@ public class InternalUsers {
213214
TransportCloseIndexAction.NAME,
214215
TransportCreateIndexAction.TYPE.name(),
215216
TransportClusterSearchShardsAction.TYPE.name(),
217+
GetSettingsAction.NAME,
216218
TransportUpdateSettingsAction.TYPE.name(),
217219
RefreshAction.NAME,
218220
ReindexAction.NAME,

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 159 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@
2121
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
2222
import org.elasticsearch.action.bulk.BulkRequest;
2323
import org.elasticsearch.action.bulk.BulkResponse;
24+
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
25+
import org.elasticsearch.action.datastreams.GetDataStreamAction;
2426
import org.elasticsearch.action.index.IndexRequest;
2527
import org.elasticsearch.action.ingest.PutPipelineRequest;
2628
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
29+
import org.elasticsearch.action.support.IndicesOptions;
30+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
2731
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2832
import org.elasticsearch.cluster.metadata.IndexMetadata;
2933
import org.elasticsearch.cluster.metadata.MappingMetadata;
@@ -34,7 +38,9 @@
3438
import org.elasticsearch.common.time.DateFormatter;
3539
import org.elasticsearch.common.time.FormatNames;
3640
import org.elasticsearch.common.xcontent.support.XContentMapValues;
41+
import org.elasticsearch.core.TimeValue;
3742
import org.elasticsearch.datastreams.DataStreamsPlugin;
43+
import org.elasticsearch.index.Index;
3844
import org.elasticsearch.index.IndexSettings;
3945
import org.elasticsearch.index.mapper.DateFieldMapper;
4046
import org.elasticsearch.ingest.common.IngestCommonPlugin;
@@ -43,6 +49,18 @@
4349
import org.elasticsearch.test.ESIntegTestCase;
4450
import org.elasticsearch.test.transport.MockTransportService;
4551
import org.elasticsearch.xcontent.XContentType;
52+
import org.elasticsearch.xcontent.json.JsonXContent;
53+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
54+
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
55+
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
56+
import org.elasticsearch.xpack.core.ilm.OperationMode;
57+
import org.elasticsearch.xpack.core.ilm.Phase;
58+
import org.elasticsearch.xpack.core.ilm.StartILMRequest;
59+
import org.elasticsearch.xpack.core.ilm.StopILMRequest;
60+
import org.elasticsearch.xpack.core.ilm.action.GetStatusAction;
61+
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
62+
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleRequest;
63+
import org.elasticsearch.xpack.ilm.IndexLifecycle;
4664
import org.elasticsearch.xpack.migrate.MigratePlugin;
4765
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
4866
import org.junit.Before;
@@ -53,6 +71,7 @@
5371
import java.util.List;
5472
import java.util.Locale;
5573
import java.util.Map;
74+
import java.util.concurrent.TimeUnit;
5675

5776
import static java.lang.Boolean.parseBoolean;
5877
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
@@ -88,10 +107,22 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
88107
ReindexPlugin.class,
89108
MockTransportService.TestPlugin.class,
90109
DataStreamsPlugin.class,
91-
IngestCommonPlugin.class
110+
IngestCommonPlugin.class,
111+
IndexLifecycle.class,
112+
LocalStateCompositeXPackPlugin.class
92113
);
93114
}
94115

116+
@Override
117+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
118+
return Settings.builder()
119+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
120+
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s")
121+
// This just generates less churn and makes it easier to read the log file if needed
122+
.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false)
123+
.build();
124+
}
125+
95126
private static String DATA_STREAM_MAPPING = """
96127
{
97128
"dynamic": true,
@@ -532,6 +563,133 @@ public void testTsdbStartEndSet() throws Exception {
532563
assertHitCount(prepareSearch(destIndex).setSize(0), 1);
533564
}
534565

566+
public void testIndexLifecycleSettingNotCopied() throws Exception {
567+
Map<String, Phase> phases = Map.of(
568+
"hot",
569+
new Phase(
570+
"hot",
571+
TimeValue.ZERO,
572+
Map.of(
573+
"rollover",
574+
new org.elasticsearch.xpack.core.ilm.RolloverAction(null, null, null, 1L, null, null, null, null, null, null)
575+
)
576+
)
577+
);
578+
579+
var policyName = "my-policy";
580+
LifecyclePolicy policy = new LifecyclePolicy(policyName, phases);
581+
PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policy);
582+
assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet());
583+
584+
// create data stream with a document and wait for ILM to roll it over
585+
var dataStream = createDataStream(policyName);
586+
createDocument(dataStream);
587+
588+
assertAcked(safeGet(client().execute(ILMActions.START, new StartILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
589+
assertBusy(() -> {
590+
var getIndexResponse = safeGet(indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(dataStream)));
591+
assertTrue(getIndexResponse.indices().length >= 2);
592+
});
593+
stopILM();
594+
595+
var dataStreams = safeGet(
596+
indicesAdmin().execute(
597+
GetDataStreamAction.INSTANCE,
598+
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStream })
599+
)
600+
).getDataStreams();
601+
602+
assertFalse(dataStreams.isEmpty());
603+
String writeIndex = dataStreams.get(0).getDataStream().getWriteIndex().getName();
604+
List<String> indices = dataStreams.get(0).getDataStream().getIndices().stream().map(Index::getName).toList();
605+
assertTrue(indices.size() >= 2);
606+
607+
for (var backingIndex : indices) {
608+
if (backingIndex.equals(writeIndex) == false) {
609+
var destIndex = safeGet(
610+
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(backingIndex))
611+
).getDestIndex();
612+
var settingsResponse = safeGet(indicesAdmin().getSettings(new GetSettingsRequest().indices(backingIndex, destIndex)));
613+
assertEquals(policyName, settingsResponse.getSetting(backingIndex, IndexMetadata.LIFECYCLE_NAME));
614+
assertNull(settingsResponse.getSetting(destIndex, IndexMetadata.LIFECYCLE_NAME));
615+
}
616+
}
617+
}
618+
619+
private void stopILM() throws Exception {
620+
assertAcked(safeGet(client().execute(ILMActions.STOP, new StopILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
621+
assertBusy(() -> {
622+
var statusResponse = safeGet(
623+
client().execute(GetStatusAction.INSTANCE, new AcknowledgedRequest.Plain(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))
624+
);
625+
assertEquals(OperationMode.STOPPED, statusResponse.getMode());
626+
});
627+
}
628+
629+
private String createDataStream(String ilmPolicy) throws Exception {
630+
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault());
631+
632+
Settings settings = ilmPolicy != null ? Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, ilmPolicy).build() : null;
633+
634+
String mapping = """
635+
{
636+
"properties": {
637+
"@timestamp": {
638+
"type":"date"
639+
},
640+
"data":{
641+
"type":"keyword"
642+
}
643+
}
644+
}
645+
""";
646+
Template idxTemplate = new Template(settings, new CompressedXContent(mapping), null);
647+
648+
ComposableIndexTemplate template = ComposableIndexTemplate.builder()
649+
.indexPatterns(List.of(dataStreamName + "*"))
650+
.template(idxTemplate)
651+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
652+
.build();
653+
654+
assertAcked(
655+
client().execute(
656+
TransportPutComposableIndexTemplateAction.TYPE,
657+
new TransportPutComposableIndexTemplateAction.Request(dataStreamName + "_template").indexTemplate(template)
658+
)
659+
);
660+
assertAcked(
661+
client().execute(
662+
CreateDataStreamAction.INSTANCE,
663+
new CreateDataStreamAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, dataStreamName)
664+
)
665+
);
666+
return dataStreamName;
667+
}
668+
669+
private long createDocument(String dataStreamName) throws Exception {
670+
// Get some randomized but reasonable timestamps on the data since not all of it is guaranteed to arrive in order.
671+
long timeSeed = System.currentTimeMillis();
672+
long timestamp = randomLongBetween(timeSeed - TimeUnit.HOURS.toMillis(5), timeSeed);
673+
safeGet(
674+
client().index(
675+
new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE)
676+
.source(
677+
JsonXContent.contentBuilder()
678+
.startObject()
679+
.field("@timestamp", timestamp)
680+
.field("data", randomAlphaOfLength(25))
681+
.endObject()
682+
)
683+
)
684+
);
685+
safeGet(
686+
indicesAdmin().refresh(
687+
new RefreshRequest(".ds-" + dataStreamName + "*").indicesOptions(IndicesOptions.lenientExpandOpenHidden())
688+
)
689+
);
690+
return timestamp;
691+
}
692+
535693
private static void cleanupMetadataBlocks(String index) {
536694
var settings = Settings.builder()
537695
.putNull(IndexMetadata.SETTING_READ_ONLY)

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,8 @@ private void createIndex(
249249
var settingsOverride = Settings.builder()
250250
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
251251
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
252+
// remove lifecycle so that ILM does not start processing before the index is added to data stream
253+
.putNull(IndexMetadata.LIFECYCLE_NAME)
252254
.build();
253255

254256
var request = new CreateIndexFromSourceAction.Request(

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
1616
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
1717
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
18+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
19+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
20+
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
21+
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
1822
import org.elasticsearch.action.datastreams.GetDataStreamAction;
1923
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
2024
import org.elasticsearch.action.support.CountDownActionListener;
@@ -23,8 +27,10 @@
2327
import org.elasticsearch.client.internal.Client;
2428
import org.elasticsearch.cluster.metadata.DataStream;
2529
import org.elasticsearch.cluster.metadata.DataStreamAction;
30+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2631
import org.elasticsearch.cluster.service.ClusterService;
2732
import org.elasticsearch.common.settings.Setting;
33+
import org.elasticsearch.common.settings.Settings;
2834
import org.elasticsearch.core.Nullable;
2935
import org.elasticsearch.core.TimeValue;
3036
import org.elasticsearch.index.Index;
@@ -215,9 +221,8 @@ private void maybeProcessNextIndex(
215221
SubscribableListener.<ReindexDataStreamIndexAction.Response>newForked(
216222
l -> client.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
217223
)
218-
.<AcknowledgedResponse>andThen(
219-
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId)
220-
)
224+
.<String>andThen((l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId))
225+
.<AcknowledgedResponse>andThen((l, newIndex) -> copySettings(index.getName(), newIndex, l, parentTaskId))
221226
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), parentTaskId, l))
222227
.addListener(ActionListener.wrap(unused -> {
223228
reindexDataStreamTask.reindexSucceeded(index.getName());
@@ -234,7 +239,7 @@ private void updateDataStream(
234239
String dataStream,
235240
String oldIndex,
236241
String newIndex,
237-
ActionListener<AcknowledgedResponse> listener,
242+
ActionListener<String> listener,
238243
TaskId parentTaskId
239244
) {
240245
ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request(
@@ -243,7 +248,28 @@ private void updateDataStream(
243248
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
244249
);
245250
modifyDataStreamRequest.setParentTask(parentTaskId);
246-
client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
251+
client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener.map(ingored -> newIndex));
252+
}
253+
254+
/**
255+
* Copy lifecycle name from the old index to the new index, so that ILM can now process the new index.
256+
* If the new index has a lifecycle name before it is swapped into the data stream, ILM will try, and fail, to process
257+
* the new index. For this reason, lifecycle is not set until after the new index has been added to the data stream.
258+
*/
259+
private void copySettings(String oldIndex, String newIndex, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
260+
var getSettingsRequest = new GetSettingsRequest().indices(oldIndex);
261+
getSettingsRequest.setParentTask(parentTaskId);
262+
client.execute(GetSettingsAction.INSTANCE, getSettingsRequest, listener.delegateFailure((delegate, response) -> {
263+
String lifecycleName = response.getSetting(oldIndex, IndexMetadata.LIFECYCLE_NAME);
264+
if (lifecycleName != null) {
265+
var settings = Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, lifecycleName).build();
266+
var updateSettingsRequest = new UpdateSettingsRequest(settings, newIndex);
267+
updateSettingsRequest.setParentTask(parentTaskId);
268+
client.execute(TransportUpdateSettingsAction.TYPE, updateSettingsRequest, delegate);
269+
} else {
270+
delegate.onResponse(null);
271+
}
272+
}));
247273
}
248274

249275
private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {

0 commit comments

Comments
 (0)