Skip to content

Commit 8cecdaa

Browse files
committed
Remove lifecycle from dest index and add after adding to data stream
1 parent e74ef2d commit 8cecdaa

File tree

4 files changed

+190
-6
lines changed

4 files changed

+190
-6
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
1919
import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
2020
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
21+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
2122
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
2223
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
2324
import org.elasticsearch.action.bulk.TransportBulkAction;
@@ -213,6 +214,7 @@ public class InternalUsers {
213214
TransportCloseIndexAction.NAME,
214215
TransportCreateIndexAction.TYPE.name(),
215216
TransportClusterSearchShardsAction.TYPE.name(),
217+
GetSettingsAction.NAME,
216218
TransportUpdateSettingsAction.TYPE.name(),
217219
RefreshAction.NAME,
218220
ReindexAction.NAME,

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 161 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,13 @@
2323
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
2424
import org.elasticsearch.action.bulk.BulkRequest;
2525
import org.elasticsearch.action.bulk.BulkResponse;
26+
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
27+
import org.elasticsearch.action.datastreams.GetDataStreamAction;
2628
import org.elasticsearch.action.index.IndexRequest;
2729
import org.elasticsearch.action.ingest.PutPipelineRequest;
2830
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
31+
import org.elasticsearch.action.support.IndicesOptions;
32+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
2933
import org.elasticsearch.cluster.block.ClusterBlockException;
3034
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
3135
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -38,7 +42,9 @@
3842
import org.elasticsearch.common.time.DateFormatter;
3943
import org.elasticsearch.common.time.FormatNames;
4044
import org.elasticsearch.common.xcontent.support.XContentMapValues;
45+
import org.elasticsearch.core.TimeValue;
4146
import org.elasticsearch.datastreams.DataStreamsPlugin;
47+
import org.elasticsearch.index.Index;
4248
import org.elasticsearch.index.IndexSettings;
4349
import org.elasticsearch.index.mapper.DateFieldMapper;
4450
import org.elasticsearch.ingest.common.IngestCommonPlugin;
@@ -47,6 +53,18 @@
4753
import org.elasticsearch.test.ESIntegTestCase;
4854
import org.elasticsearch.test.transport.MockTransportService;
4955
import org.elasticsearch.xcontent.XContentType;
56+
import org.elasticsearch.xcontent.json.JsonXContent;
57+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
58+
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
59+
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
60+
import org.elasticsearch.xpack.core.ilm.OperationMode;
61+
import org.elasticsearch.xpack.core.ilm.Phase;
62+
import org.elasticsearch.xpack.core.ilm.StartILMRequest;
63+
import org.elasticsearch.xpack.core.ilm.StopILMRequest;
64+
import org.elasticsearch.xpack.core.ilm.action.GetStatusAction;
65+
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
66+
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleRequest;
67+
import org.elasticsearch.xpack.ilm.IndexLifecycle;
5068
import org.elasticsearch.xpack.migrate.MigratePlugin;
5169
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
5270
import org.junit.Before;
@@ -57,6 +75,7 @@
5775
import java.util.List;
5876
import java.util.Locale;
5977
import java.util.Map;
78+
import java.util.concurrent.TimeUnit;
6079

6180
import static java.lang.Boolean.parseBoolean;
6281
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
@@ -93,10 +112,22 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
93112
ReindexPlugin.class,
94113
MockTransportService.TestPlugin.class,
95114
DataStreamsPlugin.class,
96-
IngestCommonPlugin.class
115+
IngestCommonPlugin.class,
116+
IndexLifecycle.class,
117+
LocalStateCompositeXPackPlugin.class
97118
);
98119
}
99120

