diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 6befdeb2c068a..391a33782787a 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -42,6 +42,7 @@ import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -388,7 +389,7 @@ private static ByteSizeValue objectSizeLimit(ByteSizeValue chunkSize, ByteSizeVa @Override public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) { final FinalizeSnapshotContext wrappedFinalizeContext; - if (SnapshotsService.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) { + if (SnapshotsServiceUtils.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) { final ListenableFuture metadataDone = new ListenableFuture<>(); wrappedFinalizeContext = new FinalizeSnapshotContext( finalizeSnapshotContext.serializeProjectMetadata(), diff --git a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java index dc002ea1a44c1..1b2fc46a7ff9d 100644 --- a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java +++ b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java @@ -18,7 +18,7 @@ import 
org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.ObjectPath; import org.elasticsearch.xcontent.XContentParser; @@ -183,7 +183,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException { // incompatibility in the downgrade test step. We verify that it is impossible here and then create the repo using verify=false // to check behavior on other operations below. final boolean verify = TEST_STEP != TestStep.STEP3_OLD_CLUSTER - || SnapshotsService.includesUUIDs(minNodeVersion) + || SnapshotsServiceUtils.includesUUIDs(minNodeVersion) || minNodeVersion.before(IndexVersions.V_7_12_0); if (verify == false) { expectThrowsAnyOf(EXPECTED_BWC_EXCEPTIONS, () -> createRepository(repoName, false, true)); @@ -208,7 +208,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException { ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards, index); } } else { - if (SnapshotsService.includesUUIDs(minNodeVersion) == false) { + if (SnapshotsServiceUtils.includesUUIDs(minNodeVersion) == false) { assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER)); expectThrowsAnyOf(EXPECTED_BWC_EXCEPTIONS, () -> listSnapshots(repoName)); expectThrowsAnyOf(EXPECTED_BWC_EXCEPTIONS, () -> deleteSnapshot(repoName, "snapshot-1")); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java index 7d6c3b2d2296a..52cc5e4bd919b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ 
-327,7 +327,7 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception { logger.info("--> verify that repo is assumed in old metadata format"); assertThat( - SnapshotsService.minCompatibleVersion(IndexVersion.current(), getRepositoryData(repoName), null), + SnapshotsServiceUtils.minCompatibleVersion(IndexVersion.current(), getRepositoryData(repoName), null), is(SnapshotsService.OLD_SNAPSHOT_FORMAT) ); @@ -336,7 +336,7 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception { logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot"); assertThat( - SnapshotsService.minCompatibleVersion(IndexVersion.current(), getRepositoryData(repoName), null), + SnapshotsServiceUtils.minCompatibleVersion(IndexVersion.current(), getRepositoryData(repoName), null), is(IndexVersion.current()) ); final RepositoryData finalRepositoryData = getRepositoryData(repoName); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java index a7c452cd3e43c..95216698556e6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -37,7 +37,7 @@ import org.elasticsearch.repositories.RepositoryCleanupResult; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -178,8 +178,8 @@ private void 
cleanupRepo(ProjectId projectId, String repositoryName, ActionListe @Override public ClusterState execute(ClusterState currentState) { final ProjectMetadata projectMetadata = currentState.metadata().getProject(projectId); - SnapshotsService.ensureRepositoryExists(repositoryName, projectMetadata); - SnapshotsService.ensureNotReadOnly(projectMetadata, repositoryName); + SnapshotsServiceUtils.ensureRepositoryExists(repositoryName, projectMetadata); + SnapshotsServiceUtils.ensureNotReadOnly(projectMetadata, repositoryName); // Repository cleanup is intentionally cluster wide exclusive final RepositoryCleanupInProgress repositoryCleanupInProgress = RepositoryCleanupInProgress.get(currentState); if (repositoryCleanupInProgress.hasCleanupInProgress()) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 61925bd31d156..effaef9caed85 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -46,7 +46,7 @@ import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.snapshots.SnapshotShardsService; import org.elasticsearch.snapshots.SnapshotState; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -121,7 +121,7 @@ protected void masterOperation( final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state); final ProjectId projectId = projectResolver.getProjectId(); - List currentSnapshots = SnapshotsService.currentSnapshots( + List currentSnapshots = 
SnapshotsServiceUtils.currentSnapshots( snapshotsInProgress, projectId, request.repository(), diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index 9082dad757f8c..b4eb1fe8ed1bb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -50,7 +50,7 @@ import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.snapshots.SnapshotInProgressException; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.threadpool.ThreadPool; @@ -305,7 +305,7 @@ private RolloverResult rolloverDataStream( boolean isFailureStoreRollover ) throws Exception { final ProjectMetadata metadata = projectState.metadata(); - Set snapshottingDataStreams = SnapshotsService.snapshottingDataStreams( + Set snapshottingDataStreams = SnapshotsServiceUtils.snapshottingDataStreams( projectState, Collections.singleton(dataStream.getName()) ); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index e365e8972488f..05a8a321586c6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -37,7 +37,7 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.snapshots.SnapshotInProgressException; -import 
org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import java.io.IOException; import java.util.HashSet; @@ -664,7 +664,7 @@ public static ClusterState deleteDataStreams(ProjectState projectState, Set dataStreamNames = dataStreams.stream().map(DataStream::getName).collect(Collectors.toSet()); - Set snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(projectState, dataStreamNames); + Set snapshottingDataStreams = SnapshotsServiceUtils.snapshottingDataStreams(projectState, dataStreamNames); if (snapshottingDataStreams.isEmpty() == false) { throw new SnapshotInProgressException( "Cannot delete data streams that are being snapshotted: [" diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java index 3ba4ad1cd2454..1e6882f54b361 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java @@ -35,7 +35,7 @@ import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotInProgressException; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import java.util.HashMap; import java.util.HashSet; @@ -186,7 +186,7 @@ public static ClusterState deleteIndices(ProjectState projectState, Set i } // Check if index deletion conflicts with any running snapshots - Set snapshottingIndices = SnapshotsService.snapshottingIndices(projectState, indicesToDelete); + Set snapshottingIndices = SnapshotsServiceUtils.snapshottingIndices(projectState, indicesToDelete); if (snapshottingIndices.isEmpty() == false) { throw new SnapshotInProgressException( "Cannot delete indices that are being snapshotted: " diff --git 
a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java index e2560046b99bb..a8b0230881309 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java @@ -77,7 +77,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotInProgressException; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -346,7 +346,7 @@ static ClusterState addIndexClosedBlocks( } // Check if index closing conflicts with any running snapshots - Set snapshottingIndices = SnapshotsService.snapshottingIndices(currentProjectState, indicesToClose); + Set snapshottingIndices = SnapshotsServiceUtils.snapshottingIndices(currentProjectState, indicesToClose); if (snapshottingIndices.isEmpty() == false) { throw new SnapshotInProgressException( "Cannot close indices that are being snapshotted: " @@ -934,7 +934,7 @@ static Tuple> closeRoutingTable( } // Check if index closing conflicts with any running snapshots - Set snapshottingIndices = SnapshotsService.snapshottingIndices(currentProjectState, Set.of(index)); + Set snapshottingIndices = SnapshotsServiceUtils.snapshottingIndices(currentProjectState, Set.of(index)); if (snapshottingIndices.isEmpty() == false) { closingResults.put( result.getKey(), diff --git a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java index c9d8a703bbfdd..2c0fdb198ea0f 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java +++ 
b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java @@ -17,7 +17,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.snapshots.SnapshotInfo; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import java.util.Map; import java.util.Set; @@ -111,7 +111,11 @@ public Map> obsoleteShardGenerations() { * Returns a new {@link ClusterState}, based on the given {@code state} with the create-snapshot entry removed. */ public ClusterState updatedClusterState(ClusterState state) { - final ClusterState updatedState = SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot(), updatedShardGenerations); + final ClusterState updatedState = SnapshotsServiceUtils.stateWithoutSnapshot( + state, + snapshotInfo.snapshot(), + updatedShardGenerations + ); // Now that the updated cluster state may have changed in-progress shard snapshots' shard generations to the latest shard // generation, let's mark any now unreferenced shard generations as obsolete and ready to be deleted. 
obsoleteGenerations.set( diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index 896b548f737be..cd2d835a81f0f 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -30,6 +30,7 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; @@ -699,9 +700,9 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final IndexVersion repoMetaVersion, boolean permitMissingUuid) throws IOException { - final boolean shouldWriteUUIDS = SnapshotsService.includesUUIDs(repoMetaVersion); - final boolean shouldWriteIndexGens = SnapshotsService.useIndexGenerations(repoMetaVersion); - final boolean shouldWriteShardGens = SnapshotsService.useShardGenerations(repoMetaVersion); + final boolean shouldWriteUUIDS = SnapshotsServiceUtils.includesUUIDs(repoMetaVersion); + final boolean shouldWriteIndexGens = SnapshotsServiceUtils.useIndexGenerations(repoMetaVersion); + final boolean shouldWriteShardGens = SnapshotsServiceUtils.useShardGenerations(repoMetaVersion); assert Boolean.compare(shouldWriteUUIDS, shouldWriteIndexGens) <= 0; assert Boolean.compare(shouldWriteIndexGens, shouldWriteShardGens) <= 0; @@ -908,7 +909,7 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g this snapshot repository format requires Elasticsearch version [%s] or later""", versionString)); }; - assert SnapshotsService.useShardGenerations(version); + assert 
SnapshotsServiceUtils.useShardGenerations(version); } case UUID -> { XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 43cb1a7d3caec..66b6fa446e027 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -130,6 +130,7 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.LeakTracker; @@ -999,7 +1000,7 @@ private void createSnapshotsDeletion( return new SnapshotsDeletion( snapshotIds, repositoryDataGeneration, - SnapshotsService.minCompatibleVersion(minimumNodeVersion, originalRepositoryData, snapshotIds), + SnapshotsServiceUtils.minCompatibleVersion(minimumNodeVersion, originalRepositoryData, snapshotIds), originalRootBlobs, blobStore().blobContainer(indicesPath()).children(OperationPurpose.SNAPSHOT_DATA), originalRepositoryData @@ -1105,7 +1106,7 @@ class SnapshotsDeletion { this.snapshotIds = snapshotIds; this.originalRepositoryDataGeneration = originalRepositoryDataGeneration; this.repositoryFormatIndexVersion = repositoryFormatIndexVersion; - this.useShardGenerations = SnapshotsService.useShardGenerations(repositoryFormatIndexVersion); + this.useShardGenerations = SnapshotsServiceUtils.useShardGenerations(repositoryFormatIndexVersion); this.originalRootBlobs = originalRootBlobs; this.originalIndexContainers = originalIndexContainers; this.originalRepositoryData = 
originalRepositoryData; @@ -1776,11 +1777,11 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened // when writing the index-${N} to each shard directory. final IndexVersion repositoryMetaVersion = finalizeSnapshotContext.repositoryMetaVersion(); - final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); + final boolean writeShardGens = SnapshotsServiceUtils.useShardGenerations(repositoryMetaVersion); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion); + final boolean writeIndexGens = SnapshotsServiceUtils.useIndexGenerations(repositoryMetaVersion); record MetadataWriteResult( RepositoryData existingRepositoryData, @@ -2563,7 +2564,7 @@ private void cacheRepositoryData(RepositoryData repositoryData, IndexVersion ver return; } final RepositoryData toCache; - if (SnapshotsService.useShardGenerations(version)) { + if (SnapshotsServiceUtils.useShardGenerations(version)) { toCache = repositoryData; } else { // don't cache shard generations here as they may be unreliable @@ -2918,7 +2919,7 @@ public void onFailure(Exception e) { }, true); maybeWriteIndexLatest(newGen); - if (filteredRepositoryData.getUuid().equals(RepositoryData.MISSING_UUID) && SnapshotsService.includesUUIDs(version)) { + if (filteredRepositoryData.getUuid().equals(RepositoryData.MISSING_UUID) && SnapshotsServiceUtils.includesUUIDs(version)) { assert newRepositoryData.getUuid().equals(RepositoryData.MISSING_UUID) == false; logger.info( Strings.format( @@ -3000,7 +3001,7 @@ public String toString() { } private RepositoryData updateRepositoryData(RepositoryData repositoryData, IndexVersion repositoryMetaversion, long newGen) { - if (SnapshotsService.includesUUIDs(repositoryMetaversion)) { + if 
(SnapshotsServiceUtils.includesUUIDs(repositoryMetaversion)) { final String clusterUUID = clusterService.state().metadata().clusterUUID(); if (repositoryData.getClusterUUID().equals(clusterUUID) == false) { repositoryData = repositoryData.withClusterUuid(clusterUUID); @@ -3137,7 +3138,7 @@ private ClusterState updateRepositoryGenerationsIfNecessary(ClusterState state, } } updatedDeletionsInProgress = changedDeletions ? SnapshotDeletionsInProgress.of(deletionEntries) : null; - return SnapshotsService.updateWithSnapshots(state, updatedSnapshotsInProgress, updatedDeletionsInProgress); + return SnapshotsServiceUtils.updateWithSnapshots(state, updatedSnapshotsInProgress, updatedDeletionsInProgress); } private RepositoryMetadata getRepoMetadata(ProjectMetadata projectMetadata) { @@ -3376,8 +3377,8 @@ private void doSnapshotShard(SnapshotShardContext context) { ); final ShardGeneration indexGeneration; - final boolean writeShardGens = SnapshotsService.useShardGenerations(context.getRepositoryMetaVersion()); - final boolean writeFileInfoWriterUUID = SnapshotsService.includeFileInfoWriterUUID(context.getRepositoryMetaVersion()); + final boolean writeShardGens = SnapshotsServiceUtils.useShardGenerations(context.getRepositoryMetaVersion()); + final boolean writeFileInfoWriterUUID = SnapshotsServiceUtils.includeFileInfoWriterUUID(context.getRepositoryMetaVersion()); // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones final BlobStoreIndexShardSnapshots updatedBlobStoreIndexShardSnapshots = snapshots.withAddedSnapshot( new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, context.stateIdentifier()) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 3fd5a226936ac..c4de1c10f0cca 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ 
b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -398,7 +398,7 @@ private void startNewShardSnapshots(String localNodeId, SnapshotsInProgress.Entr newSnapshotShards.put(shardId, snapshotStatus); final IndexId indexId = entry.indices().get(shardId.getIndexName()); assert indexId != null; - assert SnapshotsService.useShardGenerations(entry.version()) + assert SnapshotsServiceUtils.useShardGenerations(entry.version()) || ShardGenerations.fixShardGeneration(snapshotStatus.generation()) == null : "Found non-null, non-numeric shard generation [" + snapshotStatus.generation() diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 68eb8bd8cf2cd..7c64eeefb77c2 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -13,7 +13,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionRunnable; @@ -33,8 +32,6 @@ import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NotMasterException; -import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.cluster.RepositoryCleanupInProgress; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; @@ -54,9 +51,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; -import 
org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RerouteService; -import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; @@ -78,11 +73,9 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.Predicates; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.shard.ShardId; @@ -96,7 +89,6 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; -import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.ShardGenerations; @@ -116,7 +108,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -132,7 +123,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.util.Collections.unmodifiableList; import static org.elasticsearch.cluster.SnapshotsInProgress.completed; import static org.elasticsearch.common.Strings.arrayToCommaDelimitedString; import static org.elasticsearch.core.Strings.format; @@ -288,7 +278,7 @@ public void executeSnapshot( public void createSnapshot(final ProjectId projectId, final CreateSnapshotRequest 
request, final ActionListener listener) { final String repositoryName = request.repository(); final String snapshotName = IndexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); - validate(repositoryName, snapshotName); + SnapshotsServiceUtils.validate(repositoryName, snapshotName); final SnapshotId snapshotId = new SnapshotId(snapshotName, request.uuid()); Repository repository = repositoriesService.repository(projectId, request.repository()); if (repository.isReadOnly()) { @@ -326,19 +316,6 @@ private void submitCreateSnapshotRequest( ); } - private static void ensureSnapshotNameNotRunning( - SnapshotsInProgress runningSnapshots, - ProjectId projectId, - String repositoryName, - String snapshotName - ) { - if (runningSnapshots.forRepo(projectId, repositoryName) - .stream() - .anyMatch(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName))) { - throw new SnapshotNameAlreadyInUseException(repositoryName, snapshotName, "snapshot with the same name is already in-progress"); - } - } - // TODO: It is worth revisiting the design choice of creating a placeholder entry in snapshots-in-progress here once we have a cache // for repository metadata and loading it has predictable performance public void cloneSnapshot(ProjectId projectId, CloneSnapshotRequest request, ActionListener listener) { @@ -349,7 +326,7 @@ public void cloneSnapshot(ProjectId projectId, CloneSnapshotRequest request, Act return; } final String snapshotName = IndexNameExpressionResolver.resolveDateMathExpression(request.target()); - validate(repositoryName, snapshotName); + SnapshotsServiceUtils.validate(repositoryName, snapshotName); // TODO: create snapshot UUID in CloneSnapshotRequest and make this operation idempotent to cleanly deal with transport layer // retries final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); @@ -362,13 +339,13 @@ public void cloneSnapshot(ProjectId projectId, CloneSnapshotRequest request, Act @Override public 
ClusterState execute(ClusterState currentState) { final var projectMetadata = currentState.metadata().getProject(projectId); - ensureRepositoryExists(repositoryName, projectMetadata); - ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); - ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "clone snapshot"); - ensureNotReadOnly(projectMetadata, repositoryName); + SnapshotsServiceUtils.ensureRepositoryExists(repositoryName, projectMetadata); + SnapshotsServiceUtils.ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); + SnapshotsServiceUtils.ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "clone snapshot"); + SnapshotsServiceUtils.ensureNotReadOnly(projectMetadata, repositoryName); final SnapshotsInProgress snapshots = SnapshotsInProgress.get(currentState); - ensureSnapshotNameNotRunning(snapshots, projectId, repositoryName, snapshotName); - validate(repositoryName, snapshotName, projectMetadata); + SnapshotsServiceUtils.ensureSnapshotNameNotRunning(snapshots, projectId, repositoryName, snapshotName); + SnapshotsServiceUtils.validate(repositoryName, snapshotName, projectMetadata); final SnapshotId sourceSnapshotId = repositoryData.getSnapshotIds() .stream() @@ -411,7 +388,11 @@ public ClusterState execute(ClusterState currentState) { repositoryData.resolveIndices(matchingIndices), threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), - minCompatibleVersion(currentState.nodes().getMaxDataNodeCompatibleIndexVersion(), repositoryData, null) + SnapshotsServiceUtils.minCompatibleVersion( + currentState.nodes().getMaxDataNodeCompatibleIndexVersion(), + repositoryData, + null + ) // NB minCompatibleVersion iterates over all the snapshots in the current repositoryData, which probably should happen on a // different thread. Also is the _current_ repositoryData the right thing to consider? 
The minimum repository format version // can only advance during a snapshot delete which today is never concurrent to other writes, but a future version may allow @@ -423,7 +404,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(Exception e) { initializingClones.remove(snapshot); - logSnapshotFailure("clone", snapshot, e); + SnapshotsServiceUtils.logSnapshotFailure("clone", snapshot, e); listener.onFailure(e); } @@ -436,50 +417,6 @@ public void clusterStateProcessed(ClusterState oldState, final ClusterState newS }, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure); } - /** - * Checks the cluster state for any in-progress repository cleanup tasks ({@link RepositoryCleanupInProgress}). - * Note that repository cleanup is intentionally cluster wide exclusive. - */ - private static void ensureNoCleanupInProgress( - final ClusterState currentState, - final String repositoryName, - final String snapshotName, - final String reason - ) { - final RepositoryCleanupInProgress repositoryCleanupInProgress = RepositoryCleanupInProgress.get(currentState); - if (repositoryCleanupInProgress.hasCleanupInProgress()) { - throw new ConcurrentSnapshotExecutionException( - repositoryName, - snapshotName, - "cannot " - + reason - + " while a repository cleanup is in-progress in " - + repositoryCleanupInProgress.entries() - .stream() - .map(RepositoryCleanupInProgress.Entry::repository) - .collect(Collectors.toSet()) - ); - } - } - - public static void ensureNotReadOnly(final ProjectMetadata projectMetadata, final String repositoryName) { - final var repositoryMetadata = RepositoriesMetadata.get(projectMetadata).repository(repositoryName); - if (RepositoriesService.isReadOnly(repositoryMetadata.settings())) { - throw new RepositoryException(repositoryMetadata.name(), "repository is readonly"); - } - } - - private static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryData, String snapshotName, 
Repository repository) { - // check if the snapshot name already exists in the repository - if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { - throw new SnapshotNameAlreadyInUseException( - repository.getMetadata().name(), - snapshotName, - "snapshot with the same name already exists" - ); - } - } - /** * Determine the number of shards in each index of a clone operation and update the cluster state accordingly. * @@ -590,7 +527,7 @@ public ClusterState execute(ClusterState currentState) { // shard snapshot state was based on all previous existing operations in progress // TODO: If we could eventually drop the snapshot clone init phase we don't need this any longer updatedEntries.add(updatedEntry); - return updateWithSnapshots( + return SnapshotsServiceUtils.updateWithSnapshots( currentState, snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(projectId, repoName, updatedEntries), null @@ -730,83 +667,8 @@ private void ensureBelowConcurrencyLimit( } } - /** - * Throws {@link RepositoryMissingException} if no repository by the given name is found in the given cluster state. 
- */ - public static void ensureRepositoryExists(String repoName, ProjectMetadata projectMetadata) { - if (RepositoriesMetadata.get(projectMetadata).repository(repoName) == null) { - throw new RepositoryMissingException(repoName); - } - } - - /** - * Validates snapshot request - * - * @param repositoryName repository name - * @param snapshotName snapshot name - * @param projectMetadata current project metadata - */ - private static void validate(String repositoryName, String snapshotName, ProjectMetadata projectMetadata) { - if (RepositoriesMetadata.get(projectMetadata).repository(repositoryName) == null) { - throw new RepositoryMissingException(repositoryName); - } - validate(repositoryName, snapshotName); - } - - private static void validate(final String repositoryName, final String snapshotName) { - if (Strings.hasLength(snapshotName) == false) { - throw new InvalidSnapshotNameException(repositoryName, snapshotName, "cannot be empty"); - } - if (snapshotName.contains(" ")) { - throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not contain whitespace"); - } - if (snapshotName.contains(",")) { - throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not contain ','"); - } - if (snapshotName.contains("#")) { - throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not contain '#'"); - } - if (snapshotName.charAt(0) == '_') { - throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not start with '_'"); - } - if (snapshotName.toLowerCase(Locale.ROOT).equals(snapshotName) == false) { - throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must be lowercase"); - } - if (Strings.validFileName(snapshotName) == false) { - throw new InvalidSnapshotNameException( - repositoryName, - snapshotName, - "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS - ); - } - } - - private static UpdatedShardGenerations buildGenerations(SnapshotsInProgress.Entry 
snapshot, Metadata metadata) { - ShardGenerations.Builder builder = ShardGenerations.builder(); - ShardGenerations.Builder deletedBuilder = null; - if (snapshot.isClone()) { - snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> builder.put(key.index(), key.shardId(), value)); - } else { - for (Map.Entry entry : snapshot.shardSnapshotStatusByRepoShardId().entrySet()) { - RepositoryShardId key = entry.getKey(); - ShardSnapshotStatus value = entry.getValue(); - final Index index = snapshot.indexByName(key.indexName()); - if (metadata.getProject(snapshot.projectId()).hasIndex(index) == false) { - assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; - if (deletedBuilder == null) { - deletedBuilder = ShardGenerations.builder(); - } - deletedBuilder.put(key.index(), key.shardId(), value); - continue; - } - builder.put(key.index(), key.shardId(), value); - } - } - return new UpdatedShardGenerations(builder.build(), deletedBuilder == null ? 
ShardGenerations.EMPTY : deletedBuilder.build()); - } - private Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadata metadata, ProjectId projectId) { - final ProjectMetadata snapshotProject = projectForSnapshot(snapshot, metadata.getProject(projectId)); + final ProjectMetadata snapshotProject = SnapshotsServiceUtils.projectForSnapshot(snapshot, metadata.getProject(projectId)); final Metadata.Builder builder; if (snapshot.includeGlobalState() == false) { // Remove global state from the cluster state @@ -818,80 +680,6 @@ private Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadat return builder.build(); } - private static ProjectMetadata projectForSnapshot(SnapshotsInProgress.Entry snapshot, ProjectMetadata project) { - final ProjectMetadata.Builder builder; - if (snapshot.includeGlobalState() == false) { - // Create a new project state that only includes the index data - builder = ProjectMetadata.builder(project.id()); - for (IndexId index : snapshot.indices().values()) { - final IndexMetadata indexMetadata = project.index(index.getName()); - if (indexMetadata == null) { - assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; - } else { - builder.put(indexMetadata, false); - } - } - } else { - builder = ProjectMetadata.builder(project); - } - // Only keep those data streams in the metadata that were actually requested by the initial snapshot create operation and that have - // all their indices contained in the snapshot - final Map dataStreams = new HashMap<>(); - final Set indicesInSnapshot = snapshot.indices().keySet(); - for (String dataStreamName : snapshot.dataStreams()) { - DataStream dataStream = project.dataStreams().get(dataStreamName); - if (dataStream == null) { - assert snapshot.partial() - : "Data stream [" + dataStreamName + "] was deleted during a snapshot but snapshot was not partial."; - } else { - final DataStream reconciled = 
dataStream.snapshot(indicesInSnapshot, builder); - if (reconciled != null) { - dataStreams.put(dataStreamName, reconciled); - } - } - } - return builder.dataStreams(dataStreams, filterDataStreamAliases(dataStreams, project.dataStreamAliases())).build(); - } - - /** - * Returns status of the currently running snapshots - *

- * This method is executed on master node - *

- * - * @param snapshotsInProgress snapshots in progress in the cluster state - * @param projectId project to look for the repository - * @param repository repository id - * @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered - * @return list of metadata for currently running snapshots - */ - public static List currentSnapshots( - @Nullable SnapshotsInProgress snapshotsInProgress, - ProjectId projectId, - String repository, - List snapshots - ) { - if (snapshotsInProgress == null || snapshotsInProgress.isEmpty()) { - return Collections.emptyList(); - } - if ("_all".equals(repository)) { - return snapshotsInProgress.asStream(projectId).toList(); - } - if (snapshots.isEmpty()) { - return snapshotsInProgress.forRepo(projectId, repository); - } - List builder = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(projectId, repository)) { - for (String snapshot : snapshots) { - if (entry.snapshot().getSnapshotId().getName().equals(snapshot)) { - builder.add(entry); - break; - } - } - } - return unmodifiableList(builder); - } - @Override public void applyClusterState(ClusterChangedEvent event) { try { @@ -900,14 +688,16 @@ public void applyClusterState(ClusterChangedEvent event) { SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(event.state()); final boolean newMaster = event.previousState().nodes().isLocalNodeElectedMaster() == false; processExternalChanges( - newMaster || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes()), + newMaster || SnapshotsServiceUtils.removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes()), snapshotsInProgress.nodeIdsForRemovalChanged(SnapshotsInProgress.get(event.previousState())) - || (event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) + || (event.routingTableChanged() + && 
SnapshotsServiceUtils.waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) ); if (newMaster || event.state().metadata().nodeShutdowns().equals(event.previousState().metadata().nodeShutdowns()) == false - || supportsNodeRemovalTracking(event.state()) != supportsNodeRemovalTracking(event.previousState())) { + || SnapshotsServiceUtils.supportsNodeRemovalTracking(event.state()) != SnapshotsServiceUtils + .supportsNodeRemovalTracking(event.previousState())) { updateNodeIdsToRemoveQueue.submitTask( "SnapshotsService#updateNodeIdsToRemove", new UpdateNodeIdsForRemovalTask(), @@ -936,7 +726,7 @@ public void applyClusterState(ClusterChangedEvent event) { final Exception cause = new NotMasterException("no longer master"); for (final Iterator>> it = snapshotDeletionListeners.values().iterator(); it.hasNext();) { final List> listeners = it.next(); - readyToResolveListeners.add(() -> failListenersIgnoringException(listeners, cause)); + readyToResolveListeners.add(() -> SnapshotsServiceUtils.failListenersIgnoringException(listeners, cause)); it.remove(); } } @@ -949,7 +739,7 @@ public void applyClusterState(ClusterChangedEvent event) { logger.warn("Failed to update snapshot state ", e); } assert assertConsistentWithClusterState(event.state()); - assert assertNoDanglingSnapshots(event.state()); + assert SnapshotsServiceUtils.assertNoDanglingSnapshots(event.state()); } private boolean assertConsistentWithClusterState(ClusterState state) { @@ -983,35 +773,6 @@ private boolean assertConsistentWithClusterState(ClusterState state) { return true; } - // Assert that there are no snapshots that have a shard that is waiting to be assigned even though the cluster state would allow for it - // to be assigned - private static boolean assertNoDanglingSnapshots(ClusterState state) { - final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state); - final SnapshotDeletionsInProgress snapshotDeletionsInProgress = SnapshotDeletionsInProgress.get(state); - final Set 
reposWithRunningDelete = snapshotDeletionsInProgress.getEntries() - .stream() - .filter(entry -> entry.state() == SnapshotDeletionsInProgress.State.STARTED) - .map(entry -> new ProjectRepo(entry.projectId(), entry.repository())) - .collect(Collectors.toSet()); - for (List repoEntry : snapshotsInProgress.entriesByRepo()) { - final SnapshotsInProgress.Entry entry = repoEntry.get(0); - for (ShardSnapshotStatus value : entry.shardSnapshotStatusByRepoShardId().values()) { - if (value.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)) { - assert reposWithRunningDelete.contains(new ProjectRepo(entry.projectId(), entry.repository())) - : "Found shard snapshot waiting to be assigned in [" + entry + "] but it is not blocked by any running delete"; - } else if (value.isActive()) { - assert reposWithRunningDelete.contains(new ProjectRepo(entry.projectId(), entry.repository())) == false - : "Found shard snapshot actively executing in [" - + entry - + "] when it should be blocked by a running delete [" - + Strings.toString(snapshotDeletionsInProgress) - + "]"; - } - } - } - return true; - } - /** * Updates the state of in-progress snapshots in reaction to a change in the configuration of the cluster nodes (master fail-over or * disconnect of a data node that was executing a snapshot) or a routing change that started shards whose snapshot state is @@ -1129,13 +890,14 @@ public ClusterState execute(ClusterState currentState) { } else { // Not a clone, and the snapshot is in STARTED or ABORTED state. 
- ImmutableOpenMap shards = processWaitingShardsAndRemovedNodes( - snapshotEntry, - currentState.routingTable(projectId), - nodes, - snapshotsInProgress::isNodeIdForRemoval, - knownFailures - ); + ImmutableOpenMap shards = SnapshotsServiceUtils + .processWaitingShardsAndRemovedNodes( + snapshotEntry, + currentState.routingTable(projectId), + nodes, + snapshotsInProgress::isNodeIdForRemoval, + knownFailures + ); if (shards != null) { final SnapshotsInProgress.Entry updatedSnapshot = snapshotEntry.withShardStates(shards); changed = true; @@ -1170,7 +932,7 @@ public ClusterState execute(ClusterState currentState) { ); } } - final ClusterState res = readyDeletions( + final ClusterState res = SnapshotsServiceUtils.readyDeletions( updatedSnapshots != snapshotsInProgress ? ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build() : currentState, @@ -1221,177 +983,6 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) }); } - /** - * Walks through the snapshot entries' shard snapshots and creates applies updates from looking at removed nodes or indexes and known - * failed shard snapshots on the same shard IDs. 
- * - * @param nodeIdRemovalPredicate identify any nodes that are marked for removal / in shutdown mode - * @param knownFailures already known failed shard snapshots, but more may be found in this method - * @return an updated map of shard statuses - */ - private static ImmutableOpenMap processWaitingShardsAndRemovedNodes( - SnapshotsInProgress.Entry snapshotEntry, - RoutingTable routingTable, - DiscoveryNodes nodes, - Predicate nodeIdRemovalPredicate, - Map knownFailures - ) { - assert snapshotEntry.isClone() == false : "clones take a different path"; - boolean snapshotChanged = false; - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - for (Map.Entry shardSnapshotEntry : snapshotEntry.shardSnapshotStatusByRepoShardId() - .entrySet()) { - ShardSnapshotStatus shardStatus = shardSnapshotEntry.getValue(); - ShardId shardId = snapshotEntry.shardId(shardSnapshotEntry.getKey()); - if (shardStatus.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)) { - // this shard snapshot is waiting for a previous snapshot to finish execution for this shard - final ShardSnapshotStatus knownFailure = knownFailures.get(shardSnapshotEntry.getKey()); - if (knownFailure == null) { - final IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex()); - if (indexShardRoutingTable == null) { - // shard became unassigned while queued after a delete or clone operation so we can fail as missing here - assert snapshotEntry.partial(); - snapshotChanged = true; - logger.debug("failing snapshot of shard [{}] because index got deleted", shardId); - shards.put(shardId, ShardSnapshotStatus.MISSING); - knownFailures.put(shardSnapshotEntry.getKey(), ShardSnapshotStatus.MISSING); - } else { - // if no failure is known for the shard we keep waiting - shards.put(shardId, shardStatus); - } - } else { - // If a failure is known for an execution we waited on for this shard then we fail with the same exception here - // as well - snapshotChanged = true; - shards.put(shardId, 
knownFailure); - } - } else if (shardStatus.state() == ShardState.WAITING || shardStatus.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) { - // The shard primary wasn't assigned, or the shard snapshot was paused because the node was shutting down. - IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex()); - if (indexShardRoutingTable != null) { - IndexShardRoutingTable shardRouting = indexShardRoutingTable.shard(shardId.id()); - if (shardRouting != null) { - final var primaryNodeId = shardRouting.primaryShard().currentNodeId(); - if (nodeIdRemovalPredicate.test(primaryNodeId)) { - if (shardStatus.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) { - // Shard that we are waiting for is on a node marked for removal, keep it as PAUSED_FOR_REMOVAL - shards.put(shardId, shardStatus); - } else { - // Shard that we are waiting for is on a node marked for removal, move it to PAUSED_FOR_REMOVAL - snapshotChanged = true; - shards.put( - shardId, - new ShardSnapshotStatus(primaryNodeId, ShardState.PAUSED_FOR_NODE_REMOVAL, shardStatus.generation()) - ); - } - continue; - } else if (shardRouting.primaryShard().started()) { - // Shard that we were waiting for has started on a node, let's process it - snapshotChanged = true; - logger.debug(""" - Starting shard [{}] with shard generation [{}] that we were waiting to start on node [{}]. 
Previous \ - shard state [{}] - """, shardId, shardStatus.generation(), shardStatus.nodeId(), shardStatus.state()); - shards.put(shardId, new ShardSnapshotStatus(primaryNodeId, shardStatus.generation())); - continue; - } else if (shardRouting.primaryShard().initializing() || shardRouting.primaryShard().relocating()) { - // Shard that we were waiting for hasn't started yet or still relocating - will continue to wait - shards.put(shardId, shardStatus); - continue; - } - } - } - // Shard that we were waiting for went into unassigned state or disappeared (index or shard is gone) - giving up - snapshotChanged = true; - logger.warn("failing snapshot of shard [{}] on node [{}] because shard is unassigned", shardId, shardStatus.nodeId()); - final ShardSnapshotStatus failedState = new ShardSnapshotStatus( - shardStatus.nodeId(), - ShardState.FAILED, - shardStatus.generation(), - "shard is unassigned" - ); - shards.put(shardId, failedState); - knownFailures.put(shardSnapshotEntry.getKey(), failedState); - } else if (shardStatus.state().completed() == false && shardStatus.nodeId() != null) { - if (nodes.nodeExists(shardStatus.nodeId())) { - shards.put(shardId, shardStatus); - } else { - // TODO: Restart snapshot on another node? 
- snapshotChanged = true; - logger.warn("failing snapshot of shard [{}] on departed node [{}]", shardId, shardStatus.nodeId()); - final ShardSnapshotStatus failedState = new ShardSnapshotStatus( - shardStatus.nodeId(), - ShardState.FAILED, - shardStatus.generation(), - "node left the cluster during snapshot" - ); - shards.put(shardId, failedState); - knownFailures.put(shardSnapshotEntry.getKey(), failedState); - } - } else { - shards.put(shardId, shardStatus); - } - } - if (snapshotChanged) { - return shards.build(); - } else { - return null; - } - } - - private static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snapshotsInProgress, ClusterChangedEvent event) { - for (List entries : snapshotsInProgress.entriesByRepo()) { - for (SnapshotsInProgress.Entry entry : entries) { - if (entry.state() == SnapshotsInProgress.State.STARTED && entry.isClone() == false) { - final ProjectId projectId = entry.projectId(); - for (Map.Entry shardStatus : entry.shardSnapshotStatusByRepoShardId() - .entrySet()) { - final ShardState state = shardStatus.getValue().state(); - if (state != ShardState.WAITING && state != ShardState.QUEUED && state != ShardState.PAUSED_FOR_NODE_REMOVAL) { - continue; - } - final RepositoryShardId shardId = shardStatus.getKey(); - final Index index = entry.indexByName(shardId.indexName()); - if (event.indexRoutingTableChanged(index)) { - IndexRoutingTable indexShardRoutingTable = event.state().routingTable(projectId).index(index); - if (indexShardRoutingTable == null) { - // index got removed concurrently and we have to fail WAITING, QUEUED and PAUSED_FOR_REMOVAL state shards - return true; - } - ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.shardId()).primaryShard(); - if (shardRouting.started() && snapshotsInProgress.isNodeIdForRemoval(shardRouting.currentNodeId()) == false - || shardRouting.unassigned()) { - return true; - } - } - } - } - } - } - return false; - } - - private static boolean 
removedNodesCleanupNeeded(SnapshotsInProgress snapshotsInProgress, List removedNodes) { - if (removedNodes.isEmpty()) { - // Nothing to do, no nodes removed - return false; - } - final Set removedNodeIds = removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); - return snapshotsInProgress.asStream().anyMatch(snapshot -> { - if (snapshot.state().completed() || snapshot.isClone()) { - // nothing to do for already completed snapshots or clones that run on master anyways - return false; - } - for (ShardSnapshotStatus shardSnapshotStatus : snapshot.shardSnapshotStatusByRepoShardId().values()) { - if (shardSnapshotStatus.state().completed() == false && removedNodeIds.contains(shardSnapshotStatus.nodeId())) { - // Snapshot had an incomplete shard running on a removed node so we need to adjust that shard's snapshot status - return true; - } - } - return false; - }); - } - /** * Finalizes the snapshot in the repository. * @@ -1513,7 +1104,7 @@ protected void doRun() { SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot); final String failure = entry.failure(); logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure); - final var updatedShardGenerations = buildGenerations(entry, metadata); + final var updatedShardGenerations = SnapshotsServiceUtils.buildGenerations(entry, metadata); final ShardGenerations updatedShardGensForLiveIndices = updatedShardGenerations.liveIndices(); final List finalIndices = updatedShardGensForLiveIndices.indices().stream().map(IndexId::getName).toList(); final Set indexNames = new HashSet<>(finalIndices); @@ -1556,7 +1147,7 @@ protected void doRun() { dataStreamsToCopy.put(dataStreamEntry.getKey(), dataStreamEntry.getValue()); } } - Map dataStreamAliasesToCopy = filterDataStreamAliases( + Map dataStreamAliasesToCopy = SnapshotsServiceUtils.filterDataStreamAliases( dataStreamsToCopy, existingProject.dataStreamAliases() ); 
@@ -1609,7 +1200,7 @@ protected void doRun() { snapshot, finalIndices, entry.dataStreams().stream().filter(metaForSnapshot.getProject(projectId).dataStreams()::containsKey).toList(), - entry.partial() ? onlySuccessfulFeatureStates(entry, finalIndices) : entry.featureStates(), + entry.partial() ? SnapshotsServiceUtils.onlySuccessfulFeatureStates(entry, finalIndices) : entry.featureStates(), failure, threadPool.absoluteTimeInMillis(), entry.partial() ? updatedShardGensForLiveIndices.totalShards() : entry.shardSnapshotStatusByRepoShardId().size(), @@ -1646,7 +1237,7 @@ protected void doRun() { () -> snapshotListeners.addListener(new ActionListener<>() { @Override public void onResponse(List> actionListeners) { - completeListenersIgnoringException(actionListeners, snapshotInfo); + SnapshotsServiceUtils.completeListenersIgnoringException(actionListeners, snapshotInfo); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); } @@ -1688,32 +1279,6 @@ public void onFailure(Exception e) { } } - /** - * Removes all feature states which have missing or failed shards, as they are no longer safely restorable. - * @param entry The "in progress" entry with a list of feature states and one or more failed shards. - * @param finalIndices The final list of indices in the snapshot, after any indices that were concurrently deleted are removed. - * @return The list of feature states which were completed successfully in the given entry. 
- */ - private static List onlySuccessfulFeatureStates(SnapshotsInProgress.Entry entry, List finalIndices) { - assert entry.partial() : "should not try to filter feature states from a non-partial entry"; - - // Figure out which indices have unsuccessful shards - Set indicesWithUnsuccessfulShards = new HashSet<>(); - entry.shardSnapshotStatusByRepoShardId().forEach((key, value) -> { - final ShardState shardState = value.state(); - if (shardState.failed() || shardState.completed() == false) { - indicesWithUnsuccessfulShards.add(key.indexName()); - } - }); - - // Now remove any feature states which contain any of those indices, as the feature state is not intact and not safely restorable - return entry.featureStates() - .stream() - .filter(stateInfo -> finalIndices.containsAll(stateInfo.getIndices())) - .filter(stateInfo -> stateInfo.getIndices().stream().anyMatch(indicesWithUnsuccessfulShards::contains) == false) - .toList(); - } - /** * Remove a snapshot from {@link #endingSnapshots} set and return its completion listeners that must be resolved. */ @@ -1794,7 +1359,7 @@ private void runReadyDeletions(RepositoryData repositoryData, ProjectId projectI @Override public ClusterState execute(ClusterState currentState) { - assert readyDeletions(currentState, projectId).v1() == currentState + assert SnapshotsServiceUtils.readyDeletions(currentState, projectId).v1() == currentState : "Deletes should have been set to ready by finished snapshot deletes and finalizations"; for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(currentState).getEntries()) { if (entry.projectId().equals(projectId) @@ -1824,239 +1389,6 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) }); } - /** - * Finds snapshot delete operations that are ready to execute in the given {@link ClusterState} and computes a new cluster state that - * has all executable deletes marked as executing. 
Returns a {@link Tuple} of the updated cluster state and all executable deletes. - * This can either be {@link SnapshotDeletionsInProgress.Entry} that were already in state - * {@link SnapshotDeletionsInProgress.State#STARTED} or waiting entries in state {@link SnapshotDeletionsInProgress.State#WAITING} - * that were moved to {@link SnapshotDeletionsInProgress.State#STARTED} in the returned updated cluster state. - * - * @param projectId the project for repositories where deletions should be prepared. {@code null} means all projects - * @param currentState current cluster state - * @return tuple of an updated cluster state and currently executable snapshot delete operations - */ - private static Tuple> readyDeletions( - ClusterState currentState, - @Nullable ProjectId projectId - ) { - final SnapshotDeletionsInProgress deletions = SnapshotDeletionsInProgress.get(currentState); - if (deletions.hasDeletionsInProgress() == false || (projectId != null && deletions.hasDeletionsInProgress(projectId) == false)) { - return Tuple.tuple(currentState, List.of()); - } - final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE); - assert snapshotsInProgress != null; - final Set repositoriesSeen = new HashSet<>(); - boolean changed = false; - final ArrayList readyDeletions = new ArrayList<>(); - final List newDeletes = new ArrayList<>(); - for (SnapshotDeletionsInProgress.Entry entry : deletions.getEntries()) { - if (projectId != null && projectId.equals(entry.projectId()) == false) { - // Not the target project, keep the entry as is - newDeletes.add(entry); - continue; - } - final var projectRepo = new ProjectRepo(entry.projectId(), entry.repository()); - if (repositoriesSeen.add(projectRepo) - && entry.state() == SnapshotDeletionsInProgress.State.WAITING - && snapshotsInProgress.forRepo(projectRepo).stream().noneMatch(SnapshotsService::isWritingToRepository)) { - changed = true; - final SnapshotDeletionsInProgress.Entry newEntry = 
entry.started(); - readyDeletions.add(newEntry); - newDeletes.add(newEntry); - } else { - newDeletes.add(entry); - } - } - return Tuple.tuple( - changed - ? ClusterState.builder(currentState) - .putCustom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.of(newDeletes)) - .build() - : currentState, - readyDeletions - ); - } - - /** - * Computes the cluster state resulting from removing a given snapshot create operation from the given state. This method will update - * the shard generations of snapshots that the given snapshot depended on so that finalizing them will not cause rolling back to an - * outdated shard generation. - *

- * For example, shard snapshot X can be taken, but not finalized yet. Shard snapshot Y can then depend upon shard snapshot X. Then shard - * snapshot Y may finalize before shard snapshot X, but including X. However, X does not include Y. Therefore we update X to use Y's - * shard generation file (list of snapshots and dependencies) to avoid overwriting with X's file that is missing Y. - * - * @param state current cluster state - * @param snapshot snapshot for which to remove the snapshot operation - * @return updated cluster state - */ - public static ClusterState stateWithoutSnapshot( - ClusterState state, - Snapshot snapshot, - UpdatedShardGenerations updatedShardGenerations - ) { - final SnapshotsInProgress inProgressSnapshots = SnapshotsInProgress.get(state); - ClusterState result = state; - int indexOfEntry = -1; - // Find the in-progress snapshot entry that matches {@code snapshot}. - final ProjectId projectId = snapshot.getProjectId(); - final String repository = snapshot.getRepository(); - final List entryList = inProgressSnapshots.forRepo(projectId, repository); - for (int i = 0; i < entryList.size(); i++) { - SnapshotsInProgress.Entry entry = entryList.get(i); - if (entry.snapshot().equals(snapshot)) { - indexOfEntry = i; - break; - } - } - if (indexOfEntry >= 0) { - final List updatedEntries = new ArrayList<>(entryList.size() - 1); - final SnapshotsInProgress.Entry removedEntry = entryList.get(indexOfEntry); - for (int i = 0; i < indexOfEntry; i++) { - final SnapshotsInProgress.Entry previousEntry = entryList.get(i); - if (removedEntry.isClone()) { - if (previousEntry.isClone()) { - ImmutableOpenMap.Builder updatedShardAssignments = null; - for (Map.Entry finishedShardEntry : removedEntry - .shardSnapshotStatusByRepoShardId() - .entrySet()) { - final ShardSnapshotStatus shardState = finishedShardEntry.getValue(); - if (shardState.state() == ShardState.SUCCESS) { - updatedShardAssignments = maybeAddUpdatedAssignment( - updatedShardAssignments, - 
shardState, - finishedShardEntry.getKey(), - previousEntry.shardSnapshotStatusByRepoShardId() - ); - } - } - addCloneEntry(updatedEntries, previousEntry, updatedShardAssignments); - } else { - ImmutableOpenMap.Builder updatedShardAssignments = null; - for (Map.Entry finishedShardEntry : removedEntry - .shardSnapshotStatusByRepoShardId() - .entrySet()) { - final ShardSnapshotStatus shardState = finishedShardEntry.getValue(); - final RepositoryShardId repositoryShardId = finishedShardEntry.getKey(); - if (shardState.state() != ShardState.SUCCESS - || previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false) { - continue; - } - updatedShardAssignments = maybeAddUpdatedAssignment( - updatedShardAssignments, - shardState, - previousEntry.shardId(repositoryShardId), - previousEntry.shards() - ); - - } - addSnapshotEntry(updatedEntries, previousEntry, updatedShardAssignments); - } - } else { - if (previousEntry.isClone()) { - ImmutableOpenMap.Builder updatedShardAssignments = null; - for (Map.Entry finishedShardEntry : removedEntry - .shardSnapshotStatusByRepoShardId() - .entrySet()) { - final ShardSnapshotStatus shardState = finishedShardEntry.getValue(); - final RepositoryShardId repositoryShardId = finishedShardEntry.getKey(); - if (shardState.state() != ShardState.SUCCESS - || previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false - || updatedShardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) { - continue; - } - updatedShardAssignments = maybeAddUpdatedAssignment( - updatedShardAssignments, - shardState, - repositoryShardId, - previousEntry.shardSnapshotStatusByRepoShardId() - ); - } - addCloneEntry(updatedEntries, previousEntry, updatedShardAssignments); - } else { - ImmutableOpenMap.Builder updatedShardAssignments = null; - for (Map.Entry finishedShardEntry : removedEntry - .shardSnapshotStatusByRepoShardId() - .entrySet()) { - final ShardSnapshotStatus shardState = 
finishedShardEntry.getValue(); - if (shardState.state() == ShardState.SUCCESS - && previousEntry.shardSnapshotStatusByRepoShardId().containsKey(finishedShardEntry.getKey()) - && updatedShardGenerations.hasShardGen(finishedShardEntry.getKey())) { - updatedShardAssignments = maybeAddUpdatedAssignment( - updatedShardAssignments, - shardState, - previousEntry.shardId(finishedShardEntry.getKey()), - previousEntry.shards() - ); - } - } - addSnapshotEntry(updatedEntries, previousEntry, updatedShardAssignments); - } - } - } - for (int i = indexOfEntry + 1; i < entryList.size(); i++) { - updatedEntries.add(entryList.get(i)); - } - result = ClusterState.builder(state) - .putCustom( - SnapshotsInProgress.TYPE, - inProgressSnapshots.createCopyWithUpdatedEntriesForRepo(projectId, repository, updatedEntries) - ) - .build(); - } - return readyDeletions(result, projectId).v1(); - } - - private static void addSnapshotEntry( - List entries, - SnapshotsInProgress.Entry entryToUpdate, - @Nullable ImmutableOpenMap.Builder updatedShardAssignments - ) { - if (updatedShardAssignments == null) { - entries.add(entryToUpdate); - } else { - final ImmutableOpenMap.Builder updatedStatus = ImmutableOpenMap.builder(entryToUpdate.shards()); - updatedStatus.putAllFromMap(updatedShardAssignments.build()); - entries.add(entryToUpdate.withShardStates(updatedStatus.build())); - } - } - - private static void addCloneEntry( - List entries, - SnapshotsInProgress.Entry entryToUpdate, - @Nullable ImmutableOpenMap.Builder updatedShardAssignments - ) { - if (updatedShardAssignments == null) { - entries.add(entryToUpdate); - } else { - final ImmutableOpenMap.Builder updatedStatus = ImmutableOpenMap.builder( - entryToUpdate.shardSnapshotStatusByRepoShardId() - ); - updatedStatus.putAllFromMap(updatedShardAssignments.build()); - entries.add(entryToUpdate.withClones(updatedStatus.build())); - } - } - - @Nullable - private static ImmutableOpenMap.Builder maybeAddUpdatedAssignment( - @Nullable 
ImmutableOpenMap.Builder updatedShardAssignments, - ShardSnapshotStatus finishedShardState, - T shardId, - Map statesToUpdate - ) { - final ShardGeneration newGeneration = finishedShardState.generation(); - final ShardSnapshotStatus stateToUpdate = statesToUpdate.get(shardId); - if (stateToUpdate != null - && stateToUpdate.state() == ShardState.SUCCESS - && Objects.equals(newGeneration, stateToUpdate.generation()) == false) { - if (updatedShardAssignments == null) { - updatedShardAssignments = ImmutableOpenMap.builder(); - } - updatedShardAssignments.put(shardId, stateToUpdate.withUpdatedGeneration(newGeneration)); - } - return updatedShardAssignments; - } - /** * Removes record of running snapshot from cluster state and notifies the listener when this action is complete. This method is only * used when the snapshot fails for some reason. During normal operation the snapshot repository will remove the @@ -2078,14 +1410,18 @@ private void removeFailedSnapshotFromClusterState( @Override public ClusterState execute(ClusterState currentState) { - final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, updatedShardGenerations); + final ClusterState updatedState = SnapshotsServiceUtils.stateWithoutSnapshot( + currentState, + snapshot, + updatedShardGenerations + ); assert updatedState == currentState || endingSnapshots.contains(snapshot) : "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state"; // now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them - return updateWithSnapshots( + return SnapshotsServiceUtils.updateWithSnapshots( updatedState, null, - deletionsWithoutSnapshots( + SnapshotsServiceUtils.deletionsWithoutSnapshots( SnapshotDeletionsInProgress.get(updatedState), Collections.singletonList(snapshot.getSnapshotId()), snapshot.getProjectId(), @@ -2124,44 +1460,9 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState 
newState) private static final String REMOVE_SNAPSHOT_METADATA_TASK_SOURCE = "remove snapshot metadata"; - /** - * Remove the given {@link SnapshotId}s for the given {@code repository} from an instance of {@link SnapshotDeletionsInProgress}. - * If no deletion contained any of the snapshot ids to remove then return {@code null}. - * - * @param deletions snapshot deletions to update - * @param snapshotIds snapshot ids to remove - * @param projectId project for the repository - * @param repository repository that the snapshot ids belong to - * @return updated {@link SnapshotDeletionsInProgress} or {@code null} if unchanged - */ - @Nullable - private static SnapshotDeletionsInProgress deletionsWithoutSnapshots( - SnapshotDeletionsInProgress deletions, - Collection snapshotIds, - ProjectId projectId, - String repository - ) { - boolean changed = false; - List updatedEntries = new ArrayList<>(deletions.getEntries().size()); - for (SnapshotDeletionsInProgress.Entry entry : deletions.getEntries()) { - if (entry.projectId().equals(projectId) && entry.repository().equals(repository)) { - final List updatedSnapshotIds = new ArrayList<>(entry.snapshots()); - if (updatedSnapshotIds.removeAll(snapshotIds)) { - changed = true; - updatedEntries.add(entry.withSnapshots(updatedSnapshotIds)); - } else { - updatedEntries.add(entry); - } - } else { - updatedEntries.add(entry); - } - } - return changed ? 
SnapshotDeletionsInProgress.of(updatedEntries) : null; - } - private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e, Consumer failingListenersConsumer) { final List> listeners = endAndGetListenersToResolve(snapshot); - failingListenersConsumer.accept(() -> failListenersIgnoringException(listeners, e)); + failingListenersConsumer.accept(() -> SnapshotsServiceUtils.failListenersIgnoringException(listeners, e)); assert repositoryOperations.assertNotQueued(snapshot); } @@ -2197,7 +1498,7 @@ public void deleteSnapshots(final ProjectId projectId, final DeleteSnapshotReque @Override public ClusterState execute(ClusterState currentState) { final var projectMetadata = currentState.metadata().getProject(projectId); - ensureRepositoryExists(repositoryName, projectMetadata); + SnapshotsServiceUtils.ensureRepositoryExists(repositoryName, projectMetadata); final Set snapshotIds = new HashSet<>(); // find in-progress snapshots to delete in cluster state @@ -2251,14 +1552,14 @@ public ClusterState execute(ClusterState currentState) { } } - ensureNoCleanupInProgress( + SnapshotsServiceUtils.ensureNoCleanupInProgress( currentState, repositoryName, snapshotIds.stream().findFirst().get().getName(), "delete snapshot" ); - ensureNotReadOnly(projectMetadata, repositoryName); + SnapshotsServiceUtils.ensureNotReadOnly(projectMetadata, repositoryName); final SnapshotDeletionsInProgress deletionsInProgress = SnapshotDeletionsInProgress.get(currentState); @@ -2304,7 +1605,7 @@ public ClusterState execute(ClusterState currentState) { ); if (snapshotIdsRequiringCleanup.isEmpty()) { // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions - return updateWithSnapshots(currentState, updatedSnapshots, null); + return SnapshotsServiceUtils.updateWithSnapshots(currentState, updatedSnapshots, null); } // add the snapshot deletion to the cluster state final SnapshotDeletionsInProgress.Entry replacedEntry = 
deletionsInProgress.getEntries() @@ -2334,7 +1635,7 @@ public ClusterState execute(ClusterState currentState) { List.copyOf(snapshotIdsRequiringCleanup), threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), - updatedSnapshots.forRepo(projectId, repositoryName).stream().noneMatch(SnapshotsService::isWritingToRepository) + updatedSnapshots.forRepo(projectId, repositoryName).stream().noneMatch(SnapshotsServiceUtils::isWritingToRepository) && deletionsInProgress.hasExecutingDeletion(projectId, repositoryName) == false ? SnapshotDeletionsInProgress.State.STARTED : SnapshotDeletionsInProgress.State.WAITING @@ -2342,7 +1643,7 @@ public ClusterState execute(ClusterState currentState) { } else { newDelete = replacedEntry.withAddedSnapshots(snapshotIdsRequiringCleanup); } - return updateWithSnapshots( + return SnapshotsServiceUtils.updateWithSnapshots( currentState, updatedSnapshots, (replacedEntry == null ? deletionsInProgress : deletionsInProgress.withRemovedEntry(replacedEntry.uuid())) @@ -2410,104 +1711,11 @@ public String toString() { }, "delete snapshot [" + repository + "]" + Arrays.toString(snapshotNames), listener::onFailure); } - /** - * Checks if the given {@link SnapshotsInProgress.Entry} is currently writing to the repository. 
- * - * @param entry snapshot entry - * @return true if entry is currently writing to the repository - */ - private static boolean isWritingToRepository(SnapshotsInProgress.Entry entry) { - if (entry.state().completed()) { - // Entry is writing to the repo because it's finalizing on master - return true; - } - for (ShardSnapshotStatus value : entry.shardSnapshotStatusByRepoShardId().values()) { - if (value.isActive()) { - // Entry is writing to the repo because it's writing to a shard on a data node or waiting to do so for a concrete shard - return true; - } - } - return false; - } - private void addDeleteListener(String deleteUUID, ActionListener listener) { snapshotDeletionListeners.computeIfAbsent(deleteUUID, k -> new CopyOnWriteArrayList<>()) .add(ContextPreservingActionListener.wrapPreservingContext(listener, threadPool.getThreadContext())); } - /** - * Determines the minimum {@link IndexVersion} that the snapshot repository must be compatible with - * from the current nodes in the cluster and the contents of the repository. - * The minimum version is determined as the lowest version found across all snapshots in the - * repository and all nodes in the cluster. - * - * @param minNodeVersion minimum node version in the cluster - * @param repositoryData current {@link RepositoryData} of that repository - * @param excluded snapshot id to ignore when computing the minimum version - * (used to use newer metadata version after a snapshot delete) - * @return minimum node version that must still be able to read the repository metadata - */ - public static IndexVersion minCompatibleVersion( - IndexVersion minNodeVersion, - RepositoryData repositoryData, - @Nullable Collection excluded - ) { - IndexVersion minCompatVersion = minNodeVersion; - final Collection snapshotIds = repositoryData.getSnapshotIds(); - for (SnapshotId snapshotId : snapshotIds.stream() - .filter(excluded == null ? 
Predicates.always() : Predicate.not(excluded::contains)) - .toList()) { - final IndexVersion known = repositoryData.getVersion(snapshotId); - // If we don't have the version cached in the repository data yet we load it from the snapshot info blobs - if (known == null) { - assert repositoryData.shardGenerations().totalShards() == 0 - : "Saw shard generations [" - + repositoryData.shardGenerations() - + "] but did not have versions tracked for snapshot [" - + snapshotId - + "]"; - return OLD_SNAPSHOT_FORMAT; - } else { - minCompatVersion = IndexVersion.min(minCompatVersion, known); - } - } - return minCompatVersion; - } - - /** - * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository. - * - * @param repositoryMetaVersion version to check - * @return true if version supports {@link ShardGenerations} - */ - public static boolean useShardGenerations(IndexVersion repositoryMetaVersion) { - return repositoryMetaVersion.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION); - } - - /** - * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository. - * - * @param repositoryMetaVersion version to check - * @return true if version supports {@link ShardGenerations} - */ - public static boolean useIndexGenerations(IndexVersion repositoryMetaVersion) { - return repositoryMetaVersion.onOrAfter(INDEX_GEN_IN_REPO_DATA_VERSION); - } - - /** - * Checks whether the metadata version supports writing the cluster- and repository-uuid to the repository. 
- * - * @param repositoryMetaVersion version to check - * @return true if version supports writing cluster- and repository-uuid to the repository - */ - public static boolean includesUUIDs(IndexVersion repositoryMetaVersion) { - return repositoryMetaVersion.onOrAfter(UUIDS_IN_REPO_DATA_VERSION); - } - - public static boolean includeFileInfoWriterUUID(IndexVersion repositoryMetaVersion) { - return repositoryMetaVersion.onOrAfter(FILE_INFO_WRITER_UUIDS_IN_SHARD_DATA_VERSION); - } - /** Deletes snapshot from repository * * @param deleteEntry delete entry in cluster state @@ -2634,7 +1842,7 @@ private void deleteSnapshotsFromRepository( removeSnapshotDeletionFromClusterState( deleteEntry, repositoryData, - listeners -> completeListenersIgnoringException(listeners, null) + listeners -> SnapshotsServiceUtils.completeListenersIgnoringException(listeners, null) ); return; } @@ -2649,7 +1857,7 @@ public void onResponse(RepositoryData updatedRepoData) { listeners -> doneFuture.addListener(new ActionListener<>() { @Override public void onResponse(Void unused) { - completeListenersIgnoringException(listeners, null); + SnapshotsServiceUtils.completeListenersIgnoringException(listeners, null); } @Override @@ -2678,7 +1886,7 @@ public void onFailure(Exception e) { new RemoveSnapshotDeletionAndContinueTask(deleteEntry, repositoryData) { @Override protected void handleListeners(List> deleteListeners) { - failListenersIgnoringException(deleteListeners, e); + SnapshotsServiceUtils.failListenersIgnoringException(deleteListeners, e); } } ); @@ -2718,7 +1926,7 @@ private void removeSnapshotDeletionFromClusterState( submitUnbatchedTask("remove snapshot deletion metadata", new RemoveSnapshotDeletionAndContinueTask(deleteEntry, repositoryData) { @Override protected SnapshotDeletionsInProgress filterDeletions(SnapshotDeletionsInProgress deletions) { - final SnapshotDeletionsInProgress updatedDeletions = deletionsWithoutSnapshots( + final SnapshotDeletionsInProgress updatedDeletions = 
SnapshotsServiceUtils.deletionsWithoutSnapshots( deletions, deleteEntry.snapshots(), deleteEntry.projectId(), @@ -2763,7 +1971,7 @@ private void failAllListenersOnMasterFailOver(Exception e) { final Exception wrapped = new RepositoryException("_all", "Failed to update cluster state during repository operation", e); for (final Iterator>> it = snapshotDeletionListeners.values().iterator(); it.hasNext();) { final List> listeners = it.next(); - readyToResolveListeners.add(() -> failListenersIgnoringException(listeners, wrapped)); + readyToResolveListeners.add(() -> SnapshotsServiceUtils.failListenersIgnoringException(listeners, wrapped)); it.remove(); } assert snapshotDeletionListeners.isEmpty() : "No new listeners should have been added but saw " + snapshotDeletionListeners; @@ -2808,8 +2016,12 @@ public ClusterState execute(ClusterState currentState) { return currentState; } final SnapshotDeletionsInProgress newDeletions = filterDeletions(updatedDeletions); - final Tuple> res = readyDeletions( - updateWithSnapshots(currentState, updatedSnapshotsInProgress(currentState, newDeletions), newDeletions), + final Tuple> res = SnapshotsServiceUtils.readyDeletions( + SnapshotsServiceUtils.updateWithSnapshots( + currentState, + updatedSnapshotsInProgress(currentState, newDeletions), + newDeletions + ), deleteEntry.projectId() ); readyDeletions = res.v2(); @@ -2969,7 +2181,7 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState // No shards can be updated in this snapshot so we just add it as is again snapshotEntries.add(entry); } else { - final ImmutableOpenMap shardAssignments = shards( + final ImmutableOpenMap shardAssignments = SnapshotsServiceUtils.shards( snapshotsInProgress, updatedDeletions, currentState.projectState(projectId), @@ -3031,217 +2243,6 @@ public String toString() { } } - /** - * Shortcut to build new {@link ClusterState} from the current state and updated values of {@link SnapshotsInProgress} and - * {@link 
SnapshotDeletionsInProgress}. - * - * @param state current cluster state - * @param snapshotsInProgress new value for {@link SnapshotsInProgress} or {@code null} if it's unchanged - * @param snapshotDeletionsInProgress new value for {@link SnapshotDeletionsInProgress} or {@code null} if it's unchanged - * @return updated cluster state - */ - public static ClusterState updateWithSnapshots( - ClusterState state, - @Nullable SnapshotsInProgress snapshotsInProgress, - @Nullable SnapshotDeletionsInProgress snapshotDeletionsInProgress - ) { - if (snapshotsInProgress == null && snapshotDeletionsInProgress == null) { - return state; - } - ClusterState.Builder builder = ClusterState.builder(state); - if (snapshotsInProgress != null) { - builder.putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress); - } - if (snapshotDeletionsInProgress != null) { - builder.putCustom(SnapshotDeletionsInProgress.TYPE, snapshotDeletionsInProgress); - } - return builder.build(); - } - - private static void failListenersIgnoringException(@Nullable List> listeners, Exception failure) { - if (listeners != null) { - try { - ActionListener.onFailure(listeners, failure); - } catch (Exception ex) { - assert false : new AssertionError(ex); - logger.warn("Failed to notify listeners", ex); - } - } - } - - private static void completeListenersIgnoringException(@Nullable List> listeners, T result) { - if (listeners != null) { - try { - ActionListener.onResponse(listeners, result); - } catch (Exception ex) { - assert false : new AssertionError(ex); - logger.warn("Failed to notify listeners", ex); - } - } - } - - /** - * Calculates the assignment of shards to data nodes for a new snapshot based on the given cluster state and the - * indices that should be included in the snapshot. 
- * - * @param indices Indices to snapshot - * @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot - * @return map of shard-id to snapshot-status of all shards included into current snapshot - */ - private static ImmutableOpenMap shards( - SnapshotsInProgress snapshotsInProgress, - SnapshotDeletionsInProgress deletionsInProgress, - ProjectState currentState, - Collection indices, - boolean useShardGenerations, - RepositoryData repositoryData, - String repoName - ) { - ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); - final ShardGenerations shardGenerations = repositoryData.shardGenerations(); - final InFlightShardSnapshotStates inFlightShardStates = InFlightShardSnapshotStates.forEntries( - snapshotsInProgress.forRepo(currentState.projectId(), repoName) - ); - final boolean readyToExecute = deletionsInProgress.hasExecutingDeletion(currentState.projectId(), repoName) == false; - for (IndexId index : indices) { - final String indexName = index.getName(); - final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false; - IndexMetadata indexMetadata = currentState.metadata().index(indexName); - if (indexMetadata == null) { - // The index was deleted before we managed to start the snapshot - mark it as missing. 
- builder.put(new ShardId(indexName, IndexMetadata.INDEX_UUID_NA_VALUE, 0), ShardSnapshotStatus.MISSING); - } else { - final IndexRoutingTable indexRoutingTable = currentState.routingTable().index(indexName); - assert indexRoutingTable != null; - for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { - final ShardId shardId = indexRoutingTable.shard(i).shardId(); - final ShardGeneration shardRepoGeneration; - if (useShardGenerations) { - final ShardGeneration inFlightGeneration = inFlightShardStates.generationForShard( - index, - shardId.id(), - shardGenerations - ); - if (inFlightGeneration == null && isNewIndex) { - assert shardGenerations.getShardGen(index, shardId.getId()) == null - : "Found shard generation for new index [" + index + "]"; - shardRepoGeneration = ShardGenerations.NEW_SHARD_GEN; - } else { - shardRepoGeneration = inFlightGeneration; - } - } else { - shardRepoGeneration = null; - } - final ShardSnapshotStatus shardSnapshotStatus; - if (readyToExecute == false || inFlightShardStates.isActive(shardId.getIndexName(), shardId.id())) { - shardSnapshotStatus = ShardSnapshotStatus.UNASSIGNED_QUEUED; - } else { - shardSnapshotStatus = initShardSnapshotStatus( - shardRepoGeneration, - indexRoutingTable.shard(i).primaryShard(), - snapshotsInProgress::isNodeIdForRemoval - ); - } - builder.put(shardId, shardSnapshotStatus); - } - } - } - - return builder.build(); - } - - /** - * Compute the snapshot status for a given shard based on the current primary routing entry for the shard. 
- * - * @param shardRepoGeneration repository generation of the shard in the repository - * @param primary primary routing entry for the shard - * @param nodeIdRemovalPredicate tests whether a node ID is currently marked for removal from the cluster - * @return shard snapshot status - */ - private static ShardSnapshotStatus initShardSnapshotStatus( - ShardGeneration shardRepoGeneration, - ShardRouting primary, - Predicate nodeIdRemovalPredicate - ) { - ShardSnapshotStatus shardSnapshotStatus; - if (primary == null || primary.assignedToNode() == false) { - shardSnapshotStatus = new ShardSnapshotStatus(null, ShardState.MISSING, shardRepoGeneration, "primary shard is not allocated"); - } else if (primary.relocating() || primary.initializing()) { - shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING, shardRepoGeneration); - } else if (nodeIdRemovalPredicate.test(primary.currentNodeId())) { - shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), ShardState.PAUSED_FOR_NODE_REMOVAL, shardRepoGeneration); - } else if (primary.started() == false) { - shardSnapshotStatus = new ShardSnapshotStatus( - primary.currentNodeId(), - ShardState.MISSING, - shardRepoGeneration, - "primary shard hasn't been started yet" - ); - } else { - shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration); - } - return shardSnapshotStatus; - } - - /** - * Returns the data streams that are currently being snapshotted (with partial == false) and that are contained in the - * indices-to-check set. 
- */ - public static Set snapshottingDataStreams(final ProjectState projectState, final Set dataStreamsToCheck) { - Map dataStreams = projectState.metadata().dataStreams(); - return SnapshotsInProgress.get(projectState.cluster()) - .asStream(projectState.projectId()) - .filter(e -> e.partial() == false) - .flatMap(e -> e.dataStreams().stream()) - .filter(ds -> dataStreams.containsKey(ds) && dataStreamsToCheck.contains(ds)) - .collect(Collectors.toSet()); - } - - /** - * Returns the indices that are currently being snapshotted (with partial == false) and that are contained in the indices-to-check set. - */ - public static Set snapshottingIndices(final ProjectState projectState, final Set indicesToCheck) { - final Set indices = new HashSet<>(); - for (List snapshotsInRepo : SnapshotsInProgress.get(projectState.cluster()) - .entriesByRepo(projectState.projectId())) { - for (final SnapshotsInProgress.Entry entry : snapshotsInRepo) { - if (entry.partial() == false && entry.isClone() == false) { - for (String indexName : entry.indices().keySet()) { - IndexMetadata indexMetadata = projectState.metadata().index(indexName); - if (indexMetadata != null && indicesToCheck.contains(indexMetadata.getIndex())) { - indices.add(indexMetadata.getIndex()); - } - } - } - } - } - return indices; - } - - /** - * Filters out the aliases that refer to data streams to do not exist in the provided data streams. - * Also rewrites the list of data streams an alias point to to only contain data streams that exist in the provided data streams. - * - * The purpose of this method is to capture the relevant data stream aliases based on the data streams - * that will be included in a snapshot. - * - * @param dataStreams The provided data streams, which will be included in a snapshot. - * @param dataStreamAliases The data streams aliases that may contain aliases that refer to data streams - * that don't exist in the provided data streams. 
- * @return The filtered data streams aliases only referring to data streams in the provided data streams. - */ - static Map filterDataStreamAliases( - Map dataStreams, - Map dataStreamAliases - ) { - - return dataStreamAliases.values() - .stream() - .filter(alias -> alias.getDataStreams().stream().anyMatch(dataStreams::containsKey)) - .map(alias -> alias.intersect(dataStreams::containsKey)) - .collect(Collectors.toMap(DataStreamAlias::getName, Function.identity())); - } - /** * Adds snapshot completion listener * @@ -3284,10 +2285,6 @@ public boolean assertAllListenersResolved() { return true; } - private static boolean isQueued(@Nullable ShardSnapshotStatus status) { - return status != null && status.state() == ShardState.QUEUED; - } - /** * State machine for updating existing {@link SnapshotsInProgress.Entry} by applying a given list of {@link ShardSnapshotUpdate} to * them. The algorithm implemented below works as described @@ -3394,7 +2391,9 @@ SnapshotsInProgress computeUpdatedState() { changedCount, startedCount ); - return supportsNodeRemovalTracking(initialState) ? updated.withUpdatedNodeIdsForRemoval(initialState) : updated; + return SnapshotsServiceUtils.supportsNodeRemovalTracking(initialState) + ? 
updated.withUpdatedNodeIdsForRemoval(initialState) + : updated; } return existing; } @@ -3638,7 +2637,7 @@ private void tryStartNextTaskAfterCloneUpdated(RepositoryShardId repoShardId, Sh // start a shard snapshot or clone operation on the current entry if (entry.isClone() == false) { tryStartSnapshotAfterCloneFinish(repoShardId, updatedState.generation()); - } else if (isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) { + } else if (SnapshotsServiceUtils.isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) { final String localNodeId = initialState.nodes().getLocalNodeId(); assert updatedState.nodeId().equals(localNodeId) : "Clone updated with node id [" + updatedState.nodeId() + "] but local node id is [" + localNodeId + "]"; @@ -3652,7 +2651,7 @@ private void tryStartNextTaskAfterSnapshotUpdated(ShardId shardId, ShardSnapshot final IndexId indexId = entry.indices().get(shardId.getIndexName()); if (indexId != null) { final RepositoryShardId repoShardId = new RepositoryShardId(indexId, shardId.id()); - if (isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) { + if (SnapshotsServiceUtils.isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) { if (entry.isClone()) { // shard snapshot was completed, we check if we can start a clone operation for the same repo shard startShardOperation( @@ -3671,7 +2670,7 @@ private void tryStartNextTaskAfterSnapshotUpdated(ShardId shardId, ShardSnapshot private void tryStartSnapshotAfterCloneFinish(RepositoryShardId repoShardId, ShardGeneration generation) { assert entry.source() == null; // current entry is a snapshot operation so we must translate the repository shard id to a routing shard id - if (isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) { + if (SnapshotsServiceUtils.isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) { startShardSnapshot(repoShardId, generation); } } @@ -3693,7 +2692,11 @@ private void 
startShardSnapshot(RepositoryShardId repoShardId, ShardGeneration g } else { shardRouting = indexRouting.shard(repoShardId.shardId()).primaryShard(); } - final ShardSnapshotStatus shardSnapshotStatus = initShardSnapshotStatus(generation, shardRouting, nodeIdRemovalPredicate); + final ShardSnapshotStatus shardSnapshotStatus = SnapshotsServiceUtils.initShardSnapshotStatus( + generation, + shardRouting, + nodeIdRemovalPredicate + ); final ShardId routingShardId = shardRouting != null ? shardRouting.shardId() : new ShardId(index, repoShardId.shardId()); if (shardSnapshotStatus.isActive()) { startShardOperation(shardsBuilder(), routingShardId, shardSnapshotStatus); @@ -3988,7 +2991,7 @@ public ClusterState execute(ClusterState currentState) { final SnapshotsInProgress updatedSnapshotsInProgress = changedSnapshots ? snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(projectId, repository, List.of()) : null; - return updateWithSnapshots(currentState, updatedSnapshotsInProgress, updatedDeletions); + return SnapshotsServiceUtils.updateWithSnapshots(currentState, updatedSnapshotsInProgress, updatedDeletions); } @Override @@ -4022,7 +3025,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) } for (String delete : deletionsToFail) { final List> listeners = snapshotDeletionListeners.remove(delete); - readyToResolveListeners.add(() -> failListenersIgnoringException(listeners, failure)); + readyToResolveListeners.add(() -> SnapshotsServiceUtils.failListenersIgnoringException(listeners, failure)); repositoryOperations.finishDeletion(delete); } } @@ -4133,7 +3136,7 @@ private record CreateSnapshotTask( @Override public void onFailure(Exception e) { - logSnapshotFailure("create", snapshot, e); + SnapshotsServiceUtils.logSnapshotFailure("create", snapshot, e); listener.onFailure(e); } @@ -4143,51 +3146,6 @@ public String toString() { } } - private static void logSnapshotFailure(String operation, Snapshot snapshot, Exception e) { - final var 
logLevel = snapshotFailureLogLevel(e); - if (logLevel == Level.INFO && logger.isDebugEnabled() == false) { - // suppress stack trace at INFO unless extra verbosity is configured - logger.info( - format( - "%s[%s] failed to %s snapshot: %s", - projectRepoString(snapshot.getProjectId(), snapshot.getRepository()), - snapshot.getSnapshotId().getName(), - operation, - e.getMessage() - ) - ); - } else { - logger.log( - logLevel, - () -> format("[%s][%s] failed to %s snapshot", snapshot.getRepository(), snapshot.getSnapshotId().getName(), operation), - e - ); - } - } - - private static Level snapshotFailureLogLevel(Exception e) { - if (MasterService.isPublishFailureException(e)) { - // no action needed, the new master will take things from here - return Level.INFO; - } else if (e instanceof InvalidSnapshotNameException) { - // no action needed, typically ILM-related, or a user error - return Level.INFO; - } else if (e instanceof IndexNotFoundException) { - // not worrying, most likely a user error - return Level.INFO; - } else if (e instanceof SnapshotException) { - if (e.getMessage().contains(ReferenceDocs.UNASSIGNED_SHARDS.toString())) { - // non-partial snapshot requested but cluster health is not yellow or green; the health is tracked elsewhere so no need to - // make more noise here - return Level.INFO; - } - } else if (e instanceof IllegalArgumentException) { - // some other user error - return Level.INFO; - } - return Level.WARN; - } - private class SnapshotTaskExecutor implements ClusterStateTaskExecutor { @Override public ClusterState execute(BatchExecutionContext batchExecutionContext) throws Exception { @@ -4260,13 +3218,13 @@ private SnapshotsInProgress createSnapshot( final String repositoryName = snapshot.getRepository(); final String snapshotName = snapshot.getSnapshotId().getName(); final var projectState = currentState.projectState(snapshot.getProjectId()); - ensureRepositoryExists(repositoryName, projectState.metadata()); + 
SnapshotsServiceUtils.ensureRepositoryExists(repositoryName, projectState.metadata()); final Repository repository = createSnapshotTask.repository; - ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); - ensureSnapshotNameNotRunning(snapshotsInProgress, snapshot.getProjectId(), repositoryName, snapshotName); - validate(repositoryName, snapshotName, projectState.metadata()); + SnapshotsServiceUtils.ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); + SnapshotsServiceUtils.ensureSnapshotNameNotRunning(snapshotsInProgress, snapshot.getProjectId(), repositoryName, snapshotName); + SnapshotsServiceUtils.validate(repositoryName, snapshotName, projectState.metadata()); final SnapshotDeletionsInProgress deletionsInProgress = SnapshotDeletionsInProgress.get(currentState); - ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot"); + SnapshotsServiceUtils.ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot"); ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshotsInProgress, deletionsInProgress); final CreateSnapshotRequest request = createSnapshotTask.createSnapshotRequest; @@ -4375,7 +3333,7 @@ private SnapshotsInProgress createSnapshot( allIndices.putAll(runningSnapshot.indices()); } final Map indexIds = repositoryData.resolveNewIndices(indices, allIndices); - final IndexVersion version = minCompatibleVersion( + final IndexVersion version = SnapshotsServiceUtils.minCompatibleVersion( // NB minCompatibleVersion iterates over all the snapshots in the current repositoryData, which probably should happen on a // different thread. Also is the _current_ repositoryData the right thing to consider? 
The minimum repository format version // can only advance during a snapshot delete which today is never concurrent to other writes, but a future version may allow @@ -4384,12 +3342,12 @@ private SnapshotsInProgress createSnapshot( repositoryData, null ); - ImmutableOpenMap shards = shards( + ImmutableOpenMap shards = SnapshotsServiceUtils.shards( snapshotsInProgress, deletionsInProgress, projectState, indexIds.values(), - useShardGenerations(version), + SnapshotsServiceUtils.useShardGenerations(version), repositoryData, repositoryName ); @@ -4451,7 +3409,7 @@ static ClusterState executeBatch( } final var clusterState = batchExecutionContext.initialState(); - if (supportsNodeRemovalTracking(clusterState)) { + if (SnapshotsServiceUtils.supportsNodeRemovalTracking(clusterState)) { final var snapshotsInProgress = SnapshotsInProgress.get(clusterState); final var newSnapshotsInProgress = snapshotsInProgress.withUpdatedNodeIdsForRemoval(clusterState); if (newSnapshotsInProgress != snapshotsInProgress) { @@ -4462,9 +3420,5 @@ static ClusterState executeBatch( } } - private static boolean supportsNodeRemovalTracking(ClusterState clusterState) { - return clusterState.getMinTransportVersion().onOrAfter(TransportVersions.V_8_13_0); - } - private final MasterServiceTaskQueue updateNodeIdsToRemoveQueue; } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsServiceUtils.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsServiceUtils.java new file mode 100644 index 0000000000000..048c3f86d5613 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsServiceUtils.java @@ -0,0 +1,1172 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.snapshots; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.RepositoryCleanupInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamAlias; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.ReferenceDocs; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Predicates; +import 
org.elasticsearch.core.Tuple; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.FinalizeSnapshotContext; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.ProjectRepo; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.RepositoryMissingException; +import org.elasticsearch.repositories.RepositoryShardId; +import org.elasticsearch.repositories.ShardGeneration; +import org.elasticsearch.repositories.ShardGenerations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static java.util.Collections.unmodifiableList; +import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.repositories.ProjectRepo.projectRepoString; +import static org.elasticsearch.snapshots.SnapshotsService.FILE_INFO_WRITER_UUIDS_IN_SHARD_DATA_VERSION; +import static org.elasticsearch.snapshots.SnapshotsService.INDEX_GEN_IN_REPO_DATA_VERSION; +import static org.elasticsearch.snapshots.SnapshotsService.OLD_SNAPSHOT_FORMAT; +import static org.elasticsearch.snapshots.SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION; +import static org.elasticsearch.snapshots.SnapshotsService.UUIDS_IN_REPO_DATA_VERSION; + +/** + * A utility class for static snapshotting methods. 
+ */ +public class SnapshotsServiceUtils { + private static final Logger logger = LogManager.getLogger(SnapshotsServiceUtils.class); + + public static void ensureSnapshotNameNotRunning( + SnapshotsInProgress runningSnapshots, + ProjectId projectId, + String repositoryName, + String snapshotName + ) { + if (runningSnapshots.forRepo(projectId, repositoryName) + .stream() + .anyMatch(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName))) { + throw new SnapshotNameAlreadyInUseException(repositoryName, snapshotName, "snapshot with the same name is already in-progress"); + } + } + + /** + * Checks the cluster state for any in-progress repository cleanup tasks ({@link RepositoryCleanupInProgress}). + * Note that repository cleanup is intentionally cluster wide exclusive. + */ + public static void ensureNoCleanupInProgress( + final ClusterState currentState, + final String repositoryName, + final String snapshotName, + final String reason + ) { + final RepositoryCleanupInProgress repositoryCleanupInProgress = RepositoryCleanupInProgress.get(currentState); + if (repositoryCleanupInProgress.hasCleanupInProgress()) { + throw new ConcurrentSnapshotExecutionException( + repositoryName, + snapshotName, + "cannot " + + reason + + " while a repository cleanup is in-progress in " + + repositoryCleanupInProgress.entries() + .stream() + .map(RepositoryCleanupInProgress.Entry::repository) + .collect(Collectors.toSet()) + ); + } + } + + public static void ensureNotReadOnly(final ProjectMetadata projectMetadata, final String repositoryName) { + final var repositoryMetadata = RepositoriesMetadata.get(projectMetadata).repository(repositoryName); + if (RepositoriesService.isReadOnly(repositoryMetadata.settings())) { + throw new RepositoryException(repositoryMetadata.name(), "repository is readonly"); + } + } + + public static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryData, String snapshotName, Repository repository) { + // check if the snapshot name 
already exists in the repository + if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { + throw new SnapshotNameAlreadyInUseException( + repository.getMetadata().name(), + snapshotName, + "snapshot with the same name already exists" + ); + } + } + + /** + * Throws {@link RepositoryMissingException} if no repository by the given name is found in the given project metadata. + */ + public static void ensureRepositoryExists(String repoName, ProjectMetadata projectMetadata) { + if (RepositoriesMetadata.get(projectMetadata).repository(repoName) == null) { + throw new RepositoryMissingException(repoName); + } + } + + /** + * Validates snapshot request + * + * @param repositoryName repository name + * @param snapshotName snapshot name + * @param projectMetadata current project metadata + */ + public static void validate(String repositoryName, String snapshotName, ProjectMetadata projectMetadata) { + if (RepositoriesMetadata.get(projectMetadata).repository(repositoryName) == null) { + throw new RepositoryMissingException(repositoryName); + } + validate(repositoryName, snapshotName); + } + + public static void validate(final String repositoryName, final String snapshotName) { + if (Strings.hasLength(snapshotName) == false) { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "cannot be empty"); + } + if (snapshotName.contains(" ")) { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not contain whitespace"); + } + if (snapshotName.contains(",")) { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not contain ','"); + } + if (snapshotName.contains("#")) { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not contain '#'"); + } + if (snapshotName.charAt(0) == '_') { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not start with '_'"); + } + if (snapshotName.toLowerCase(Locale.ROOT).equals(snapshotName) ==
false) { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must be lowercase"); + } + if (Strings.validFileName(snapshotName) == false) { + throw new InvalidSnapshotNameException( + repositoryName, + snapshotName, + "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS + ); + } + } + + /** + * Assert that there are no snapshots that have a shard that is waiting to be assigned even though the cluster state would allow for it + * to be assigned + */ + public static boolean assertNoDanglingSnapshots(ClusterState state) { + final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state); + final SnapshotDeletionsInProgress snapshotDeletionsInProgress = SnapshotDeletionsInProgress.get(state); + final Set reposWithRunningDelete = snapshotDeletionsInProgress.getEntries() + .stream() + .filter(entry -> entry.state() == SnapshotDeletionsInProgress.State.STARTED) + .map(entry -> new ProjectRepo(entry.projectId(), entry.repository())) + .collect(Collectors.toSet()); + for (List repoEntry : snapshotsInProgress.entriesByRepo()) { + final SnapshotsInProgress.Entry entry = repoEntry.get(0); + for (SnapshotsInProgress.ShardSnapshotStatus value : entry.shardSnapshotStatusByRepoShardId().values()) { + if (value.equals(SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)) { + assert reposWithRunningDelete.contains(new ProjectRepo(entry.projectId(), entry.repository())) + : "Found shard snapshot waiting to be assigned in [" + entry + "] but it is not blocked by any running delete"; + } else if (value.isActive()) { + assert reposWithRunningDelete.contains(new ProjectRepo(entry.projectId(), entry.repository())) == false + : "Found shard snapshot actively executing in [" + + entry + + "] when it should be blocked by a running delete [" + + Strings.toString(snapshotDeletionsInProgress) + + "]"; + } + } + } + return true; + } + + /** + * Checks whether the metadata version supports writing {@link ShardGenerations} to the 
repository. + * + * @param repositoryMetaVersion version to check + * @return true if version supports {@link ShardGenerations} + */ + public static boolean useShardGenerations(IndexVersion repositoryMetaVersion) { + return repositoryMetaVersion.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION); + } + + /** + * Checks whether the metadata version supports index generations in the repository data. + * + * @param repositoryMetaVersion version to check + * @return true if version is on or after {@code INDEX_GEN_IN_REPO_DATA_VERSION} + */ + public static boolean useIndexGenerations(IndexVersion repositoryMetaVersion) { + return repositoryMetaVersion.onOrAfter(INDEX_GEN_IN_REPO_DATA_VERSION); + } + + /** + * Checks whether the metadata version supports writing the cluster- and repository-uuid to the repository. + * + * @param repositoryMetaVersion version to check + * @return true if version supports writing cluster- and repository-uuid to the repository + */ + public static boolean includesUUIDs(IndexVersion repositoryMetaVersion) { + return repositoryMetaVersion.onOrAfter(UUIDS_IN_REPO_DATA_VERSION); + } + + public static boolean includeFileInfoWriterUUID(IndexVersion repositoryMetaVersion) { + return repositoryMetaVersion.onOrAfter(FILE_INFO_WRITER_UUIDS_IN_SHARD_DATA_VERSION); + } + + public static boolean supportsNodeRemovalTracking(ClusterState clusterState) { + return clusterState.getMinTransportVersion().onOrAfter(TransportVersions.V_8_13_0); + } + + /** + * Checks if the given {@link SnapshotsInProgress.Entry} is currently writing to the repository.
+ * + * @param entry snapshot entry + * @return true if entry is currently writing to the repository + */ + public static boolean isWritingToRepository(SnapshotsInProgress.Entry entry) { + if (entry.state().completed()) { + // Entry is writing to the repo because it's finalizing on master + return true; + } + for (SnapshotsInProgress.ShardSnapshotStatus value : entry.shardSnapshotStatusByRepoShardId().values()) { + if (value.isActive()) { + // Entry is writing to the repo because it's writing to a shard on a data node or waiting to do so for a concrete shard + return true; + } + } + return false; + } + + public static boolean isQueued(@Nullable SnapshotsInProgress.ShardSnapshotStatus status) { + return status != null && status.state() == SnapshotsInProgress.ShardState.QUEUED; + } + + public static FinalizeSnapshotContext.UpdatedShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) { + ShardGenerations.Builder builder = ShardGenerations.builder(); + ShardGenerations.Builder deletedBuilder = null; + if (snapshot.isClone()) { + snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> builder.put(key.index(), key.shardId(), value)); + } else { + for (Map.Entry entry : snapshot.shardSnapshotStatusByRepoShardId() + .entrySet()) { + RepositoryShardId key = entry.getKey(); + SnapshotsInProgress.ShardSnapshotStatus value = entry.getValue(); + final Index index = snapshot.indexByName(key.indexName()); + if (metadata.getProject(snapshot.projectId()).hasIndex(index) == false) { + assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; + if (deletedBuilder == null) { + deletedBuilder = ShardGenerations.builder(); + } + deletedBuilder.put(key.index(), key.shardId(), value); + continue; + } + builder.put(key.index(), key.shardId(), value); + } + } + return new FinalizeSnapshotContext.UpdatedShardGenerations( + builder.build(), + deletedBuilder == null ? 
ShardGenerations.EMPTY : deletedBuilder.build() + ); + } + + public static ProjectMetadata projectForSnapshot(SnapshotsInProgress.Entry snapshot, ProjectMetadata project) { + final ProjectMetadata.Builder builder; + if (snapshot.includeGlobalState() == false) { + // Create a new project state that only includes the index data + builder = ProjectMetadata.builder(project.id()); + for (IndexId index : snapshot.indices().values()) { + final IndexMetadata indexMetadata = project.index(index.getName()); + if (indexMetadata == null) { + assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; + } else { + builder.put(indexMetadata, false); + } + } + } else { + builder = ProjectMetadata.builder(project); + } + // Only keep those data streams in the metadata that were actually requested by the initial snapshot create operation and that have + // all their indices contained in the snapshot + final Map dataStreams = new HashMap<>(); + final Set indicesInSnapshot = snapshot.indices().keySet(); + for (String dataStreamName : snapshot.dataStreams()) { + DataStream dataStream = project.dataStreams().get(dataStreamName); + if (dataStream == null) { + assert snapshot.partial() + : "Data stream [" + dataStreamName + "] was deleted during a snapshot but snapshot was not partial."; + } else { + final DataStream reconciled = dataStream.snapshot(indicesInSnapshot, builder); + if (reconciled != null) { + dataStreams.put(dataStreamName, reconciled); + } + } + } + return builder.dataStreams(dataStreams, filterDataStreamAliases(dataStreams, project.dataStreamAliases())).build(); + } + + /** + * Returns status of the currently running snapshots + *

<p> + * This method is executed on master node + * </p>

+ * + * @param snapshotsInProgress snapshots in progress in the cluster state + * @param projectId project to look for the repository + * @param repository repository id + * @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered + * @return list of metadata for currently running snapshots + */ + public static List currentSnapshots( + @Nullable SnapshotsInProgress snapshotsInProgress, + ProjectId projectId, + String repository, + List snapshots + ) { + if (snapshotsInProgress == null || snapshotsInProgress.isEmpty()) { + return Collections.emptyList(); + } + if ("_all".equals(repository)) { + return snapshotsInProgress.asStream(projectId).toList(); + } + if (snapshots.isEmpty()) { + return snapshotsInProgress.forRepo(projectId, repository); + } + List builder = new ArrayList<>(); + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(projectId, repository)) { + for (String snapshot : snapshots) { + if (entry.snapshot().getSnapshotId().getName().equals(snapshot)) { + builder.add(entry); + break; + } + } + } + return unmodifiableList(builder); + } + + /** + * Walks through the snapshot entries' shard snapshots and applies updates from looking at removed nodes or indexes and known + * failed shard snapshots on the same shard IDs.
+ * + * @param nodeIdRemovalPredicate identify any nodes that are marked for removal / in shutdown mode + * @param knownFailures already known failed shard snapshots, but more may be found in this method + * @return an updated map of shard statuses + */ + public static ImmutableOpenMap processWaitingShardsAndRemovedNodes( + SnapshotsInProgress.Entry snapshotEntry, + RoutingTable routingTable, + DiscoveryNodes nodes, + Predicate nodeIdRemovalPredicate, + Map knownFailures + ) { + assert snapshotEntry.isClone() == false : "clones take a different path"; + boolean snapshotChanged = false; + ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); + for (Map.Entry shardSnapshotEntry : snapshotEntry + .shardSnapshotStatusByRepoShardId() + .entrySet()) { + SnapshotsInProgress.ShardSnapshotStatus shardStatus = shardSnapshotEntry.getValue(); + ShardId shardId = snapshotEntry.shardId(shardSnapshotEntry.getKey()); + if (shardStatus.equals(SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)) { + // this shard snapshot is waiting for a previous snapshot to finish execution for this shard + final SnapshotsInProgress.ShardSnapshotStatus knownFailure = knownFailures.get(shardSnapshotEntry.getKey()); + if (knownFailure == null) { + final IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex()); + if (indexShardRoutingTable == null) { + // shard became unassigned while queued after a delete or clone operation so we can fail as missing here + assert snapshotEntry.partial(); + snapshotChanged = true; + logger.debug("failing snapshot of shard [{}] because index got deleted", shardId); + shards.put(shardId, SnapshotsInProgress.ShardSnapshotStatus.MISSING); + knownFailures.put(shardSnapshotEntry.getKey(), SnapshotsInProgress.ShardSnapshotStatus.MISSING); + } else { + // if no failure is known for the shard we keep waiting + shards.put(shardId, shardStatus); + } + } else { + // If a failure is known for an execution we waited on for this shard then we 
fail with the same exception here + // as well + snapshotChanged = true; + shards.put(shardId, knownFailure); + } + } else if (shardStatus.state() == SnapshotsInProgress.ShardState.WAITING + || shardStatus.state() == SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL) { + // The shard primary wasn't assigned, or the shard snapshot was paused because the node was shutting down. + IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex()); + if (indexShardRoutingTable != null) { + IndexShardRoutingTable shardRouting = indexShardRoutingTable.shard(shardId.id()); + if (shardRouting != null) { + final var primaryNodeId = shardRouting.primaryShard().currentNodeId(); + if (nodeIdRemovalPredicate.test(primaryNodeId)) { + if (shardStatus.state() == SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL) { + // Shard that we are waiting for is on a node marked for removal, keep it as PAUSED_FOR_REMOVAL + shards.put(shardId, shardStatus); + } else { + // Shard that we are waiting for is on a node marked for removal, move it to PAUSED_FOR_REMOVAL + snapshotChanged = true; + shards.put( + shardId, + new SnapshotsInProgress.ShardSnapshotStatus( + primaryNodeId, + SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL, + shardStatus.generation() + ) + ); + } + continue; + } else if (shardRouting.primaryShard().started()) { + // Shard that we were waiting for has started on a node, let's process it + snapshotChanged = true; + logger.debug(""" + Starting shard [{}] with shard generation [{}] that we were waiting to start on node [{}]. 
Previous \ + shard state [{}] + """, shardId, shardStatus.generation(), shardStatus.nodeId(), shardStatus.state()); + shards.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primaryNodeId, shardStatus.generation())); + continue; + } else if (shardRouting.primaryShard().initializing() || shardRouting.primaryShard().relocating()) { + // Shard that we were waiting for hasn't started yet or still relocating - will continue to wait + shards.put(shardId, shardStatus); + continue; + } + } + } + // Shard that we were waiting for went into unassigned state or disappeared (index or shard is gone) - giving up + snapshotChanged = true; + logger.warn("failing snapshot of shard [{}] on node [{}] because shard is unassigned", shardId, shardStatus.nodeId()); + final SnapshotsInProgress.ShardSnapshotStatus failedState = new SnapshotsInProgress.ShardSnapshotStatus( + shardStatus.nodeId(), + SnapshotsInProgress.ShardState.FAILED, + shardStatus.generation(), + "shard is unassigned" + ); + shards.put(shardId, failedState); + knownFailures.put(shardSnapshotEntry.getKey(), failedState); + } else if (shardStatus.state().completed() == false && shardStatus.nodeId() != null) { + if (nodes.nodeExists(shardStatus.nodeId())) { + shards.put(shardId, shardStatus); + } else { + // TODO: Restart snapshot on another node? 
+ snapshotChanged = true; + logger.warn("failing snapshot of shard [{}] on departed node [{}]", shardId, shardStatus.nodeId()); + final SnapshotsInProgress.ShardSnapshotStatus failedState = new SnapshotsInProgress.ShardSnapshotStatus( + shardStatus.nodeId(), + SnapshotsInProgress.ShardState.FAILED, + shardStatus.generation(), + "node left the cluster during snapshot" + ); + shards.put(shardId, failedState); + knownFailures.put(shardSnapshotEntry.getKey(), failedState); + } + } else { + shards.put(shardId, shardStatus); + } + } + if (snapshotChanged) { + return shards.build(); + } else { + return null; + } + } + + public static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snapshotsInProgress, ClusterChangedEvent event) { + for (List entries : snapshotsInProgress.entriesByRepo()) { + for (SnapshotsInProgress.Entry entry : entries) { + if (entry.state() == SnapshotsInProgress.State.STARTED && entry.isClone() == false) { + final ProjectId projectId = entry.projectId(); + for (Map.Entry shardStatus : entry + .shardSnapshotStatusByRepoShardId() + .entrySet()) { + final SnapshotsInProgress.ShardState state = shardStatus.getValue().state(); + if (state != SnapshotsInProgress.ShardState.WAITING + && state != SnapshotsInProgress.ShardState.QUEUED + && state != SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL) { + continue; + } + final RepositoryShardId shardId = shardStatus.getKey(); + final Index index = entry.indexByName(shardId.indexName()); + if (event.indexRoutingTableChanged(index)) { + IndexRoutingTable indexShardRoutingTable = event.state().routingTable(projectId).index(index); + if (indexShardRoutingTable == null) { + // index got removed concurrently and we have to fail WAITING, QUEUED and PAUSED_FOR_REMOVAL state shards + return true; + } + ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.shardId()).primaryShard(); + if (shardRouting.started() && snapshotsInProgress.isNodeIdForRemoval(shardRouting.currentNodeId()) == false + 
|| shardRouting.unassigned()) { + return true; + } + } + } + } + } + } + return false; + } + + public static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsInProgress, List removedNodes) { + if (removedNodes.isEmpty()) { + // Nothing to do, no nodes removed + return false; + } + final Set removedNodeIds = removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + return snapshotsInProgress.asStream().anyMatch(snapshot -> { + if (snapshot.state().completed() || snapshot.isClone()) { + // nothing to do for already completed snapshots or clones that run on master anyways + return false; + } + for (SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus : snapshot.shardSnapshotStatusByRepoShardId().values()) { + if (shardSnapshotStatus.state().completed() == false && removedNodeIds.contains(shardSnapshotStatus.nodeId())) { + // Snapshot had an incomplete shard running on a removed node so we need to adjust that shard's snapshot status + return true; + } + } + return false; + }); + } + + /** + * Removes all feature states which have missing or failed shards, as they are no longer safely restorable. + * @param entry The "in progress" entry with a list of feature states and one or more failed shards. + * @param finalIndices The final list of indices in the snapshot, after any indices that were concurrently deleted are removed. + * @return The list of feature states which were completed successfully in the given entry. 
+ */ + public static List onlySuccessfulFeatureStates(SnapshotsInProgress.Entry entry, List finalIndices) { + assert entry.partial() : "should not try to filter feature states from a non-partial entry"; + + // Figure out which indices have unsuccessful shards + Set indicesWithUnsuccessfulShards = new HashSet<>(); + entry.shardSnapshotStatusByRepoShardId().forEach((key, value) -> { + final SnapshotsInProgress.ShardState shardState = value.state(); + if (shardState.failed() || shardState.completed() == false) { + indicesWithUnsuccessfulShards.add(key.indexName()); + } + }); + + // Now remove any feature states which contain any of those indices, as the feature state is not intact and not safely restorable + return entry.featureStates() + .stream() + .filter(stateInfo -> finalIndices.containsAll(stateInfo.getIndices())) + .filter(stateInfo -> stateInfo.getIndices().stream().anyMatch(indicesWithUnsuccessfulShards::contains) == false) + .toList(); + } + + /** + * Finds snapshot delete operations that are ready to execute in the given {@link ClusterState} and computes a new cluster state that + * has all executable deletes marked as executing. Returns a {@link Tuple} of the updated cluster state and all executable deletes. + * This can either be {@link SnapshotDeletionsInProgress.Entry} that were already in state + * {@link SnapshotDeletionsInProgress.State#STARTED} or waiting entries in state {@link SnapshotDeletionsInProgress.State#WAITING} + * that were moved to {@link SnapshotDeletionsInProgress.State#STARTED} in the returned updated cluster state. + * + * @param projectId the project for repositories where deletions should be prepared. 
{@code null} means all projects + * @param currentState current cluster state + * @return tuple of an updated cluster state and currently executable snapshot delete operations + */ + public static Tuple> readyDeletions( + ClusterState currentState, + @Nullable ProjectId projectId + ) { + final SnapshotDeletionsInProgress deletions = SnapshotDeletionsInProgress.get(currentState); + if (deletions.hasDeletionsInProgress() == false || (projectId != null && deletions.hasDeletionsInProgress(projectId) == false)) { + return Tuple.tuple(currentState, List.of()); + } + final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE); + assert snapshotsInProgress != null; + final Set repositoriesSeen = new HashSet<>(); + boolean changed = false; + final ArrayList readyDeletions = new ArrayList<>(); + final List newDeletes = new ArrayList<>(); + for (SnapshotDeletionsInProgress.Entry entry : deletions.getEntries()) { + if (projectId != null && projectId.equals(entry.projectId()) == false) { + // Not the target project, keep the entry as is + newDeletes.add(entry); + continue; + } + final var projectRepo = new ProjectRepo(entry.projectId(), entry.repository()); + if (repositoriesSeen.add(projectRepo) + && entry.state() == SnapshotDeletionsInProgress.State.WAITING + && snapshotsInProgress.forRepo(projectRepo).stream().noneMatch(SnapshotsServiceUtils::isWritingToRepository)) { + changed = true; + final SnapshotDeletionsInProgress.Entry newEntry = entry.started(); + readyDeletions.add(newEntry); + newDeletes.add(newEntry); + } else { + newDeletes.add(entry); + } + } + return Tuple.tuple( + changed + ? ClusterState.builder(currentState) + .putCustom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.of(newDeletes)) + .build() + : currentState, + readyDeletions + ); + } + + /** + * Computes the cluster state resulting from removing a given snapshot create operation from the given state. 
This method will update + * the shard generations of snapshots that the given snapshot depended on so that finalizing them will not cause rolling back to an + * outdated shard generation. + *

+ * For example, shard snapshot X can be taken, but not finalized yet. Shard snapshot Y can then depend upon shard snapshot X. Then shard + * snapshot Y may finalize before shard snapshot X, but including X. However, X does not include Y. Therefore we update X to use Y's + * shard generation file (list of snapshots and dependencies) to avoid overwriting with X's file that is missing Y. + * + * @param state current cluster state + * @param snapshot snapshot for which to remove the snapshot operation + * @return updated cluster state + */ + public static ClusterState stateWithoutSnapshot( + ClusterState state, + Snapshot snapshot, + FinalizeSnapshotContext.UpdatedShardGenerations updatedShardGenerations + ) { + final SnapshotsInProgress inProgressSnapshots = SnapshotsInProgress.get(state); + ClusterState result = state; + int indexOfEntry = -1; + // Find the in-progress snapshot entry that matches {@code snapshot}. + final ProjectId projectId = snapshot.getProjectId(); + final String repository = snapshot.getRepository(); + final List entryList = inProgressSnapshots.forRepo(projectId, repository); + for (int i = 0; i < entryList.size(); i++) { + SnapshotsInProgress.Entry entry = entryList.get(i); + if (entry.snapshot().equals(snapshot)) { + indexOfEntry = i; + break; + } + } + if (indexOfEntry >= 0) { + final List updatedEntries = new ArrayList<>(entryList.size() - 1); + final SnapshotsInProgress.Entry removedEntry = entryList.get(indexOfEntry); + for (int i = 0; i < indexOfEntry; i++) { + final SnapshotsInProgress.Entry previousEntry = entryList.get(i); + if (removedEntry.isClone()) { + if (previousEntry.isClone()) { + ImmutableOpenMap.Builder updatedShardAssignments = null; + for (Map.Entry finishedShardEntry : removedEntry + .shardSnapshotStatusByRepoShardId() + .entrySet()) { + final SnapshotsInProgress.ShardSnapshotStatus shardState = finishedShardEntry.getValue(); + if (shardState.state() == SnapshotsInProgress.ShardState.SUCCESS) { + updatedShardAssignments = 
maybeAddUpdatedAssignment( + updatedShardAssignments, + shardState, + finishedShardEntry.getKey(), + previousEntry.shardSnapshotStatusByRepoShardId() + ); + } + } + addCloneEntry(updatedEntries, previousEntry, updatedShardAssignments); + } else { + ImmutableOpenMap.Builder updatedShardAssignments = null; + for (Map.Entry finishedShardEntry : removedEntry + .shardSnapshotStatusByRepoShardId() + .entrySet()) { + final SnapshotsInProgress.ShardSnapshotStatus shardState = finishedShardEntry.getValue(); + final RepositoryShardId repositoryShardId = finishedShardEntry.getKey(); + if (shardState.state() != SnapshotsInProgress.ShardState.SUCCESS + || previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false) { + continue; + } + updatedShardAssignments = maybeAddUpdatedAssignment( + updatedShardAssignments, + shardState, + previousEntry.shardId(repositoryShardId), + previousEntry.shards() + ); + + } + addSnapshotEntry(updatedEntries, previousEntry, updatedShardAssignments); + } + } else { + if (previousEntry.isClone()) { + ImmutableOpenMap.Builder updatedShardAssignments = null; + for (Map.Entry finishedShardEntry : removedEntry + .shardSnapshotStatusByRepoShardId() + .entrySet()) { + final SnapshotsInProgress.ShardSnapshotStatus shardState = finishedShardEntry.getValue(); + final RepositoryShardId repositoryShardId = finishedShardEntry.getKey(); + if (shardState.state() != SnapshotsInProgress.ShardState.SUCCESS + || previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false + || updatedShardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) { + continue; + } + updatedShardAssignments = maybeAddUpdatedAssignment( + updatedShardAssignments, + shardState, + repositoryShardId, + previousEntry.shardSnapshotStatusByRepoShardId() + ); + } + addCloneEntry(updatedEntries, previousEntry, updatedShardAssignments); + } else { + ImmutableOpenMap.Builder updatedShardAssignments = null; + for (Map.Entry 
finishedShardEntry : removedEntry + .shardSnapshotStatusByRepoShardId() + .entrySet()) { + final SnapshotsInProgress.ShardSnapshotStatus shardState = finishedShardEntry.getValue(); + if (shardState.state() == SnapshotsInProgress.ShardState.SUCCESS + && previousEntry.shardSnapshotStatusByRepoShardId().containsKey(finishedShardEntry.getKey()) + && updatedShardGenerations.hasShardGen(finishedShardEntry.getKey())) { + updatedShardAssignments = maybeAddUpdatedAssignment( + updatedShardAssignments, + shardState, + previousEntry.shardId(finishedShardEntry.getKey()), + previousEntry.shards() + ); + } + } + addSnapshotEntry(updatedEntries, previousEntry, updatedShardAssignments); + } + } + } + for (int i = indexOfEntry + 1; i < entryList.size(); i++) { + updatedEntries.add(entryList.get(i)); + } + result = ClusterState.builder(state) + .putCustom( + SnapshotsInProgress.TYPE, + inProgressSnapshots.createCopyWithUpdatedEntriesForRepo(projectId, repository, updatedEntries) + ) + .build(); + } + return readyDeletions(result, projectId).v1(); + } + + public static void addSnapshotEntry( + List entries, + SnapshotsInProgress.Entry entryToUpdate, + @Nullable ImmutableOpenMap.Builder updatedShardAssignments + ) { + if (updatedShardAssignments == null) { + entries.add(entryToUpdate); + } else { + final ImmutableOpenMap.Builder updatedStatus = ImmutableOpenMap.builder( + entryToUpdate.shards() + ); + updatedStatus.putAllFromMap(updatedShardAssignments.build()); + entries.add(entryToUpdate.withShardStates(updatedStatus.build())); + } + } + + public static void addCloneEntry( + List entries, + SnapshotsInProgress.Entry entryToUpdate, + @Nullable ImmutableOpenMap.Builder updatedShardAssignments + ) { + if (updatedShardAssignments == null) { + entries.add(entryToUpdate); + } else { + final ImmutableOpenMap.Builder updatedStatus = ImmutableOpenMap + .builder(entryToUpdate.shardSnapshotStatusByRepoShardId()); + updatedStatus.putAllFromMap(updatedShardAssignments.build()); + 
entries.add(entryToUpdate.withClones(updatedStatus.build())); + } + } + + @Nullable + public static ImmutableOpenMap.Builder maybeAddUpdatedAssignment( + @Nullable ImmutableOpenMap.Builder updatedShardAssignments, + SnapshotsInProgress.ShardSnapshotStatus finishedShardState, + T shardId, + Map statesToUpdate + ) { + final ShardGeneration newGeneration = finishedShardState.generation(); + final SnapshotsInProgress.ShardSnapshotStatus stateToUpdate = statesToUpdate.get(shardId); + if (stateToUpdate != null + && stateToUpdate.state() == SnapshotsInProgress.ShardState.SUCCESS + && Objects.equals(newGeneration, stateToUpdate.generation()) == false) { + if (updatedShardAssignments == null) { + updatedShardAssignments = ImmutableOpenMap.builder(); + } + updatedShardAssignments.put(shardId, stateToUpdate.withUpdatedGeneration(newGeneration)); + } + return updatedShardAssignments; + } + + /** + * Remove the given {@link SnapshotId}s for the given {@code repository} from an instance of {@link SnapshotDeletionsInProgress}. + * If no deletion contained any of the snapshot ids to remove then return {@code null}. 
+ * + * @param deletions snapshot deletions to update + * @param snapshotIds snapshot ids to remove + * @param projectId project for the repository + * @param repository repository that the snapshot ids belong to + * @return updated {@link SnapshotDeletionsInProgress} or {@code null} if unchanged + */ + @Nullable + public static SnapshotDeletionsInProgress deletionsWithoutSnapshots( + SnapshotDeletionsInProgress deletions, + Collection snapshotIds, + ProjectId projectId, + String repository + ) { + boolean changed = false; + List updatedEntries = new ArrayList<>(deletions.getEntries().size()); + for (SnapshotDeletionsInProgress.Entry entry : deletions.getEntries()) { + if (entry.projectId().equals(projectId) && entry.repository().equals(repository)) { + final List updatedSnapshotIds = new ArrayList<>(entry.snapshots()); + if (updatedSnapshotIds.removeAll(snapshotIds)) { + changed = true; + updatedEntries.add(entry.withSnapshots(updatedSnapshotIds)); + } else { + updatedEntries.add(entry); + } + } else { + updatedEntries.add(entry); + } + } + return changed ? SnapshotDeletionsInProgress.of(updatedEntries) : null; + } + + /** + * Determines the minimum {@link IndexVersion} that the snapshot repository must be compatible with + * from the current nodes in the cluster and the contents of the repository. + * The minimum version is determined as the lowest version found across all snapshots in the + * repository and all nodes in the cluster. 
+ * + * @param minNodeVersion minimum node version in the cluster + * @param repositoryData current {@link RepositoryData} of that repository + * @param excluded snapshot id to ignore when computing the minimum version + * (used to use newer metadata version after a snapshot delete) + * @return minimum node version that must still be able to read the repository metadata + */ + public static IndexVersion minCompatibleVersion( + IndexVersion minNodeVersion, + RepositoryData repositoryData, + @Nullable Collection excluded + ) { + IndexVersion minCompatVersion = minNodeVersion; + final Collection snapshotIds = repositoryData.getSnapshotIds(); + for (SnapshotId snapshotId : snapshotIds.stream() + .filter(excluded == null ? Predicates.always() : Predicate.not(excluded::contains)) + .toList()) { + final IndexVersion known = repositoryData.getVersion(snapshotId); + // If we don't have the version cached in the repository data yet we load it from the snapshot info blobs + if (known == null) { + assert repositoryData.shardGenerations().totalShards() == 0 + : "Saw shard generations [" + + repositoryData.shardGenerations() + + "] but did not have versions tracked for snapshot [" + + snapshotId + + "]"; + return OLD_SNAPSHOT_FORMAT; + } else { + minCompatVersion = IndexVersion.min(minCompatVersion, known); + } + } + return minCompatVersion; + } + + /** + * Shortcut to build new {@link ClusterState} from the current state and updated values of {@link SnapshotsInProgress} and + * {@link SnapshotDeletionsInProgress}. 
+ * + * @param state current cluster state + * @param snapshotsInProgress new value for {@link SnapshotsInProgress} or {@code null} if it's unchanged + * @param snapshotDeletionsInProgress new value for {@link SnapshotDeletionsInProgress} or {@code null} if it's unchanged + * @return updated cluster state + */ + public static ClusterState updateWithSnapshots( + ClusterState state, + @Nullable SnapshotsInProgress snapshotsInProgress, + @Nullable SnapshotDeletionsInProgress snapshotDeletionsInProgress + ) { + if (snapshotsInProgress == null && snapshotDeletionsInProgress == null) { + return state; + } + ClusterState.Builder builder = ClusterState.builder(state); + if (snapshotsInProgress != null) { + builder.putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress); + } + if (snapshotDeletionsInProgress != null) { + builder.putCustom(SnapshotDeletionsInProgress.TYPE, snapshotDeletionsInProgress); + } + return builder.build(); + } + + public static void failListenersIgnoringException(@Nullable List> listeners, Exception failure) { + if (listeners != null) { + try { + ActionListener.onFailure(listeners, failure); + } catch (Exception ex) { + assert false : new AssertionError(ex); + logger.warn("Failed to notify listeners", ex); + } + } + } + + public static void completeListenersIgnoringException(@Nullable List> listeners, T result) { + if (listeners != null) { + try { + ActionListener.onResponse(listeners, result); + } catch (Exception ex) { + assert false : new AssertionError(ex); + logger.warn("Failed to notify listeners", ex); + } + } + } + + /** + * Calculates the assignment of shards to data nodes for a new snapshot based on the given cluster state and the + * indices that should be included in the snapshot. 
+ * + * @param indices Indices to snapshot + * @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot + * @return map of shard-id to snapshot-status of all shards included into current snapshot + */ + public static ImmutableOpenMap shards( + SnapshotsInProgress snapshotsInProgress, + SnapshotDeletionsInProgress deletionsInProgress, + ProjectState currentState, + Collection indices, + boolean useShardGenerations, + RepositoryData repositoryData, + String repoName + ) { + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); + final ShardGenerations shardGenerations = repositoryData.shardGenerations(); + final InFlightShardSnapshotStates inFlightShardStates = InFlightShardSnapshotStates.forEntries( + snapshotsInProgress.forRepo(currentState.projectId(), repoName) + ); + final boolean readyToExecute = deletionsInProgress.hasExecutingDeletion(currentState.projectId(), repoName) == false; + for (IndexId index : indices) { + final String indexName = index.getName(); + final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false; + IndexMetadata indexMetadata = currentState.metadata().index(indexName); + if (indexMetadata == null) { + // The index was deleted before we managed to start the snapshot - mark it as missing. 
+ builder.put(new ShardId(indexName, IndexMetadata.INDEX_UUID_NA_VALUE, 0), SnapshotsInProgress.ShardSnapshotStatus.MISSING); + } else { + final IndexRoutingTable indexRoutingTable = currentState.routingTable().index(indexName); + assert indexRoutingTable != null; + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { + final ShardId shardId = indexRoutingTable.shard(i).shardId(); + final ShardGeneration shardRepoGeneration; + if (useShardGenerations) { + final ShardGeneration inFlightGeneration = inFlightShardStates.generationForShard( + index, + shardId.id(), + shardGenerations + ); + if (inFlightGeneration == null && isNewIndex) { + assert shardGenerations.getShardGen(index, shardId.getId()) == null + : "Found shard generation for new index [" + index + "]"; + shardRepoGeneration = ShardGenerations.NEW_SHARD_GEN; + } else { + shardRepoGeneration = inFlightGeneration; + } + } else { + shardRepoGeneration = null; + } + final SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus; + if (readyToExecute == false || inFlightShardStates.isActive(shardId.getIndexName(), shardId.id())) { + shardSnapshotStatus = SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED; + } else { + shardSnapshotStatus = initShardSnapshotStatus( + shardRepoGeneration, + indexRoutingTable.shard(i).primaryShard(), + snapshotsInProgress::isNodeIdForRemoval + ); + } + builder.put(shardId, shardSnapshotStatus); + } + } + } + + return builder.build(); + } + + /** + * Compute the snapshot status for a given shard based on the current primary routing entry for the shard. 
+ * + * @param shardRepoGeneration repository generation of the shard in the repository + * @param primary primary routing entry for the shard + * @param nodeIdRemovalPredicate tests whether a node ID is currently marked for removal from the cluster + * @return shard snapshot status + */ + public static SnapshotsInProgress.ShardSnapshotStatus initShardSnapshotStatus( + ShardGeneration shardRepoGeneration, + ShardRouting primary, + Predicate nodeIdRemovalPredicate + ) { + SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus; + if (primary == null || primary.assignedToNode() == false) { + shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus( + null, + SnapshotsInProgress.ShardState.MISSING, + shardRepoGeneration, + "primary shard is not allocated" + ); + } else if (primary.relocating() || primary.initializing()) { + shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus( + primary.currentNodeId(), + SnapshotsInProgress.ShardState.WAITING, + shardRepoGeneration + ); + } else if (nodeIdRemovalPredicate.test(primary.currentNodeId())) { + shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus( + primary.currentNodeId(), + SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL, + shardRepoGeneration + ); + } else if (primary.started() == false) { + shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus( + primary.currentNodeId(), + SnapshotsInProgress.ShardState.MISSING, + shardRepoGeneration, + "primary shard hasn't been started yet" + ); + } else { + shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration); + } + return shardSnapshotStatus; + } + + /** + * Returns the data streams that are currently being snapshotted (with partial == false) and that are contained in the + * indices-to-check set. 
+ */ + public static Set snapshottingDataStreams(final ProjectState projectState, final Set dataStreamsToCheck) { + Map dataStreams = projectState.metadata().dataStreams(); + return SnapshotsInProgress.get(projectState.cluster()) + .asStream(projectState.projectId()) + .filter(e -> e.partial() == false) + .flatMap(e -> e.dataStreams().stream()) + .filter(ds -> dataStreams.containsKey(ds) && dataStreamsToCheck.contains(ds)) + .collect(Collectors.toSet()); + } + + /** + * Returns the indices that are currently being snapshotted (with partial == false) and that are contained in the indices-to-check set. + */ + public static Set snapshottingIndices(final ProjectState projectState, final Set indicesToCheck) { + final Set indices = new HashSet<>(); + for (List snapshotsInRepo : SnapshotsInProgress.get(projectState.cluster()) + .entriesByRepo(projectState.projectId())) { + for (final SnapshotsInProgress.Entry entry : snapshotsInRepo) { + if (entry.partial() == false && entry.isClone() == false) { + for (String indexName : entry.indices().keySet()) { + IndexMetadata indexMetadata = projectState.metadata().index(indexName); + if (indexMetadata != null && indicesToCheck.contains(indexMetadata.getIndex())) { + indices.add(indexMetadata.getIndex()); + } + } + } + } + } + return indices; + } + + /** + * Filters out the aliases that refer to data streams to do not exist in the provided data streams. + * Also rewrites the list of data streams an alias point to to only contain data streams that exist in the provided data streams. + *

+ * The purpose of this method is to capture the relevant data stream aliases based on the data streams + * that will be included in a snapshot. + * + * @param dataStreams The provided data streams, which will be included in a snapshot. + * @param dataStreamAliases The data streams aliases that may contain aliases that refer to data streams + * that don't exist in the provided data streams. + * @return The filtered data streams aliases only referring to data streams in the provided data streams. + */ + public static Map filterDataStreamAliases( + Map dataStreams, + Map dataStreamAliases + ) { + return dataStreamAliases.values() + .stream() + .filter(alias -> alias.getDataStreams().stream().anyMatch(dataStreams::containsKey)) + .map(alias -> alias.intersect(dataStreams::containsKey)) + .collect(Collectors.toMap(DataStreamAlias::getName, Function.identity())); + } + + public static void logSnapshotFailure(String operation, Snapshot snapshot, Exception e) { + final var logLevel = snapshotFailureLogLevel(e); + if (logLevel == Level.INFO && logger.isDebugEnabled() == false) { + // suppress stack trace at INFO unless extra verbosity is configured + logger.info( + format( + "%s[%s] failed to %s snapshot: %s", + projectRepoString(snapshot.getProjectId(), snapshot.getRepository()), + snapshot.getSnapshotId().getName(), + operation, + e.getMessage() + ) + ); + } else { + logger.log( + logLevel, + () -> format("[%s][%s] failed to %s snapshot", snapshot.getRepository(), snapshot.getSnapshotId().getName(), operation), + e + ); + } + } + + public static Level snapshotFailureLogLevel(Exception e) { + if (MasterService.isPublishFailureException(e)) { + // no action needed, the new master will take things from here + return Level.INFO; + } else if (e instanceof InvalidSnapshotNameException) { + // no action needed, typically ILM-related, or a user error + return Level.INFO; + } else if (e instanceof IndexNotFoundException) { + // not worrying, most likely a user error + return 
Level.INFO; + } else if (e instanceof SnapshotException) { + if (e.getMessage().contains(ReferenceDocs.UNASSIGNED_SHARDS.toString())) { + // non-partial snapshot requested but cluster health is not yellow or green; the health is tracked elsewhere so no need to + // make more noise here + return Level.INFO; + } + } else if (e instanceof IllegalArgumentException) { + // some other user error + return Level.INFO; + } + return Level.WARN; + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 2fe0071bf53b2..5f51bca466857 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1713,10 +1713,10 @@ public void onFailure(Exception e) { assertTrue("executed all runnable tasks but test steps are still incomplete", testListener.isDone()); safeAwait(testListener); // shouldn't throw }, - SnapshotsService.class, + SnapshotsServiceUtils.class, new MockLog.SeenEventExpectation( "INFO log", - SnapshotsService.class.getCanonicalName(), + SnapshotsServiceUtils.class.getCanonicalName(), Level.INFO, "*failed to create snapshot*the following indices have unassigned primary shards*" ) @@ -1784,16 +1784,16 @@ public void onFailure(Exception e) { assertTrue("executed all runnable tasks but test steps are still incomplete", testListener.isDone()); safeAwait(testListener); // shouldn't throw }, - SnapshotsService.class, + SnapshotsServiceUtils.class, new MockLog.SeenEventExpectation( "INFO log", - SnapshotsService.class.getCanonicalName(), + SnapshotsServiceUtils.class.getCanonicalName(), Level.INFO, Strings.format("*failed to create snapshot*%s", expectedMessage) ), new MockLog.SeenEventExpectation( "INFO log", - SnapshotsService.class.getCanonicalName(), + SnapshotsServiceUtils.class.getCanonicalName(), Level.INFO, Strings.format("*failed to 
clone snapshot*%s", expectedMessage) ) @@ -1843,10 +1843,10 @@ public void onFailure(Exception e) { assertTrue("executed all runnable tasks but test steps are still incomplete", testListener.isDone()); safeAwait(testListener); // shouldn't throw }, - SnapshotsService.class, + SnapshotsServiceUtils.class, new MockLog.SeenEventExpectation( "INFO log", - SnapshotsService.class.getCanonicalName(), + SnapshotsServiceUtils.class.getCanonicalName(), Level.INFO, Strings.format("failed to create snapshot: no such index [%s]", indexName) ) @@ -1895,10 +1895,10 @@ public void onFailure(Exception e) { assertTrue("executed all runnable tasks but test steps are still incomplete", testListener.isDone()); safeAwait(testListener); // shouldn't throw }, - SnapshotsService.class, + SnapshotsServiceUtils.class, new MockLog.SeenEventExpectation( "INFO log", - SnapshotsService.class.getCanonicalName(), + SnapshotsServiceUtils.class.getCanonicalName(), Level.INFO, Strings.format("*failed to create snapshot*other feature states were requested: [none, none]", "") ) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index a3f3647fdb45b..7181cda1b5e0b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -49,7 +49,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertTrue; public class SnapshotsServiceTests extends ESTestCase { @@ -640,7 +639,7 @@ public void testSnapshottingIndicesExcludesClones() { ); assertThat( - SnapshotsService.snapshottingIndices( + SnapshotsServiceUtils.snapshottingIndices( clusterState.projectState(), singleton(clusterState.metadata().getProject().index(indexName).getIndex()) 
), diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index ead163eaf26a4..6f689ac7ed64b 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -167,7 +167,7 @@ protected void disableRepoConsistencyCheck(String reason) { protected RepositoryData getRepositoryData(String repoName, IndexVersion version) { final RepositoryData repositoryData = getRepositoryData(repoName); - if (SnapshotsService.includesUUIDs(version) == false) { + if (SnapshotsServiceUtils.includesUUIDs(version) == false) { return repositoryData.withoutUUIDs(); } else { return repositoryData;