Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2077,6 +2077,12 @@ public Builder putRolloverInfo(RolloverInfo rolloverInfo) {
return this;
}

public Builder putRolloverInfos(Map<String, RolloverInfo> rolloverInfos) {
this.rolloverInfos.clear();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clearing the existing infos is a bit weirds, but seems better than making rolloverInfos non-final

this.rolloverInfos.putAllFromMap(rolloverInfos);
return this;
}

public long version() {
return this.version;
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/migrate/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies {
compileOnly project(path: xpackModule('core'))
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(xpackModule('ccr'))
testImplementation project(xpackModule('ilm'))
testImplementation project(':modules:data-streams')
testImplementation project(path: ':modules:reindex')
testImplementation project(path: ':modules:ingest-common')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.migrate.action;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.StartILMRequest;
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleRequest;
import org.elasticsearch.xpack.ilm.IndexLifecycle;
import org.elasticsearch.xpack.migrate.MigratePlugin;
import org.junit.After;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

public class CopyIndexMetadataTransportActionIT extends ESIntegTestCase {
@After
public void cleanup() {
updateClusterSettings(Settings.builder().putNull("*"));
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(
LocalStateCompositeXPackPlugin.class,
MigratePlugin.class,
DataStreamsPlugin.class,
IngestCommonPlugin.class,
IndexLifecycle.class
);
}

public void testCreationDate() throws Exception {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).get();

// so creation date is different
safeSleep(2);

var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(destIndex)).get();

// verify source and dest date are actually different before copying
var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(sourceIndex, destIndex)).actionGet();
var indexToSettings = settingsResponse.getIndexToSettings();
var sourceDate = indexToSettings.get(sourceIndex).getAsLong(IndexMetadata.SETTING_CREATION_DATE, 0L);
{
var destDate = indexToSettings.get(destIndex).getAsLong(IndexMetadata.SETTING_CREATION_DATE, 0L);
assertTrue(sourceDate > 0);
assertTrue(destDate > 0);
assertNotEquals(sourceDate, destDate);
}

// copy over the metadata
copyMetadata(sourceIndex, destIndex);

var destDate = indicesAdmin().getSettings(new GetSettingsRequest().indices(sourceIndex, destIndex))
.actionGet()
.getIndexToSettings()
.get(destIndex)
.getAsLong(IndexMetadata.SETTING_CREATION_DATE, 0L);
assertEquals(sourceDate, destDate);
}

public void testILMState() throws Exception {

updateClusterSettings(Settings.builder().put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s"));

Map<String, Phase> phases = Map.of(
"hot",
new Phase(
"hot",
TimeValue.ZERO,
Map.of(
"rollover",
new org.elasticsearch.xpack.core.ilm.RolloverAction(null, null, null, 1L, null, null, null, null, null, null)
)
)
);

var policyName = "my-policy";
LifecyclePolicy policy = new LifecyclePolicy(policyName, phases);
PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policy);
assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet());

// create data stream with a document and wait for ILM to roll it over
var dataStream = createDataStream(policyName);
createDocument(dataStream);
assertAcked(safeGet(client().execute(ILMActions.START, new StartILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
assertBusy(() -> {
var getIndexResponse = indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(dataStream)).get();
assertTrue(getIndexResponse.indices().length > 1);
});
// stop ILM so source does not change after copying metadata
assertAcked(safeGet(client().execute(ILMActions.STOP, new StartILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));

var getIndexResponse = indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(dataStream)).get();
for (var backingIndex : getIndexResponse.indices()) {
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(destIndex)).get();

var metadataBefore = getClusterMetadata(backingIndex, destIndex);
IndexMetadata source = metadataBefore.index(backingIndex);
IndexMetadata destBefore = metadataBefore.index(destIndex);

// sanity check
assertNotEquals(
source.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY),
destBefore.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY)
);

// copy over the metadata
copyMetadata(backingIndex, destIndex);

IndexMetadata destAfter = getClusterMetadata(destIndex).index(destIndex);
assertEquals(
source.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY),
destAfter.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY)
);

}
}

public void testRolloverInfos() throws Exception {
var dataStream = createDataStream(null);

// rollover a few times
createDocument(dataStream);
rollover(dataStream);
createDocument(dataStream);
rollover(dataStream);
createDocument(dataStream);
var writeIndex = rollover(dataStream);

var getIndexResponse = indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(dataStream)).get();
for (var backingIndex : getIndexResponse.indices()) {

var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(destIndex)).get();

var metadataBefore = getClusterMetadata(backingIndex, destIndex);
IndexMetadata source = metadataBefore.index(backingIndex);
IndexMetadata destBefore = metadataBefore.index(destIndex);