121+
@Override
122+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
123+
return Settings.builder()
124+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
125+
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s")
126+
// This just generates less churn and makes it easier to read the log file if needed
127+
.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false)
128+
.build();
129+
}
130+
100131
private static String DATA_STREAM_MAPPING = """
101132
{
102133
"dynamic": true,
@@ -588,6 +619,135 @@ public void testTsdbStartEndSet() throws Exception {
588619
assertHitCount(prepareSearch(destIndex).setSize(0), 1);
589620
}
590621

622+
public void testIndexLifecycleSettingNotCopied() throws Exception {
623+
Map<String, Phase> phases = Map.of(
624+
"hot",
625+
new Phase(
626+
"hot",
627+
TimeValue.ZERO,
628+
Map.of(
629+
"rollover",
630+
new org.elasticsearch.xpack.core.ilm.RolloverAction(null, null, null, 1L, null, null, null, null, null, null)
631+
)
632+
)
633+
);
634+
635+
var policyName = "my-policy";
636+
LifecyclePolicy policy = new LifecyclePolicy(policyName, phases);
637+
PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policy);
638+
assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet());
639+
640+
// create data stream with a document and wait for ILM to roll it over
641+
var dataStream = createDataStream(policyName);
642+
createDocument(dataStream);
643+
644+
assertAcked(safeGet(client().execute(ILMActions.START, new StartILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
645+
assertBusy(() -> {
646+
var getIndexResponse = safeGet(indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(dataStream)));
647+
assertTrue(getIndexResponse.indices().length >= 2);
648+
});
649+
stopILM();
650+
651+
var dataStreams = safeGet(
652+
indicesAdmin().execute(
653+
GetDataStreamAction.INSTANCE,
654+
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStream })
655+
)
656+
).getDataStreams();
657+
658+
assertFalse(dataStreams.isEmpty());
659+
String writeIndex = dataStreams.get(0).getDataStream().getWriteIndex().getName();
660+
List<String> indices = dataStreams.get(0).getDataStream().getIndices().stream().map(Index::getName).toList();
661+
assertTrue(indices.size() >= 2);
662+
663+
for (var backingIndex : indices) {
664+
if (backingIndex.equals(writeIndex) == false) {
665+
var destIndex = safeGet(
666+
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(backingIndex))
667+
).getDestIndex();
668+
var settingsResponse = safeGet(
669+
indicesAdmin().getSettings(new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(backingIndex, destIndex))
670+
);
671+
assertEquals(policyName, settingsResponse.getSetting(backingIndex, IndexMetadata.LIFECYCLE_NAME));
672+
assertNull(settingsResponse.getSetting(destIndex, IndexMetadata.LIFECYCLE_NAME));
673+
}
674+
}
675+
}
676+
677+
private void stopILM() throws Exception {
678+
assertAcked(safeGet(client().execute(ILMActions.STOP, new StopILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
679+
assertBusy(() -> {
680+
var statusResponse = safeGet(
681+
client().execute(GetStatusAction.INSTANCE, new AcknowledgedRequest.Plain(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))
682+
);
683+
assertEquals(OperationMode.STOPPED, statusResponse.getMode());
684+
});
685+
}
686+
687+
private String createDataStream(String ilmPolicy) throws Exception {
688+
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault());
689+
690+
Settings settings = ilmPolicy != null ? Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, ilmPolicy).build() : null;
691+
692+
String mapping = """
693+
{
694+
"properties": {
695+
"@timestamp": {
696+
"type":"date"
697+
},
698+
"data":{
699+
"type":"keyword"
700+
}
701+
}
702+
}
703+
""";
704+
Template idxTemplate = new Template(settings, new CompressedXContent(mapping), null);
705+
706+
ComposableIndexTemplate template = ComposableIndexTemplate.builder()
707+
.indexPatterns(List.of(dataStreamName + "*"))
708+
.template(idxTemplate)
709+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
710+
.build();
711+
712+
assertAcked(
713+
client().execute(
714+
TransportPutComposableIndexTemplateAction.TYPE,
715+
new TransportPutComposableIndexTemplateAction.Request(dataStreamName + "_template").indexTemplate(template)
716+
)
717+
);
718+
assertAcked(
719+
client().execute(
720+
CreateDataStreamAction.INSTANCE,
721+
new CreateDataStreamAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, dataStreamName)
722+
)
723+
);
724+
return dataStreamName;
725+
}
726+
727+
private long createDocument(String dataStreamName) throws Exception {
728+
// Get some randomized but reasonable timestamps on the data since not all of it is guaranteed to arrive in order.
729+
long timeSeed = System.currentTimeMillis();
730+
long timestamp = randomLongBetween(timeSeed - TimeUnit.HOURS.toMillis(5), timeSeed);
731+
safeGet(
732+
client().index(
733+
new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE)
734+
.source(
735+
JsonXContent.contentBuilder()
736+
.startObject()
737+
.field("@timestamp", timestamp)
738+
.field("data", randomAlphaOfLength(25))
739+
.endObject()
740+
)
741+
)
742+
);
743+
safeGet(
744+
indicesAdmin().refresh(
745+
new RefreshRequest(".ds-" + dataStreamName + "*").indicesOptions(IndicesOptions.lenientExpandOpenHidden())
746+
)
747+
);
748+
return timestamp;
749+
}
750+
591751
private static void cleanupMetadataBlocks(String index) {
592752
var settings = Settings.builder()
593753
.putNull(IndexMetadata.SETTING_READ_ONLY)

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ private void createIndex(
254254
var settingsOverride = Settings.builder()
255255
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
256256
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
257+
.putNull(IndexMetadata.LIFECYCLE_NAME)
257258
.build();
258259

259260
var request = new CreateIndexFromSourceAction.Request(

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
1616
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
1717
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
18+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
19+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
20+
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
21+
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
1822
import org.elasticsearch.action.datastreams.GetDataStreamAction;
1923
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
2024
import org.elasticsearch.action.support.CountDownActionListener;
@@ -23,8 +27,10 @@
2327
import org.elasticsearch.client.internal.Client;
2428
import org.elasticsearch.cluster.metadata.DataStream;
2529
import org.elasticsearch.cluster.metadata.DataStreamAction;
30+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2631
import org.elasticsearch.cluster.service.ClusterService;
2732
import org.elasticsearch.common.settings.Setting;
33+
import org.elasticsearch.common.settings.Settings;
2834
import org.elasticsearch.core.Nullable;
2935
import org.elasticsearch.core.TimeValue;
3036
import org.elasticsearch.index.Index;
@@ -215,9 +221,8 @@ private void maybeProcessNextIndex(
215221
SubscribableListener.<ReindexDataStreamIndexAction.Response>newForked(
216222
l -> client.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
217223
)
218-
.<AcknowledgedResponse>andThen(
219-
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId)
220-
)
224+
.<String>andThen((l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId))
225+
.<AcknowledgedResponse>andThen((l, newIndex) -> copySettings(index.getName(), newIndex, l, parentTaskId))
221226
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), parentTaskId, l))
222227
.addListener(ActionListener.wrap(unused -> {
223228
reindexDataStreamTask.reindexSucceeded(index.getName());
@@ -234,7 +239,7 @@ private void updateDataStream(
234239
String dataStream,
235240
String oldIndex,
236241
String newIndex,
237-
ActionListener<AcknowledgedResponse> listener,
242+
ActionListener<String> listener,
238243
TaskId parentTaskId
239244
) {
240245
ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request(
@@ -243,7 +248,23 @@ private void updateDataStream(
243248
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
244249
);
245250
modifyDataStreamRequest.setParentTask(parentTaskId);
246-
client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
251+
client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener.map(ingored -> newIndex));
252+
}
253+
254+
private void copySettings(String oldIndex, String newIndex, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
255+
var getSettingsRequest = new GetSettingsRequest(TimeValue.MAX_VALUE).indices(oldIndex);
256+
getSettingsRequest.setParentTask(parentTaskId);
257+
client.execute(GetSettingsAction.INSTANCE, getSettingsRequest, listener.delegateFailure((delegate, response) -> {
258+
String lifecycleName = response.getSetting(oldIndex, IndexMetadata.LIFECYCLE_NAME);
259+
if (lifecycleName != null) {
260+
var settings = Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, lifecycleName).build();
261+
var updateSettingsRequest = new UpdateSettingsRequest(settings, newIndex);
262+
updateSettingsRequest.setParentTask(parentTaskId);
263+
client.execute(TransportUpdateSettingsAction.TYPE, updateSettingsRequest, delegate);
264+
} else {
265+
delegate.onResponse(null);
266+
}
267+
}));
247268
}
248269

249270
private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {

0 commit comments

Comments
 (0)