Skip to content

Commit c0ce37d

Browse files
[8.x] Add action to copy index metadata when reindexing data stream indices (elastic#122535) (elastic#122559)
* Add action to copy index metadata when reindexing data stream indices (elastic#122535) When reindexing data stream indices, parts of the index metadata needs to be copied from the source index to destination index, so that ILM and data stream lifecycle function properly. This adds a new CopyLifecycleIndexMetadataTransportAction which copies the following metadata from a source index to a destination index: - creation date setting - rollover info - ILM custom metadata (cherry picked from commit aba25c6) # Conflicts: # x-pack/plugin/migrate/build.gradle * remove timeouts which were not present in 8.x
1 parent ac3eae6 commit c0ce37d

File tree

10 files changed

+590
-14
lines changed

10 files changed

+590
-14
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2122,6 +2122,12 @@ public Builder putRolloverInfo(RolloverInfo rolloverInfo) {
21222122
return this;
21232123
}
21242124

2125+
public Builder putRolloverInfos(Map<String, RolloverInfo> rolloverInfos) {
2126+
this.rolloverInfos.clear();
2127+
this.rolloverInfos.putAllFromMap(rolloverInfos);
2128+
return this;
2129+
}
2130+
21252131
public long version() {
21262132
return this.version;
21272133
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ public class InternalUsers {
208208
TransportDeleteIndexAction.TYPE.name(),
209209
"indices:admin/data_stream/index/reindex",
210210
"indices:admin/index/create_from_source",
211+
"indices:admin/index/copy_lifecycle_index_metadata",
211212
TransportAddIndexBlockAction.TYPE.name(),
212213
OpenIndexAction.NAME,
213214
TransportCloseIndexAction.NAME,

x-pack/plugin/migrate/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ dependencies {
1818
testImplementation(testArtifact(project(xpackModule('core'))))
1919
testImplementation project(xpackModule('ccr'))
2020
testImplementation project(xpackModule('frozen-indices'))
21+
testImplementation project(xpackModule('ilm'))
2122
testImplementation project(':modules:data-streams')
2223
testImplementation project(path: ':modules:reindex')
2324
testImplementation project(path: ':modules:ingest-common')
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.migrate.action;
9+
10+
import org.elasticsearch.action.DocWriteRequest;
11+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
12+
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
13+
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
14+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
15+
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
16+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
17+
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
18+
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
19+
import org.elasticsearch.action.index.IndexRequest;
20+
import org.elasticsearch.action.support.IndicesOptions;
21+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
22+
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
23+
import org.elasticsearch.cluster.metadata.IndexMetadata;
24+
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
25+
import org.elasticsearch.cluster.metadata.Metadata;
26+
import org.elasticsearch.cluster.metadata.Template;
27+
import org.elasticsearch.common.compress.CompressedXContent;
28+
import org.elasticsearch.common.settings.Settings;
29+
import org.elasticsearch.core.TimeValue;
30+
import org.elasticsearch.datastreams.DataStreamsPlugin;
31+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
32+
import org.elasticsearch.plugins.Plugin;
33+
import org.elasticsearch.test.ESIntegTestCase;
34+
import org.elasticsearch.xcontent.json.JsonXContent;
35+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
36+
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
37+
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
38+
import org.elasticsearch.xpack.core.ilm.OperationMode;
39+
import org.elasticsearch.xpack.core.ilm.Phase;
40+
import org.elasticsearch.xpack.core.ilm.StartILMRequest;
41+
import org.elasticsearch.xpack.core.ilm.StopILMRequest;
42+
import org.elasticsearch.xpack.core.ilm.action.GetStatusAction;
43+
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
44+
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleRequest;
45+
import org.elasticsearch.xpack.ilm.IndexLifecycle;
46+
import org.elasticsearch.xpack.migrate.MigratePlugin;
47+
48+
import java.util.Collection;
49+
import java.util.List;
50+
import java.util.Locale;
51+
import java.util.Map;
52+
import java.util.concurrent.TimeUnit;
53+
54+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
55+
56+
public class CopyLifecycleIndexMetadataTransportActionIT extends ESIntegTestCase {
57+
58+
@Override
59+
protected Collection<Class<? extends Plugin>> nodePlugins() {
60+
return List.of(
61+
LocalStateCompositeXPackPlugin.class,
62+
MigratePlugin.class,
63+
DataStreamsPlugin.class,
64+
IngestCommonPlugin.class,
65+
IndexLifecycle.class
66+
);
67+
}
68+
69+
@Override
70+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
71+
return Settings.builder()
72+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
73+
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s")
74+
// This just generates less churn and makes it easier to read the log file if needed
75+
.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false)
76+
.build();
77+
}
78+
79+
public void testCreationDate() {
80+
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
81+
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
82+
83+
// so creation date is different
84+
safeSleep(2);
85+
86+
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
87+
safeGet(indicesAdmin().create(new CreateIndexRequest(destIndex)));
88+
89+
// verify source and dest date are actually different before copying
90+
var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(sourceIndex, destIndex)).actionGet();
91+
var indexToSettings = settingsResponse.getIndexToSettings();
92+
var sourceDate = indexToSettings.get(sourceIndex).getAsLong(IndexMetadata.SETTING_CREATION_DATE, 0L);
93+
{
94+
var destDate = indexToSettings.get(destIndex).getAsLong(IndexMetadata.SETTING_CREATION_DATE, 0L);
95+
assertTrue(sourceDate > 0);
96+
assertTrue(destDate > 0);
97+
assertNotEquals(sourceDate, destDate);
98+
}
99+
100+
// copy over the metadata
101+
copyMetadata(sourceIndex, destIndex);
102+
103+
var destDate = indicesAdmin().getSettings(new GetSettingsRequest().indices(sourceIndex, destIndex))
104+
.actionGet()
105+
.getIndexToSettings()
106+
.get(destIndex)
107+
.getAsLong(IndexMetadata.SETTING_CREATION_DATE, 0L);
108+
assertEquals(sourceDate, destDate);
109+
}
110+
111+
public void testILMState() throws Exception {
112+
113+
Map<String, Phase> phases = Map.of(
114+
"hot",
115+
new Phase(
116+
"hot",
117+
TimeValue.ZERO,
118+
Map.of(
119+
"rollover",
120+
new org.elasticsearch.xpack.core.ilm.RolloverAction(null, null, null, 1L, null, null, null, null, null, null)
121+
)
122+
)
123+
);
124+
125+
var policyName = "my-policy";
126+
LifecyclePolicy policy = new LifecyclePolicy(policyName, phases);
127+
PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policy);
128+
assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet());
129+
130+
// create data stream with a document and wait for ILM to roll it over
131+
var dataStream = createDataStream(policyName);
132+
createDocument(dataStream);
133+
assertAcked(safeGet(client().execute(ILMActions.START, new StartILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
134+
assertBusy(() -> {
135+
var getIndexResponse = safeGet(indicesAdmin().getIndex(new GetIndexRequest().indices(dataStream)));
136+
assertTrue(getIndexResponse.indices().length > 1);
137+
});
138+
// stop ILM so source does not change after copying metadata
139+
assertAcked(safeGet(client().execute(ILMActions.STOP, new StopILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
140+
assertBusy(() -> {
141+
var statusResponse = safeGet(
142+
client().execute(GetStatusAction.INSTANCE, new AcknowledgedRequest.Plain(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))
143+
);
144+
assertEquals(OperationMode.STOPPED, statusResponse.getMode());
145+
});
146+
147+
var getIndexResponse = safeGet(indicesAdmin().getIndex(new GetIndexRequest().indices(dataStream)));
148+
for (var backingIndex : getIndexResponse.indices()) {
149+
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
150+
safeGet(indicesAdmin().create(new CreateIndexRequest(destIndex)));
151+
152+
IndexMetadata destBefore = getClusterMetadata(destIndex).index(destIndex);
153+
assertNull(destBefore.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY));
154+
155+
// copy over the metadata
156+
copyMetadata(backingIndex, destIndex);
157+
158+
var metadataAfter = getClusterMetadata(backingIndex, destIndex);
159+
IndexMetadata sourceAfter = metadataAfter.index(backingIndex);
160+
IndexMetadata destAfter = metadataAfter.index(destIndex);
161+
assertNotNull(destAfter.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY));
162+
assertEquals(
163+
sourceAfter.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY),
164+
destAfter.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY)
165+
);
166+
167+
}
168+
}
169+
170+
public void testRolloverInfos() throws Exception {
171+
var dataStream = createDataStream(null);
172+
173+
// rollover a few times
174+
createDocument(dataStream);
175+
rollover(dataStream);
176+
createDocument(dataStream);
177+
rollover(dataStream);
178+
createDocument(dataStream);
179+
var writeIndex = rollover(dataStream);
180+
181+
var getIndexResponse = safeGet(indicesAdmin().getIndex(new GetIndexRequest().indices(dataStream)));
182+
for (var backingIndex : getIndexResponse.indices()) {
183+
184+
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
185+
safeGet(indicesAdmin().create(new CreateIndexRequest(destIndex)));
186+
187+
var metadataBefore = getClusterMetadata(backingIndex, destIndex);
188+
IndexMetadata source = metadataBefore.index(backingIndex);
189+
IndexMetadata destBefore = metadataBefore.index(destIndex);
190+
191+
// sanity check not equal before the copy
192+
if (backingIndex.equals(writeIndex)) {
193+
assertTrue(source.getRolloverInfos().isEmpty());
194+
assertTrue(destBefore.getRolloverInfos().isEmpty());
195+
} else {
196+
assertNotEquals(source.getRolloverInfos(), destBefore.getRolloverInfos());
197+
}
198+
199+
// copy over the metadata
200+
copyMetadata(backingIndex, destIndex);
201+
202+
// now rollover info should be equal
203+
IndexMetadata destAfter = getClusterMetadata(destIndex).index(destIndex);
204+
assertEquals(source.getRolloverInfos(), destAfter.getRolloverInfos());
205+
}
206+
}
207+
208+
private String createDataStream(String ilmPolicy) throws Exception {
209+
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault());
210+
211+
Settings settings = ilmPolicy != null ? Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, ilmPolicy).build() : null;
212+
213+
String mapping = """
214+
{
215+
"properties": {
216+
"@timestamp": {
217+
"type":"date"
218+
},
219+
"data":{
220+
"type":"keyword"
221+
}
222+
}
223+
}
224+
""";
225+
Template idxTemplate = new Template(settings, new CompressedXContent(mapping), null);
226+
227+
ComposableIndexTemplate template = ComposableIndexTemplate.builder()
228+
.indexPatterns(List.of(dataStreamName + "*"))
229+
.template(idxTemplate)
230+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
231+
.build();
232+
233+
assertAcked(
234+
client().execute(
235+
TransportPutComposableIndexTemplateAction.TYPE,
236+
new TransportPutComposableIndexTemplateAction.Request(dataStreamName + "_template").indexTemplate(template)
237+
)
238+
);
239+
assertAcked(
240+
client().execute(
241+
CreateDataStreamAction.INSTANCE,
242+
new CreateDataStreamAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, dataStreamName)
243+
)
244+
);
245+
return dataStreamName;
246+
}
247+
248+
private long createDocument(String dataStreamName) throws Exception {
249+
// Get some randomized but reasonable timestamps on the data since not all of it is guaranteed to arrive in order.
250+
long timeSeed = System.currentTimeMillis();
251+
long timestamp = randomLongBetween(timeSeed - TimeUnit.HOURS.toMillis(5), timeSeed);
252+
safeGet(
253+
client().index(
254+
new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE)
255+
.source(
256+
JsonXContent.contentBuilder()
257+
.startObject()
258+
.field("@timestamp", timestamp)
259+
.field("data", randomAlphaOfLength(25))
260+
.endObject()
261+
)
262+
)
263+
);
264+
safeGet(
265+
indicesAdmin().refresh(
266+
new RefreshRequest(".ds-" + dataStreamName + "*").indicesOptions(IndicesOptions.lenientExpandOpenHidden())
267+
)
268+
);
269+
return timestamp;
270+
}
271+
272+
private void copyMetadata(String sourceIndex, String destIndex) {
273+
assertAcked(
274+
client().execute(
275+
CopyLifecycleIndexMetadataAction.INSTANCE,
276+
new CopyLifecycleIndexMetadataAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, destIndex)
277+
)
278+
);
279+
}
280+
281+
private String rollover(String dataStream) {
282+
var rolloverResponse = safeGet(indicesAdmin().rolloverIndex(new RolloverRequest(dataStream, null)));
283+
assertTrue(rolloverResponse.isAcknowledged());
284+
return rolloverResponse.getNewIndex();
285+
}
286+
287+
private Metadata getClusterMetadata(String... indices) {
288+
return safeGet(clusterAdmin().state(new ClusterStateRequest(TEST_REQUEST_TIMEOUT).indices(indices))).getState().metadata();
289+
}
290+
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.elasticsearch.xcontent.ParseField;
3737
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction;
3838
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamTransportAction;
39+
import org.elasticsearch.xpack.migrate.action.CopyLifecycleIndexMetadataAction;
40+
import org.elasticsearch.xpack.migrate.action.CopyLifecycleIndexMetadataTransportAction;
3941
import org.elasticsearch.xpack.migrate.action.CreateIndexFromSourceAction;
4042
import org.elasticsearch.xpack.migrate.action.CreateIndexFromSourceTransportAction;
4143
import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction;
@@ -106,6 +108,7 @@ public List<RestHandler> getRestHandlers(
106108
actions.add(new ActionHandler<>(CancelReindexDataStreamAction.INSTANCE, CancelReindexDataStreamTransportAction.class));
107109
actions.add(new ActionHandler<>(ReindexDataStreamIndexAction.INSTANCE, ReindexDataStreamIndexTransportAction.class));
108110
actions.add(new ActionHandler<>(CreateIndexFromSourceAction.INSTANCE, CreateIndexFromSourceTransportAction.class));
111+
actions.add(new ActionHandler<>(CopyLifecycleIndexMetadataAction.INSTANCE, CopyLifecycleIndexMetadataTransportAction.class));
109112
return actions;
110113
}
111114

0 commit comments

Comments
 (0)