// sanity check not equal before the copy
if (backingIndex.equals(writeIndex)) {
assertTrue(source.getRolloverInfos().isEmpty());
assertTrue(destBefore.getRolloverInfos().isEmpty());
} else {
assertNotEquals(source.getRolloverInfos(), destBefore.getRolloverInfos());
}

// copy over the metadata
copyMetadata(backingIndex, destIndex);

// now rollover info should be equal
IndexMetadata destAfter = getClusterMetadata(destIndex).index(destIndex);
assertEquals(source.getRolloverInfos(), destAfter.getRolloverInfos());
}
}

private String createDataStream(String ilmPolicy) throws Exception {
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault());

Settings settings = ilmPolicy != null ? Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, ilmPolicy).build() : null;

String mapping = """
{
"properties": {
"@timestamp": {
"type":"date"
},
"data":{
"type":"keyword"
}
}
}
""";
Template idxTemplate = new Template(settings, new CompressedXContent(mapping), null);

ComposableIndexTemplate template = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName + "*"))
.template(idxTemplate)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
.build();

assertAcked(
client().execute(
TransportPutComposableIndexTemplateAction.TYPE,
new TransportPutComposableIndexTemplateAction.Request(dataStreamName + "_template").indexTemplate(template)
)
);
assertAcked(
client().execute(
CreateDataStreamAction.INSTANCE,
new CreateDataStreamAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, dataStreamName)
)
);
return dataStreamName;
}

private long createDocument(String dataStreamName) throws Exception {
// Get some randomized but reasonable timestamps on the data since not all of it is guaranteed to arrive in order.
long timeSeed = System.currentTimeMillis();
long timestamp = randomLongBetween(timeSeed - TimeUnit.HOURS.toMillis(5), timeSeed);
client().index(
new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE)
.source(
JsonXContent.contentBuilder()
.startObject()
.field("@timestamp", timestamp)
.field("data", randomAlphaOfLength(25))
.endObject()
)
).get();
indicesAdmin().refresh(new RefreshRequest(".ds-" + dataStreamName + "*").indicesOptions(IndicesOptions.lenientExpandOpenHidden()))
.get();
return timestamp;
}

private void copyMetadata(String sourceIndex, String destIndex) {
assertAcked(
client().execute(
CopyIndexMetadataAction.INSTANCE,
new CopyIndexMetadataAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, destIndex)
)
);
}

private String rollover(String dataStream) {
var rolloverResponse = safeGet(indicesAdmin().rolloverIndex(new RolloverRequest(dataStream, null)));
assertTrue(rolloverResponse.isAcknowledged());
return rolloverResponse.getNewIndex();
}

private Metadata getClusterMetadata(String... indices) {
return safeGet(clusterAdmin().state(new ClusterStateRequest(TEST_REQUEST_TIMEOUT).indices(indices))).getState().metadata();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.migrate.MigratePlugin;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamRequest;
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamEnrichedStatus;
Expand All @@ -47,7 +48,7 @@ public class ReindexDataStreamTransportActionIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(DataStreamsPlugin.class, MigratePlugin.class);
return List.of(LocalStateCompositeXPackPlugin.class, DataStreamsPlugin.class, MigratePlugin.class);
}

public void testNonExistentDataStream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.migrate.MigratePlugin;
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
import org.junit.Before;
Expand Down Expand Up @@ -89,6 +90,7 @@ private void setup() throws Exception {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(
LocalStateCompositeXPackPlugin.class,
MigratePlugin.class,
ReindexPlugin.class,
MockTransportService.TestPlugin.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction;
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamTransportAction;
import org.elasticsearch.xpack.migrate.action.CopyIndexMetadataAction;
import org.elasticsearch.xpack.migrate.action.CopyIndexMetadataTransportAction;
import org.elasticsearch.xpack.migrate.action.CreateIndexFromSourceAction;
import org.elasticsearch.xpack.migrate.action.CreateIndexFromSourceTransportAction;
import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction;
Expand Down Expand Up @@ -106,6 +108,7 @@ public List<RestHandler> getRestHandlers(
actions.add(new ActionHandler<>(CancelReindexDataStreamAction.INSTANCE, CancelReindexDataStreamTransportAction.class));
actions.add(new ActionHandler<>(ReindexDataStreamIndexAction.INSTANCE, ReindexDataStreamIndexTransportAction.class));
actions.add(new ActionHandler<>(CreateIndexFromSourceAction.INSTANCE, CreateIndexFromSourceTransportAction.class));
actions.add(new ActionHandler<>(CopyIndexMetadataAction.INSTANCE, CopyIndexMetadataTransportAction.class));
return actions;
}

Expand Down
Loading