diff --git a/docs/changelog/124651.yaml b/docs/changelog/124651.yaml new file mode 100644 index 0000000000000..5c5e064fcd5e8 --- /dev/null +++ b/docs/changelog/124651.yaml @@ -0,0 +1,5 @@ +pr: 124651 +summary: "Fix system data streams to be restorable from a snapshot" +area: Infra/Core +type: bug +issues: [89261] diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DeleteDataStreamTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DeleteDataStreamTransportAction.java index 262bc0b07960a..b5c031dfd75c9 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DeleteDataStreamTransportAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DeleteDataStreamTransportAction.java @@ -20,21 +20,16 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService; +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.SuppressForbidden; -import org.elasticsearch.index.Index; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.injection.guice.Inject; -import org.elasticsearch.snapshots.SnapshotInProgressException; -import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -44,6 +39,7 @@ import java.util.List; import java.util.Set; import java.util.function.Consumer; +import java.util.stream.Collectors; import static org.elasticsearch.action.datastreams.DataStreamsActionUtil.getDataStreamNames; @@ -134,7 +130,6 @@ static ClusterState removeDataStream( for (String dataStreamName : dataStreams) { systemDataStreamAccessValidator.accept(dataStreamName); } - Set snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(currentState, dataStreams); if (dataStreams.isEmpty()) { if (request.isWildcardExpressionsOriginallySpecified()) { @@ -144,33 +139,11 @@ static ClusterState removeDataStream( } } - if (snapshottingDataStreams.isEmpty() == false) { - throw new SnapshotInProgressException( - "Cannot delete data streams that are being snapshotted: " - + snapshottingDataStreams - + ". Try again after snapshot finishes or cancel the currently running snapshot." - ); - } - - Set backingIndicesToRemove = new HashSet<>(); - for (String dataStreamName : dataStreams) { - DataStream dataStream = currentState.metadata().dataStreams().get(dataStreamName); - assert dataStream != null; - backingIndicesToRemove.addAll(dataStream.getIndices()); - backingIndicesToRemove.addAll(dataStream.getFailureIndices()); - } - - // first delete the data streams and then the indices: - // (this to avoid data stream validation from failing when deleting an index that is part of a data stream - // without updating the data stream) - // TODO: change order when delete index api also updates the data stream the index to be removed is member of - Metadata.Builder metadata = Metadata.builder(currentState.metadata()); - for (String ds : dataStreams) { - LOGGER.info("removing data stream [{}]", ds); - metadata.removeDataStream(ds); - } - currentState = ClusterState.builder(currentState).metadata(metadata).build(); - return MetadataDeleteIndexService.deleteIndices(currentState, backingIndicesToRemove, settings); + return MetadataDataStreamsService.deleteDataStreams( + currentState, + dataStreams.stream().map(currentState.metadata().dataStreams()::get).collect(Collectors.toSet()), + settings + ); } @Override diff --git a/server/build.gradle b/server/build.gradle index 8bed775e4efbe..1afb32a973e02 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -80,6 +80,7 @@ dependencies { } internalClusterTestImplementation(project(':modules:reindex')) internalClusterTestImplementation(project(':modules:mapper-extras')) + internalClusterTestImplementation(project(':modules:data-streams')) } spotless { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/AutoCreateSystemIndexIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/AutoCreateSystemIndexIT.java index e46a0e2ab65ee..5def913d7f127 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/AutoCreateSystemIndexIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/AutoCreateSystemIndexIT.java @@ -28,7 +28,7 @@ import org.elasticsearch.indices.TestSystemIndexPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SystemIndexPlugin; -import org.elasticsearch.snapshots.SystemIndicesSnapshotIT; +import org.elasticsearch.snapshots.SystemResourceSnapshotIT; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xcontent.XContentType; import org.junit.After; @@ -300,7 +300,7 @@ public Collection getSystemIndexDescriptors(Settings sett @Override public String getFeatureName() { - return SystemIndicesSnapshotIT.SystemIndexTestPlugin.class.getSimpleName(); + return SystemResourceSnapshotIT.SystemIndexTestPlugin.class.getSimpleName(); } @Override diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemIndicesSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemResourceSnapshotIT.java similarity index 62% rename from server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemIndicesSnapshotIT.java rename to server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemResourceSnapshotIT.java index c6e02300ccef6..73dc98cbed2b3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemIndicesSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemResourceSnapshotIT.java @@ -10,12 +10,21 @@ package org.elasticsearch.snapshots; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.datastreams.DeleteDataStreamAction; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.datastreams.DataStreamsPlugin; import org.elasticsearch.indices.AssociatedIndexDescriptor; +import org.elasticsearch.indices.ExecutorNames; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.SystemIndexDescriptorUtils; import org.elasticsearch.plugins.Plugin; @@ -24,10 +33,12 @@ import org.elasticsearch.test.ESIntegTestCase; import org.junit.Before; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -41,9 +52,10 @@ import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.oneOf; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase { +public class SystemResourceSnapshotIT extends AbstractSnapshotIntegTestCase { public static final String REPO_NAME = "test-repo"; @@ -55,6 +67,11 @@ protected Collection> nodePlugins() { plugins.add(SystemIndexTestPlugin.class); plugins.add(AnotherSystemIndexTestPlugin.class); plugins.add(AssociatedIndicesTestPlugin.class); + plugins.add(DataStreamsPlugin.class); + plugins.add(AnotherSystemDataStreamTestPlugin.class); + plugins.add(SystemDataStreamTestPlugin.class); + plugins.add(SystemDataStreamManyShardsTestPlugin.class); + plugins.add(AssociatedIndicesSystemDSTestPlugin.class); return plugins; } @@ -70,16 +87,18 @@ public void setup() { */ public void testRestoreSystemIndicesAsGlobalState() { createRepository(REPO_NAME, "fs"); - // put a document in a system index + // put a document in a system index and data stream indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc"); - refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME); + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); + refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME); // run a snapshot including global state createFullSnapshot(REPO_NAME, "test-snap"); - // add another document + // add another document to each system resource indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc"); - refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME); + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc"); + refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME); assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L)); @@ -91,8 +110,9 @@ public void testRestoreSystemIndicesAsGlobalState() { ).setWaitForCompletion(true).setRestoreGlobalState(true).get(); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - // verify only the original document is restored + // verify only the original documents are restored assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(1L)); + assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L)); } /** @@ -101,6 +121,7 @@ public void testRestoreSystemIndicesAsGlobalState() { public void testSnapshotWithoutGlobalState() { createRepository(REPO_NAME, "fs"); indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "system index doc"); + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); indexDoc("not-a-system-index", "1", "purpose", "non system index doc"); // run a snapshot without global state @@ -122,6 +143,7 @@ public void testSnapshotWithoutGlobalState() { assertThat("not-a-system-index", in(snapshottedIndices)); assertThat(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, not(in(snapshottedIndices))); + assertThat(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, not(in(snapshottedIndices))); } /** @@ -131,23 +153,44 @@ public void testSnapshotByFeature() { createRepository(REPO_NAME, "fs"); indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc"); indexDoc(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc"); - refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME); + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); + indexDataStream(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); + refresh( + SystemIndexTestPlugin.SYSTEM_INDEX_NAME, + AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME, + SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, + AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME + ); // snapshot by feature CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap") .setIncludeGlobalState(true) .setWaitForCompletion(true) - .setFeatureStates(SystemIndexTestPlugin.class.getSimpleName(), AnotherSystemIndexTestPlugin.class.getSimpleName()) + .setFeatureStates( + SystemIndexTestPlugin.class.getSimpleName(), + AnotherSystemIndexTestPlugin.class.getSimpleName(), + SystemDataStreamTestPlugin.class.getSimpleName(), + AnotherSystemDataStreamTestPlugin.class.getSimpleName() + ) .get(); assertSnapshotSuccess(createSnapshotResponse); // add some other documents indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc"); indexDoc(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc"); - refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME); + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc"); + indexDataStream(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc"); + refresh( + SystemIndexTestPlugin.SYSTEM_INDEX_NAME, + AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME, + SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, + AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME + ); assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L)); assertThat(getDocCount(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L)); + assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L)); + assertThat(getDocCount(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L)); // restore indices as global state without closing the index RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot( @@ -160,6 +203,8 @@ public void testSnapshotByFeature() { // verify only the original document is restored assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(1L)); assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(1L)); + assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L)); + assertThat(getDocCount(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L)); } /** @@ -175,7 +220,8 @@ public void testDefaultRestoreOnlyRegularIndices() { indexDoc(regularIndex, "1", "purpose", "create an index that can be restored"); indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc"); - refresh(regularIndex, SystemIndexTestPlugin.SYSTEM_INDEX_NAME); + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); + refresh(regularIndex, SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME); // snapshot including global state CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap") @@ -193,7 +239,11 @@ public void testDefaultRestoreOnlyRegularIndices() { assertThat(restoreResponse.getRestoreInfo().totalShards(), greaterThan(0)); assertThat( restoreResponse.getRestoreInfo().indices(), - allOf(hasItem(regularIndex), not(hasItem(SystemIndexTestPlugin.SYSTEM_INDEX_NAME))) + allOf( + hasItem(regularIndex), + not(hasItem(SystemIndexTestPlugin.SYSTEM_INDEX_NAME)), + not(hasItem(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME)) + ) ); } @@ -207,7 +257,15 @@ public void testRestoreByFeature() { indexDoc(regularIndex, "1", "purpose", "create an index that can be restored"); indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc"); indexDoc(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc"); - refresh(regularIndex, SystemIndexTestPlugin.SYSTEM_INDEX_NAME, AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME); + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); + indexDataStream(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); + refresh( + regularIndex, + SystemIndexTestPlugin.SYSTEM_INDEX_NAME, + AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME, + SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, + AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME + ); // snapshot including global state CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap") @@ -219,10 +277,19 @@ public void testRestoreByFeature() { // add some other documents indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc"); indexDoc(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc"); - refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME); - + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc"); + indexDataStream(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc"); + refresh( + regularIndex, + SystemIndexTestPlugin.SYSTEM_INDEX_NAME, + AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME, + SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, + AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME + ); assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L)); assertThat(getDocCount(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L)); + assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L)); + assertThat(getDocCount(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L)); // Delete the regular index so we can restore it assertAcked(cluster().client().admin().indices().prepareDelete(regularIndex)); @@ -232,14 +299,16 @@ public void testRestoreByFeature() { TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap" - ).setWaitForCompletion(true).setFeatureStates("SystemIndexTestPlugin").get(); + ).setWaitForCompletion(true).setFeatureStates("SystemIndexTestPlugin", "SystemDataStreamTestPlugin").get(); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - // verify that the restored system index has only one document + // verify that the restored system index and data stream each only have one document assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(1L)); + assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L)); - // but the non-requested feature should still have its new document + // but the non-requested features should still have their new documents assertThat(getDocCount(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L)); + assertThat(getDocCount(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L)); } /** @@ -254,36 +323,58 @@ public void testSnapshotAndRestoreAssociatedIndices() { indexDoc(regularIndex, "1", "purpose", "pre-snapshot doc"); indexDoc(AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc"); indexDoc(AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME, "1", "purpose", "pre-snapshot doc"); - refresh(regularIndex, AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME, AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME); + indexDataStream(AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); + indexDoc(AssociatedIndicesSystemDSTestPlugin.ASSOCIATED_INDEX_NAME, "1", "purpose", "pre-snapshot doc"); + + refresh( + regularIndex, + AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME, + AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME, + AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME, + AssociatedIndicesSystemDSTestPlugin.ASSOCIATED_INDEX_NAME + ); // snapshot CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap") - .setFeatureStates(AssociatedIndicesTestPlugin.class.getSimpleName()) + .setFeatureStates(AssociatedIndicesTestPlugin.class.getSimpleName(), AssociatedIndicesSystemDSTestPlugin.class.getSimpleName()) .setWaitForCompletion(true) .get(); assertSnapshotSuccess(createSnapshotResponse); // verify the correctness of the snapshot - Set snapshottedIndices = clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, REPO_NAME) - .get() - .getSnapshots() + var snapshotsResponse = clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, REPO_NAME).get(); + Set snapshottedIndices = snapshotsResponse.getSnapshots() .stream() .map(SnapshotInfo::indices) .flatMap(Collection::stream) .collect(Collectors.toSet()); + Set snapshottedDataStreams = snapshotsResponse.getSnapshots() + .stream() + .map(SnapshotInfo::dataStreams) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); assertThat(snapshottedIndices, hasItem(AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME)); assertThat(snapshottedIndices, hasItem(AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME)); + assertThat(snapshottedDataStreams, hasItem(AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME)); + assertThat(snapshottedIndices, hasItem(AssociatedIndicesSystemDSTestPlugin.ASSOCIATED_INDEX_NAME)); // add some other documents indexDoc(regularIndex, "2", "purpose", "post-snapshot doc"); indexDoc(AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc"); - refresh(regularIndex, AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME); + indexDataStream(AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc"); + refresh(regularIndex, AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME, AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME); assertThat(getDocCount(regularIndex), equalTo(2L)); assertThat(getDocCount(AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L)); + assertThat(getDocCount(AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L)); // And delete the associated index so we can restore it - assertAcked(indicesAdmin().prepareDelete(AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME).get()); + assertAcked( + indicesAdmin().prepareDelete( + AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME, + AssociatedIndicesSystemDSTestPlugin.ASSOCIATED_INDEX_NAME + ).get() + ); // restore the feature state and its associated index RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot( @@ -291,15 +382,17 @@ public void testSnapshotAndRestoreAssociatedIndices() { REPO_NAME, "test-snap" ) - .setIndices(AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME) + .setIndices(AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME, AssociatedIndicesSystemDSTestPlugin.ASSOCIATED_INDEX_NAME) .setWaitForCompletion(true) - .setFeatureStates(AssociatedIndicesTestPlugin.class.getSimpleName()) + .setFeatureStates(AssociatedIndicesTestPlugin.class.getSimpleName(), AssociatedIndicesSystemDSTestPlugin.class.getSimpleName()) .get(); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); // verify only the original document is restored assertThat(getDocCount(AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME), equalTo(1L)); assertThat(getDocCount(AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME), equalTo(1L)); + assertThat(getDocCount(AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L)); + assertThat(getDocCount(AssociatedIndicesSystemDSTestPlugin.ASSOCIATED_INDEX_NAME), equalTo(1L)); } /** @@ -308,7 +401,8 @@ public void testSnapshotAndRestoreAssociatedIndices() { public void testRestoreFeatureNotInSnapshot() { createRepository(REPO_NAME, "fs"); indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc"); - refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME); + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); + refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME); // snapshot including global state CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap") @@ -322,7 +416,7 @@ public void testRestoreFeatureNotInSnapshot() { SnapshotRestoreException.class, clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap") .setWaitForCompletion(true) - .setFeatureStates("SystemIndexTestPlugin", fakeFeatureStateName) + .setFeatureStates("SystemIndexTestPlugin", "SystemDataStreamTestPlugin", fakeFeatureStateName) ); assertThat( @@ -438,7 +532,8 @@ public void testSystemIndicesCannotBeRenamed() { public void testRestoreSystemIndicesAsGlobalStateWithDefaultFeatureStateList() { createRepository(REPO_NAME, "fs"); indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc"); - refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME); + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); + refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME); // run a snapshot including global state CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap") @@ -449,9 +544,11 @@ public void testRestoreSystemIndicesAsGlobalStateWithDefaultFeatureStateList() { // add another document indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc"); - refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME); + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc"); + refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME); assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L)); + assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L)); // restore indices as global state a null list of feature states RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot( @@ -463,6 +560,7 @@ public void testRestoreSystemIndicesAsGlobalStateWithDefaultFeatureStateList() { // verify that the system index is destroyed assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(1L)); + assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L)); } /** @@ -473,8 +571,9 @@ public void testRestoreSystemIndicesAsGlobalStateWithNoFeatureStates() { createRepository(REPO_NAME, "fs"); String regularIndex = "my-index"; indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc"); + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); indexDoc(regularIndex, "1", "purpose", "pre-snapshot doc"); - refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, regularIndex); + refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, regularIndex); // run a snapshot including global state CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap") @@ -485,10 +584,12 @@ public void testRestoreSystemIndicesAsGlobalStateWithNoFeatureStates() { // add another document indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc"); - refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME); + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc"); + refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME); assertAcked(indicesAdmin().prepareDelete(regularIndex).get()); assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L)); + assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L)); // restore with global state and all indices but explicitly no feature states. RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot( @@ -500,6 +601,7 @@ public void testRestoreSystemIndicesAsGlobalStateWithNoFeatureStates() { // verify that the system index still has the updated document, i.e. has not been restored assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L)); + assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L)); // And the regular index has been restored assertThat(getDocCount(regularIndex), equalTo(1L)); } @@ -564,6 +666,8 @@ public void testSystemIndexAliasesAreAlwaysRestored() { // Create a system index final String systemIndexName = SystemIndexTestPlugin.SYSTEM_INDEX_NAME + "-1"; indexDoc(systemIndexName, "1", "purpose", "pre-snapshot doc"); + // Create a system data stream + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); // And a regular index // And a regular index so we can avoid matching all indices on the restore @@ -603,6 +707,109 @@ public void testSystemIndexAliasesAreAlwaysRestored() { } + public void testSystemDataStreamAliasesAreAlwaysRestored() { + createRepository(REPO_NAME, "fs"); + // Create a system data stream + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); + + // And a regular index so we can avoid matching all indices on the restore + final String regularIndex = "regular-index"; + final String regularAlias = "regular-alias"; + indexDoc(regularIndex, "1", "purpose", "pre-snapshot doc"); + + // And make sure they both have aliases + final String systemDataStreamAlias = SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME + "-alias"; + assertAcked( + indicesAdmin().prepareAliases() + .addAlias(regularIndex, regularAlias) + .addAlias(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, systemDataStreamAlias, true) + .get() + ); + + // And add a doc to ensure the alias works + indexDataStream(systemDataStreamAlias, "2", "purpose", "post-alias doc"); + + // Run a snapshot including global state + CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap") + .setWaitForCompletion(true) + .setIncludeGlobalState(true) + .get(); + assertSnapshotSuccess(createSnapshotResponse); + + // And delete the regular index and system data stream + assertAcked(cluster().client().admin().indices().prepareDelete(regularIndex)); + assertAcked( + client().execute( + DeleteDataStreamAction.INSTANCE, + new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME) + ).actionGet() + ); + + // Now restore the snapshot with no aliases + RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot( + TEST_REQUEST_TIMEOUT, + REPO_NAME, + "test-snap" + ) + .setFeatureStates("SystemDataStreamTestPlugin") + .setWaitForCompletion(true) + .setRestoreGlobalState(false) + .setIncludeAliases(false) + .get(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + + // The regular index should exist + assertTrue(indexExists(regularIndex)); + assertFalse(indexExists(regularAlias)); + + // And the system data stream, queried by alias, should have 2 docs + assertTrue(indexExists(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME)); + assertTrue(indexExists(systemDataStreamAlias)); + assertThat(getDocCount(systemDataStreamAlias), equalTo(2L)); + } + + public void testDeletedDatastreamIsRestorable() { + createRepository(REPO_NAME, "fs"); + // Create a system data stream + indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc"); + + // And a regular index so we can avoid matching all indices on the restore + final String regularIndex = "regular-index"; + indexDoc(regularIndex, "1", "purpose", "pre-snapshot doc"); + + // Run a snapshot including global state + CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap") + .setWaitForCompletion(true) + .setIncludeGlobalState(true) + .get(); + assertSnapshotSuccess(createSnapshotResponse); + + // And delete the regular index and system data stream + assertAcked(cluster().client().admin().indices().prepareDelete(regularIndex)); + assertAcked( + client().execute( + DeleteDataStreamAction.INSTANCE, + new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME) + ).actionGet() + ); + + // Now restore the snapshot with no aliases + RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot( + TEST_REQUEST_TIMEOUT, + REPO_NAME, + "test-snap" + ) + .setFeatureStates("SystemDataStreamTestPlugin") + .setWaitForCompletion(true) + .setRestoreGlobalState(false) + .setIncludeAliases(false) + .get(); + + // And the system data stream, queried by alias, should have 2 docs + assertTrue(indexExists(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME)); + assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L)); + } + /** * Tests that the special "none" feature state name cannot be combined with other * feature state names, and an error occurs if it's tried. @@ -686,6 +893,7 @@ public void testNoneFeatureStateOnCreation() { assertThat(snapshottedIndices, allOf(hasItem(regularIndex), not(hasItem(SystemIndexTestPlugin.SYSTEM_INDEX_NAME)))); } + // TODO, Do we need to test this for Datastreams? /** * Ensures that if we can only capture a partial snapshot of a system index, then the feature state associated with that index is * not included in the snapshot, because it would not be safe to restore that feature state. @@ -739,6 +947,61 @@ public void testPartialSnapshotsOfSystemIndexRemovesFeatureState() throws Except }); } + /** + * Ensures that if we can only capture a partial snapshot of a system data stream, then the feature state associated + * with that data stream is not included in the snapshot, because it would not be safe to restore that feature state. + */ + @AwaitsFix(bugUrl = "ES-11251") + public void testPartialSnapshotsOfSystemDataStreamRemovesFeatureState() throws Exception { + final String partialIndexName = SystemDataStreamManyShardsTestPlugin.SYSTEM_DATASTREAM_NAME; + final String fullIndexName = AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME; + + createRepositoryNoVerify(REPO_NAME, "mock"); + + // Create the index that we'll get a partial snapshot of with a bunch of shards + indexDataStream(partialIndexName, "1", "purpose", "pre-snapshot doc"); + // And another one with the default + indexDataStream(fullIndexName, "1", "purpose", "pre-snapshot doc"); + ensureGreen(); + + // Stop a random data node so we lose a shard from the partial index + internalCluster().stopRandomDataNode(); + assertBusy(() -> { + var status = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT).get().getStatus(); + assertThat(status, oneOf(ClusterHealthStatus.YELLOW, ClusterHealthStatus.RED)); + }, 30, TimeUnit.SECONDS); + + // Get ready to block + blockMasterFromFinalizingSnapshotOnIndexFile(REPO_NAME); + + // Start a snapshot and wait for it to hit the block, then kill the master to force a failover + final String partialSnapName = "test-partial-snap"; + CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot( + TEST_REQUEST_TIMEOUT, + REPO_NAME, + partialSnapName + ).setIncludeGlobalState(true).setWaitForCompletion(false).setPartial(true).get(); + assertThat(createSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + waitForBlock(internalCluster().getMasterName(), REPO_NAME); + internalCluster().stopCurrentMasterNode(); + + // Now get the snapshot and do our checks + assertBusy(() -> { + GetSnapshotsResponse snapshotsStatusResponse = clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, REPO_NAME) + .setSnapshots(partialSnapName) + .get(); + SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + assertNotNull(snapshotInfo); + assertThat(snapshotInfo.failedShards(), lessThan(snapshotInfo.totalShards())); + List statesInSnapshot = snapshotInfo.featureStates().stream().map(SnapshotFeatureInfo::getPluginName).toList(); + assertThat(statesInSnapshot, not(hasItem((new SystemDataStreamManyShardsTestPlugin()).getFeatureName()))); + assertThat(statesInSnapshot, hasItem((new AnotherSystemDataStreamTestPlugin()).getFeatureName())); + }, 5L, TimeUnit.SECONDS); + + // Cleanup to prevent unrelated shutdown failures + internalCluster().startDataOnlyNode(); + } + public void testParallelIndexDeleteRemovesFeatureState() throws Exception { final String indexToBeDeleted = SystemIndexTestPlugin.SYSTEM_INDEX_NAME; final String fullIndexName = AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME; @@ -814,6 +1077,14 @@ private long getDocCount(String indexName) { return indicesAdmin().prepareStats(indexName).get().getPrimaries().getDocs().getCount(); } + private DocWriteResponse indexDataStream(String index, String id, String... source) { + var sourceWithTimestamp = new String[source.length + 2]; + sourceWithTimestamp[0] = "@timestamp"; + sourceWithTimestamp[1] = Long.toString(System.currentTimeMillis()); + System.arraycopy(source, 0, sourceWithTimestamp, 2, source.length); + return prepareIndex(index).setId(id).setSource((Object[]) sourceWithTimestamp).setOpType(DocWriteRequest.OpType.CREATE).get(); + } + public static class SystemIndexTestPlugin extends Plugin implements SystemIndexPlugin { public static final String SYSTEM_INDEX_NAME = ".test-system-idx"; @@ -858,6 +1129,123 @@ public String getFeatureDescription() { } } + public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin { + + public static final String SYSTEM_DATASTREAM_NAME = ".test-system-data-stream"; + + @Override + public Collection getSystemDataStreamDescriptors() { + try { + CompressedXContent mappings = new CompressedXContent("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}"); + return Collections.singletonList( + new SystemDataStreamDescriptor( + SYSTEM_DATASTREAM_NAME, + "system data stream test", + SystemDataStreamDescriptor.Type.EXTERNAL, + ComposableIndexTemplate.builder() + .indexPatterns(List.of(SYSTEM_DATASTREAM_NAME)) // TODO is this correct? + .template(new Template(Settings.EMPTY, mappings, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(), + Map.of(), + List.of("product"), + ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS + ) + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String getFeatureName() { + return SystemDataStreamTestPlugin.class.getSimpleName(); + } + + @Override + public String getFeatureDescription() { + return "A simple test plugin for data streams"; + } + } + + public static class SystemDataStreamManyShardsTestPlugin extends Plugin implements SystemIndexPlugin { + + public static final String SYSTEM_DATASTREAM_NAME = ".test-system-data-stream-many-shards"; + + @Override + public Collection getSystemDataStreamDescriptors() { + try { + CompressedXContent mappings = new CompressedXContent("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}"); + return Collections.singletonList( + new SystemDataStreamDescriptor( + SYSTEM_DATASTREAM_NAME, + "system data stream test", + SystemDataStreamDescriptor.Type.EXTERNAL, + ComposableIndexTemplate.builder() + .indexPatterns(List.of(SYSTEM_DATASTREAM_NAME)) // TODO is this correct? + .template(new Template(indexSettings(6, 0).build(), mappings, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(), + Map.of(), + List.of("product"), + ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS + ) + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String getFeatureName() { + return SystemDataStreamManyShardsTestPlugin.class.getSimpleName(); + } + + @Override + public String getFeatureDescription() { + return "A simple test plugin for data streams"; + } + } + + public static class AnotherSystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin { + + public static final String SYSTEM_DATASTREAM_NAME = ".another-test-system-data-stream"; + + @Override + public Collection getSystemDataStreamDescriptors() { + try { + CompressedXContent mappings = new CompressedXContent("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}"); + return Collections.singletonList( + new SystemDataStreamDescriptor( + SYSTEM_DATASTREAM_NAME, + "another system data stream test", + SystemDataStreamDescriptor.Type.EXTERNAL, + ComposableIndexTemplate.builder() + .indexPatterns(List.of(SYSTEM_DATASTREAM_NAME)) // TODO is this correct? + .template(new Template(Settings.EMPTY, mappings, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(), + Map.of(), + List.of("product"), + ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS + ) + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String getFeatureName() { + return AnotherSystemDataStreamTestPlugin.class.getSimpleName(); + } + + @Override + public String getFeatureDescription() { + return "Another simple test plugin for data streams"; + } + } + public static class AssociatedIndicesTestPlugin extends Plugin implements SystemIndexPlugin { public static final String SYSTEM_INDEX_NAME = ".third-test-system-idx"; @@ -885,4 +1273,49 @@ public String getFeatureDescription() { return "Another simple test plugin"; } } + + public static class AssociatedIndicesSystemDSTestPlugin extends Plugin implements SystemIndexPlugin { + + public static final String SYSTEM_DATASTREAM_NAME = ".test-system-data-stream-two"; + public static final String ASSOCIATED_INDEX_NAME = ".associated-idx2"; + + @Override + public Collection getSystemDataStreamDescriptors() { + try { + CompressedXContent mappings = new CompressedXContent("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}"); + return Collections.singletonList( + new SystemDataStreamDescriptor( + SYSTEM_DATASTREAM_NAME, + "system data stream test", + SystemDataStreamDescriptor.Type.EXTERNAL, + ComposableIndexTemplate.builder() + .indexPatterns(List.of(SYSTEM_DATASTREAM_NAME)) // TODO is this correct? + .template(new Template(Settings.EMPTY, mappings, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(), + Map.of(), + List.of("product"), + ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS + ) + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Collection getAssociatedIndexDescriptors() { + return Collections.singletonList(new AssociatedIndexDescriptor(ASSOCIATED_INDEX_NAME, "Associated indices")); + } + + @Override + public String getFeatureName() { + return AssociatedIndicesSystemDSTestPlugin.class.getSimpleName(); + } + + @Override + public String getFeatureDescription() { + return "Another simple test plugin"; + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index 71be34db9626f..1f785b8914dcd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -30,16 +31,23 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.snapshots.SnapshotInProgressException; +import org.elasticsearch.snapshots.SnapshotsService; import java.io.IOException; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; /** * Handles data stream modification requests. */ public class MetadataDataStreamsService { - + private static final Logger LOGGER = LogManager.getLogger(MetadataDataStreamsService.class); private final ClusterService clusterService; private final IndicesService indicesService; private final DataStreamGlobalRetentionSettings globalRetentionSettings; @@ -410,6 +418,52 @@ private static IndexAbstraction validateIndex(Metadata metadata, String indexNam return index; } + /** + * Removes the given data stream and their backing indices from the Project State. + * + * @param currentState The current cluster state + * @param dataStreams The data streams to remove + * @param settings The settings + * @return The updated Project State + */ + public static ClusterState deleteDataStreams(ClusterState currentState, Set dataStreams, Settings settings) { + if (dataStreams.isEmpty()) { + return currentState; + } + + Set dataStreamNames = dataStreams.stream().map(DataStream::getName).collect(Collectors.toSet()); + Set snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(currentState, dataStreamNames); + if (snapshottingDataStreams.isEmpty() == false) { + throw new SnapshotInProgressException( + "Cannot delete data streams that are being snapshotted: [" + + String.join(", ", snapshottingDataStreams) + + "]. Try again after snapshot finishes or cancel the currently running snapshot." + ); + } + + Set backingIndicesToRemove = new HashSet<>(); + for (DataStream dataStream : dataStreams) { + assert dataStream != null; + if (currentState.metadata().dataStreams().get(dataStream.getName()) == null) { + throw new ResourceNotFoundException("data stream [" + dataStream.getName() + "] not found"); + } + backingIndicesToRemove.addAll(dataStream.getIndices()); + backingIndicesToRemove.addAll(dataStream.getFailureIndices()); + } + + // first delete the data streams and then the indices: + // (this to avoid data stream validation from failing when deleting an index that is part of a data stream + // without updating the data stream) + // TODO: change order when "delete index api" also updates the data stream the "index to be removed" is a member of + Metadata.Builder metadata = Metadata.builder(currentState.metadata()); + for (DataStream ds : dataStreams) { + LOGGER.info("removing data stream [{}]", ds.getName()); + metadata.removeDataStream(ds.getName()); + } + currentState = ClusterState.builder(currentState).metadata(metadata).build(); + return MetadataDeleteIndexService.deleteIndices(currentState, backingIndicesToRemove, settings); + } + /** * A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion. */ diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 0bdf39142f766..3cfa1381e6132 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService; import org.elasticsearch.cluster.metadata.MetadataIndexStateService; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -618,7 +619,7 @@ private static Tuple, Map> getD Collection featureStateDataStreams, boolean includeAliases ) { - Map dataStreams; + Map allDataStreams; Map dataStreamAliases; List requestedDataStreams = filterIndices( snapshotInfo.dataStreams(), @@ -626,20 +627,21 @@ private static Tuple, Map> getD IndicesOptions.lenientExpand() ); if (requestedDataStreams.isEmpty()) { - dataStreams = Map.of(); + allDataStreams = Map.of(); dataStreamAliases = Map.of(); } else { if (globalMetadata == null) { globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId); } final Map dataStreamsInSnapshot = globalMetadata.dataStreams(); - dataStreams = Maps.newMapWithExpectedSize(requestedDataStreams.size()); + allDataStreams = Maps.newMapWithExpectedSize(requestedDataStreams.size()); + Map systemDataStreams = new HashMap<>(); for (String requestedDataStream : requestedDataStreams) { final DataStream dataStreamInSnapshot = dataStreamsInSnapshot.get(requestedDataStream); assert dataStreamInSnapshot != null : "DataStream [" + requestedDataStream + "] not found in snapshot"; if (dataStreamInSnapshot.isSystem() == false) { - dataStreams.put(requestedDataStream, dataStreamInSnapshot); + allDataStreams.put(requestedDataStream, dataStreamInSnapshot); } else if (requestIndices.contains(requestedDataStream)) { throw new IllegalArgumentException( format( @@ -648,7 +650,8 @@ private static Tuple, Map> getD ) ); } else if (featureStateDataStreams.contains(requestedDataStream)) { - dataStreams.put(requestedDataStream, dataStreamInSnapshot); + allDataStreams.put(requestedDataStream, dataStreamInSnapshot); + systemDataStreams.put(requestedDataStream, dataStreamInSnapshot); } else { logger.debug( "omitting system data stream [{}] from snapshot restoration because its feature state was not requested", @@ -656,11 +659,12 @@ private static Tuple, Map> getD ); } } - if (includeAliases) { + if (includeAliases || systemDataStreams.isEmpty() == false) { dataStreamAliases = new HashMap<>(); final Map dataStreamAliasesInSnapshot = globalMetadata.dataStreamAliases(); + Map dataStreamsWithAliases = includeAliases ? allDataStreams : systemDataStreams; for (DataStreamAlias alias : dataStreamAliasesInSnapshot.values()) { - DataStreamAlias copy = alias.intersect(dataStreams.keySet()::contains); + DataStreamAlias copy = alias.intersect(dataStreamsWithAliases.keySet()::contains); if (copy.getDataStreams().isEmpty() == false) { dataStreamAliases.put(alias.getName(), copy); } @@ -669,7 +673,7 @@ private static Tuple, Map> getD dataStreamAliases = Map.of(); } } - return new Tuple<>(dataStreams, dataStreamAliases); + return new Tuple<>(allDataStreams, dataStreamAliases); } private Map> getFeatureStatesToRestore( @@ -762,6 +766,29 @@ private Set resolveSystemIndicesToDelete(ClusterState currentState, Set resolveSystemDataStreamsToDelete(ClusterState currentState, Set featureStatesToRestore) { + if (featureStatesToRestore == null) { + return Collections.emptySet(); + } + + return featureStatesToRestore.stream() + .map(systemIndices::getFeature) + .filter(Objects::nonNull) // Features that aren't present on this node will be warned about in `getFeatureStatesToRestore` + .flatMap(feature -> feature.getDataStreamDescriptors().stream()) + .map(SystemDataStreamDescriptor::getDataStreamName) + .filter(datastreamName -> currentState.metadata().dataStreams().containsKey(datastreamName)) + .map(dataStreamName -> currentState.metadata().dataStreams().get(dataStreamName)) + .collect(Collectors.toUnmodifiableSet()); + } + // visible for testing static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metadata, RestoreSnapshotRequest request) { String dataStreamName = dataStream.getName(); @@ -1333,6 +1360,13 @@ public ClusterState execute(ClusterState currentState) { settings ); + // Clear out all existing system data streams + currentState = MetadataDataStreamsService.deleteDataStreams( + currentState, + resolveSystemDataStreamsToDelete(currentState, featureStatesToRestore), + settings + ); + // List of searchable snapshots indices to restore final Set searchableSnapshotsIndices = new HashSet<>(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java index 3c8b6ebbfb271..91a89c3b283b8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java @@ -9,8 +9,10 @@ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; @@ -21,11 +23,19 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperServiceTestCase; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInProgressException; +import org.elasticsearch.snapshots.SnapshotInfoTestUtils; +import org.elasticsearch.test.index.IndexVersionUtils; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Set; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMapping; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -455,6 +465,55 @@ public void testUpdateDataStreamOptions() { assertThat(updatedDataStream.getDataStreamOptions(), equalTo(DataStreamOptions.EMPTY)); } + public void testDeleteMissing() { + DataStream dataStream = DataStreamTestHelper.randomInstance(); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build(); + + ResourceNotFoundException e = expectThrows( + ResourceNotFoundException.class, + () -> MetadataDataStreamsService.deleteDataStreams(state, Set.of(dataStream), Settings.EMPTY) + ); + assertThat(e.getMessage(), containsString(dataStream.getName())); + } + + public void testDeleteSnapshotting() { + String dataStreamName = randomAlphaOfLength(5); + Snapshot snapshot = new Snapshot("doesn't matter", new SnapshotId("snapshot name", "snapshot uuid")); + SnapshotsInProgress snaps = SnapshotsInProgress.EMPTY.withAddedEntry( + SnapshotsInProgress.Entry.snapshot( + snapshot, + true, + false, + SnapshotsInProgress.State.INIT, + Collections.emptyMap(), + List.of(dataStreamName), + Collections.emptyList(), + System.currentTimeMillis(), + (long) randomIntBetween(0, 1000), + Map.of(), + null, + SnapshotInfoTestUtils.randomUserMetadata(), + IndexVersionUtils.randomVersion() + ) + ); + final DataStream dataStream = DataStreamTestHelper.randomInstance(dataStreamName); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .putCustom(SnapshotsInProgress.TYPE, snaps) + .metadata(Metadata.builder().put(dataStream).build()) + .build(); + Exception e = expectThrows( + SnapshotInProgressException.class, + () -> MetadataDataStreamsService.deleteDataStreams(state, Set.of(dataStream), Settings.EMPTY) + ); + assertEquals( + "Cannot delete data streams that are being snapshotted: [" + + dataStreamName + + "]. Try again after snapshot finishes " + + "or cancel the currently running snapshot.", + e.getMessage() + ); + } + private MapperService getMapperService(IndexMetadata im) { try { String mapping = im.mapping().source().toString();