diff --git a/docs/changelog/128650.yaml b/docs/changelog/128650.yaml new file mode 100644 index 0000000000000..a587f4f2cdf71 --- /dev/null +++ b/docs/changelog/128650.yaml @@ -0,0 +1,6 @@ +pr: 128650 +summary: Update shardGenerations for all indices on snapshot finalization +area: Snapshot/Restore +type: enhancement +issues: + - 108907 diff --git a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java index c7b051136eab8..b508238406373 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java @@ -27,7 +27,7 @@ */ public final class FinalizeSnapshotContext extends DelegatingActionListener { - private final ShardGenerations updatedShardGenerations; + private final UpdatedShardGenerations updatedShardGenerations; /** * Obsolete shard generations map computed from the cluster state update that this finalization executed in @@ -46,7 +46,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener> indexMetaDataToRemoveAfterRemovingSnapsh * * @param snapshotId Id of the new snapshot * @param details Details of the new snapshot - * @param shardGenerations Updated shard generations in the new snapshot. For each index contained in the snapshot an array of new - * generations indexed by the shard id they correspond to must be supplied. + * @param updatedShardGenerations Updated shard generations in the new snapshot, including both indices that are included + * in the given snapshot and those got deleted while finalizing. * @param indexMetaBlobs Map of index metadata blob uuids * @param newIdentifiers Map of new index metadata blob uuids keyed by the identifiers of the * {@link IndexMetadata} in them @@ -414,7 +415,7 @@ public Map> indexMetaDataToRemoveAfterRemovingSnapsh public RepositoryData addSnapshot( final SnapshotId snapshotId, final SnapshotDetails details, - final ShardGenerations shardGenerations, + final UpdatedShardGenerations updatedShardGenerations, @Nullable final Map indexMetaBlobs, @Nullable final Map newIdentifiers ) { @@ -424,12 +425,13 @@ public RepositoryData addSnapshot( // the new master, so we make the operation idempotent return this; } + final var liveIndexIds = updatedShardGenerations.liveIndices().indices(); Map snapshots = new HashMap<>(snapshotIds); snapshots.put(snapshotId.getUUID(), snapshotId); Map newSnapshotDetails = new HashMap<>(snapshotsDetails); newSnapshotDetails.put(snapshotId.getUUID(), details); Map> allIndexSnapshots = new HashMap<>(indexSnapshots); - for (final IndexId indexId : shardGenerations.indices()) { + for (final IndexId indexId : liveIndexIds) { final List snapshotIds = allIndexSnapshots.get(indexId); if (snapshotIds == null) { allIndexSnapshots.put(indexId, List.of(snapshotId)); @@ -445,11 +447,8 @@ public RepositoryData addSnapshot( : "Index meta generations should have been empty but was [" + indexMetaDataGenerations + "]"; newIndexMetaGenerations = IndexMetaDataGenerations.EMPTY; } else { - assert indexMetaBlobs.isEmpty() || shardGenerations.indices().equals(indexMetaBlobs.keySet()) - : "Shard generations contained indices " - + shardGenerations.indices() - + " but indexMetaData was given for " - + indexMetaBlobs.keySet(); + assert indexMetaBlobs.isEmpty() || liveIndexIds.equals(indexMetaBlobs.keySet()) + : "Shard generations contained indices " + liveIndexIds + " but indexMetaData was given for " + indexMetaBlobs.keySet(); newIndexMetaGenerations = indexMetaDataGenerations.withAddedSnapshot(snapshotId, indexMetaBlobs, newIdentifiers); } @@ -459,7 +458,7 @@ public RepositoryData addSnapshot( snapshots, newSnapshotDetails, allIndexSnapshots, - ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build(), + ShardGenerations.builder().putAll(this.shardGenerations).update(updatedShardGenerations).build(), newIndexMetaGenerations, clusterUUID ); diff --git a/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java index 7ba8019bb14bb..75719f04c1c2b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java +++ b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java @@ -28,6 +28,8 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import static org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations; + /** * Represents the current {@link ShardGeneration} for each shard in a repository. */ @@ -231,6 +233,14 @@ public Builder putAll(ShardGenerations shardGenerations) { return this; } + public Builder update(UpdatedShardGenerations updatedShardGenerations) { + putAll(updatedShardGenerations.liveIndices()); + // For deleted indices, we only update the generations if they are present in the existing generations, i.e. + // they are referenced by other snapshots. + updateIfPresent(updatedShardGenerations.deletedIndices()); + return this; + } + public Builder put(IndexId indexId, int shardId, SnapshotsInProgress.ShardSnapshotStatus status) { // only track generations for successful shard status values return put(indexId, shardId, status.state().failed() ? null : status.generation()); @@ -244,6 +254,20 @@ public Builder put(IndexId indexId, int shardId, ShardGeneration generation) { return this; } + private void updateIfPresent(ShardGenerations shardGenerations) { + shardGenerations.shardGenerations.forEach((indexId, gens) -> { + final Map existingShardGens = generations.get(indexId); + if (existingShardGens != null) { + for (int i = 0; i < gens.size(); i++) { + final ShardGeneration gen = gens.get(i); + if (gen != null) { + existingShardGens.put(i, gen); + } + } + } + }); + } + private boolean noDuplicateIndicesWithSameName(IndexId newId) { for (IndexId id : generations.keySet()) { if (id.getName().equals(newId.getName()) && id.equals(newId) == false) { @@ -254,6 +278,9 @@ private boolean noDuplicateIndicesWithSameName(IndexId newId) { } public ShardGenerations build() { + if (generations.isEmpty()) { + return EMPTY; + } return new ShardGenerations(generations.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> { final Set shardIds = entry.getValue().keySet(); assert shardIds.isEmpty() == false; diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index d69b9b739f7ce..e5b9e375db13c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1749,11 +1749,10 @@ int sizeInBytes() { public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) { assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); final long repositoryStateId = finalizeSnapshotContext.repositoryStateId(); - final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations(); final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo(); assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]"; - final Collection indices = shardGenerations.indices(); + final Collection indices = finalizeSnapshotContext.updatedShardGenerations().liveIndices().indices(); final SnapshotId snapshotId = snapshotInfo.snapshotId(); // Once we are done writing the updated index-N blob we remove the now unreferenced index-${uuid} blobs in each shard // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION @@ -1867,7 +1866,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new existingRepositoryData.addSnapshot( snapshotId, snapshotDetails, - shardGenerations, + finalizeSnapshotContext.updatedShardGenerations(), metadataWriteResult.indexMetas(), metadataWriteResult.indexMetaIdentifiers() ), diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 7b81589b92f3d..8496faf7bc0e6 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -89,6 +89,7 @@ import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.repositories.FinalizeSnapshotContext; +import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -469,7 +470,7 @@ private void startCloning(Repository repository, SnapshotsInProgress.Entry clone endingSnapshots.add(targetSnapshot); initializingClones.remove(targetSnapshot); logger.info(() -> "Failed to start snapshot clone [" + cloneEntry + "]", e); - removeFailedSnapshotFromClusterState(targetSnapshot, e, null, ShardGenerations.EMPTY); + removeFailedSnapshotFromClusterState(targetSnapshot, e, null, UpdatedShardGenerations.EMPTY); }; // 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone @@ -748,21 +749,28 @@ private static void validate(final String repositoryName, final String snapshotN } } - private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) { + private static UpdatedShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) { ShardGenerations.Builder builder = ShardGenerations.builder(); + ShardGenerations.Builder deletedBuilder = null; if (snapshot.isClone()) { snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> builder.put(key.index(), key.shardId(), value)); } else { - snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> { + for (Map.Entry entry : snapshot.shardSnapshotStatusByRepoShardId().entrySet()) { + RepositoryShardId key = entry.getKey(); + ShardSnapshotStatus value = entry.getValue(); final Index index = snapshot.indexByName(key.indexName()); if (metadata.findIndex(index).isEmpty()) { assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; - return; + if (deletedBuilder == null) { + deletedBuilder = ShardGenerations.builder(); + } + deletedBuilder.put(key.index(), key.shardId(), value); + continue; } builder.put(key.index(), key.shardId(), value); - }); + } } - return builder.build(); + return new UpdatedShardGenerations(builder.build(), deletedBuilder == null ? ShardGenerations.EMPTY : deletedBuilder.build()); } private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadata metadata) { @@ -1360,7 +1368,7 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nu snapshot, new SnapshotException(snapshot, entry.failure()), null, - ShardGenerations.EMPTY + UpdatedShardGenerations.EMPTY ); } return; @@ -1454,8 +1462,9 @@ protected void doRun() { SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot); final String failure = entry.failure(); logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure); - final ShardGenerations shardGenerations = buildGenerations(entry, metadata); - final List finalIndices = shardGenerations.indices().stream().map(IndexId::getName).toList(); + final var updatedShardGenerations = buildGenerations(entry, metadata); + final ShardGenerations updatedShardGensForLiveIndices = updatedShardGenerations.liveIndices(); + final List finalIndices = updatedShardGensForLiveIndices.indices().stream().map(IndexId::getName).toList(); final Set indexNames = new HashSet<>(finalIndices); ArrayList shardFailures = new ArrayList<>(); for (Map.Entry shardStatus : entry.shardSnapshotStatusByRepoShardId().entrySet()) { @@ -1552,7 +1561,7 @@ protected void doRun() { entry.partial() ? onlySuccessfulFeatureStates(entry, finalIndices) : entry.featureStates(), failure, threadPool.absoluteTimeInMillis(), - entry.partial() ? shardGenerations.totalShards() : entry.shardSnapshotStatusByRepoShardId().size(), + entry.partial() ? updatedShardGensForLiveIndices.totalShards() : entry.shardSnapshotStatusByRepoShardId().size(), shardFailures, entry.includeGlobalState(), entry.userMetadata(), @@ -1562,7 +1571,7 @@ protected void doRun() { final ListenableFuture>> snapshotListeners = new ListenableFuture<>(); repo.finalizeSnapshot( new FinalizeSnapshotContext( - shardGenerations, + updatedShardGenerations, repositoryData.getGenId(), metaForSnapshot, snapshotInfo, @@ -1579,7 +1588,7 @@ protected void doRun() { snapshot, repositoryData, // we might have written the new root blob before failing here, so we must use the updated shardGenerations - shardGenerations + updatedShardGenerations ) ), () -> snapshotListeners.addListener(new ActionListener<>() { @@ -1604,7 +1613,7 @@ public void onFailure(Exception e) { repositoryData, // a failure here means the root blob was not updated, but the updated shard generation blobs are all in place so we can // use the updated shardGenerations for all pending shard snapshots - shardGenerations + updatedShardGenerations ) )); } @@ -1613,7 +1622,7 @@ public void onFailure(Exception e) { public void onRejection(Exception e) { if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) { logger.debug("failing finalization of {} due to shutdown", snapshot); - handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY); + handleFinalizationFailure(e, snapshot, repositoryData, UpdatedShardGenerations.EMPTY); } else { onFailure(e); } @@ -1623,7 +1632,7 @@ public void onRejection(Exception e) { public void onFailure(Exception e) { logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e); assert false : new AssertionError("unexpected failure finalizing " + snapshot, e); - handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY); + handleFinalizationFailure(e, snapshot, repositoryData, UpdatedShardGenerations.EMPTY); } } @@ -1679,7 +1688,7 @@ private void handleFinalizationFailure( Exception e, Snapshot snapshot, RepositoryData repositoryData, - ShardGenerations shardGenerations + UpdatedShardGenerations updatedShardGenerations ) { if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) { // Failure due to not being master any more, don't try to remove snapshot from cluster state the next master @@ -1693,7 +1702,7 @@ private void handleFinalizationFailure( failAllListenersOnMasterFailOver(e); } else { logger.warn(() -> "[" + snapshot + "] failed to finalize snapshot", e); - removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, shardGenerations); + removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, updatedShardGenerations); } } @@ -1817,7 +1826,11 @@ private static Tuple> read * @param snapshot snapshot for which to remove the snapshot operation * @return updated cluster state */ - public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot, ShardGenerations shardGenerations) { + public static ClusterState stateWithoutSnapshot( + ClusterState state, + Snapshot snapshot, + UpdatedShardGenerations updatedShardGenerations + ) { final SnapshotsInProgress inProgressSnapshots = SnapshotsInProgress.get(state); ClusterState result = state; int indexOfEntry = -1; @@ -1883,7 +1896,7 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna final RepositoryShardId repositoryShardId = finishedShardEntry.getKey(); if (shardState.state() != ShardState.SUCCESS || previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false - || shardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) { + || updatedShardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) { continue; } updatedShardAssignments = maybeAddUpdatedAssignment( @@ -1902,7 +1915,7 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna final ShardSnapshotStatus shardState = finishedShardEntry.getValue(); if (shardState.state() == ShardState.SUCCESS && previousEntry.shardSnapshotStatusByRepoShardId().containsKey(finishedShardEntry.getKey()) - && shardGenerations.hasShardGen(finishedShardEntry.getKey())) { + && updatedShardGenerations.hasShardGen(finishedShardEntry.getKey())) { updatedShardAssignments = maybeAddUpdatedAssignment( updatedShardAssignments, shardState, @@ -1992,14 +2005,14 @@ private void removeFailedSnapshotFromClusterState( Snapshot snapshot, Exception failure, @Nullable RepositoryData repositoryData, - ShardGenerations shardGenerations + UpdatedShardGenerations updatedShardGenerations ) { assert failure != null : "Failure must be supplied"; submitUnbatchedTask(REMOVE_SNAPSHOT_METADATA_TASK_SOURCE, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, shardGenerations); + final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, updatedShardGenerations); assert updatedState == currentState || endingSnapshots.contains(snapshot) : "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state"; // now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java index 250d10855b23f..cb94564befa66 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; +import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESTestCase; @@ -118,7 +119,7 @@ public void testAddSnapshots() { randomNonNegativeLong(), randomAlphaOfLength(10) ), - shardGenerations, + new UpdatedShardGenerations(shardGenerations, ShardGenerations.EMPTY), indexLookup, indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random()))) ); @@ -218,7 +219,7 @@ public void testGetSnapshotState() { randomNonNegativeLong(), randomAlphaOfLength(10) ), - ShardGenerations.EMPTY, + UpdatedShardGenerations.EMPTY, Collections.emptyMap(), Collections.emptyMap() ); @@ -400,7 +401,13 @@ public void testIndexMetaDataToRemoveAfterRemovingSnapshotWithSharing() { randomNonNegativeLong(), randomAlphaOfLength(10) ); - final RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot, details, shardGenerations, indexLookup, newIdentifiers); + final RepositoryData newRepoData = repositoryData.addSnapshot( + newSnapshot, + details, + new UpdatedShardGenerations(shardGenerations, ShardGenerations.EMPTY), + indexLookup, + newIdentifiers + ); assertEquals( newRepoData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(newSnapshot)), newIndices.entrySet() @@ -476,7 +483,7 @@ public static RepositoryData generateRandomRepoData() { randomNonNegativeLong(), randomAlphaOfLength(10) ), - builder.build(), + new UpdatedShardGenerations(builder.build(), ShardGenerations.EMPTY), indexLookup, indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random()))) ); diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 3666799ce0964..044b054ad873a 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.FinalizeSnapshotContext; +import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -170,16 +171,19 @@ public void testSnapshotWithConflictingName() throws Exception { repository.getMetadata().name(), new SnapshotId(snapshot.getSnapshotId().getName(), "_uuid2") ); - final ShardGenerations shardGenerations = ShardGenerations.builder().put(indexId, 0, shardGen).build(); + final var snapshotShardGenerations = new UpdatedShardGenerations( + ShardGenerations.builder().put(indexId, 0, shardGen).build(), + ShardGenerations.EMPTY + ); final RepositoryData ignoredRepositoryData = safeAwait( listener -> repository.finalizeSnapshot( new FinalizeSnapshotContext( - shardGenerations, + snapshotShardGenerations, RepositoryData.EMPTY_REPO_GEN, Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), new SnapshotInfo( snapshot, - shardGenerations.indices().stream().map(IndexId::getName).toList(), + snapshotShardGenerations.liveIndices().indices().stream().map(IndexId::getName).toList(), Collections.emptyList(), Collections.emptyList(), null, diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 845dca770d7fb..076be2cd27e84 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryData; @@ -510,7 +511,7 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo repoData = repoData.addSnapshot( snapshotId, details, - shardGenerations, + new UpdatedShardGenerations(shardGenerations, ShardGenerations.EMPTY), indexLookup, indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random()))) ); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 2daf1222748b4..b2ddf6370a0fa 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction; +import org.elasticsearch.action.admin.cluster.snapshots.get.TransportGetSnapshotsAction; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction; @@ -197,6 +198,7 @@ import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.elasticsearch.usage.UsageService; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -1414,6 +1416,231 @@ public TransportRequestHandler interceptHandler( safeAwait(testListener); // shouldn't throw } + /** + * A device for allowing a test precise control over the order in which shard-snapshot updates are processed by the master. The test + * must install the result of {@link #newTransportInterceptor} on the master node and may then call {@link #releaseBlock} as needed to + * release blocks on the processing of individual shard snapshot updates. + */ + private static class ShardSnapshotUpdatesSequencer { + + private final Map>> shardSnapshotUpdatesBlockMap = new HashMap<>(); + + private static final SubscribableListener ALWAYS_PROCEED = SubscribableListener.newSucceeded(null); + + private SubscribableListener listenerFor(String snapshot, String index) { + if ("last-snapshot".equals(snapshot) || "first-snapshot".equals(snapshot)) { + return ALWAYS_PROCEED; + } + + return shardSnapshotUpdatesBlockMap + // + .computeIfAbsent(snapshot, v -> new HashMap<>()) + .computeIfAbsent(index, v -> new SubscribableListener<>()); + } + + void releaseBlock(String snapshot, String index) { + listenerFor(snapshot, index).onResponse(null); + } + + /** + * @return a {@link TransportInterceptor} which enforces the sequencing of shard snapshot updates + */ + TransportInterceptor newTransportInterceptor() { + return new TransportInterceptor() { + @Override + public TransportRequestHandler interceptHandler( + String action, + Executor executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + if (action.equals(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME)) { + return (request, channel, task) -> ActionListener.run( + ActionTestUtils.assertNoFailureListener(new ChannelActionListener<>(channel)::onResponse), + l -> { + final var updateRequest = asInstanceOf(UpdateIndexShardSnapshotStatusRequest.class, request); + listenerFor(updateRequest.snapshot().getSnapshotId().getName(), updateRequest.shardId().getIndexName()).< + TransportResponse>andThen( + ll -> actualHandler.messageReceived(request, new TestTransportChannel(ll), task) + ).addListener(l); + } + ); + } else { + return actualHandler; + } + } + }; + } + } + + public void testDeleteIndexBetweenSuccessAndFinalization() { + + final var sequencer = new ShardSnapshotUpdatesSequencer(); + + setupTestCluster( + 1, + 1, + node -> node.isMasterNode() ? sequencer.newTransportInterceptor() : TransportService.NOOP_TRANSPORT_INTERCEPTOR + ); + + final var masterNode = testClusterNodes.randomMasterNodeSafe(); + final var client = masterNode.client; + final var masterClusterService = masterNode.clusterService; + + final var snapshotCount = between(3, 5); + final var indices = IntStream.range(0, snapshotCount + 1).mapToObj(i -> "index-" + i).toList(); + final var repoName = "repo"; + final var indexToDelete = "index-" + snapshotCount; + + var testListener = SubscribableListener + + // Create the repo and indices + .newForked(stepListener -> { + try (var listeners = new RefCountingListener(stepListener)) { + client().admin() + .cluster() + .preparePutRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName) + .setType(FsRepository.TYPE) + .setSettings(Settings.builder().put("location", randomAlphaOfLength(10))) + .execute(listeners.acquire(createRepoResponse -> {})); + + for (final var index : indices) { + client.admin() + .indices() + .create( + new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(1)), + listeners.acquire(createIndexResponse -> {}) + ); + } + } + }) + .andThen(l -> { + // Create the first snapshot as source of the clone + client.admin() + .cluster() + .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repoName, "first-snapshot") + .setIndices("index-0", indexToDelete) + .setPartial(false) + .setWaitForCompletion(true) + .execute(l.map(v -> null)); + }); + + // Start some snapshots such that snapshot-{i} contains index-{i} and index-{snapshotCount} so that we can control the order in + // which they finalize by controlling the order in which the shard snapshot updates are processed + final var cloneFuture = new PlainActionFuture(); + for (int i = 0; i < snapshotCount; i++) { + final var snapshotName = "snapshot-" + i; + final var indexName = "index-" + i; + testListener = testListener.andThen( + stepListener -> client.admin() + .cluster() + .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repoName, snapshotName) + .setIndices(indexName, indexToDelete) + .setPartial(true) + .execute(stepListener.map(createSnapshotResponse -> null)) + ); + if (i == 0) { + // Insert a clone between snapshot-0 and snapshot-1 and it finalizes after snapshot-1 because it will be blocked on index-0 + testListener = testListener.andThen(stepListener -> { + client.admin() + .cluster() + .prepareCloneSnapshot(TEST_REQUEST_TIMEOUT, repoName, "first-snapshot", "clone") + .setIndices("index-0", indexToDelete) + .execute(cloneFuture); + ClusterServiceUtils.addTemporaryStateListener( + masterClusterService, + clusterState -> SnapshotsInProgress.get(clusterState) + .asStream() + .anyMatch(e -> e.snapshot().getSnapshotId().getName().equals("clone") && e.isClone()) + ).addListener(stepListener.map(v -> null)); + }); + } + } + + testListener = testListener + // wait for the target index to complete in snapshot-1 + .andThen(l -> { + sequencer.releaseBlock("snapshot-0", indexToDelete); + sequencer.releaseBlock("snapshot-1", indexToDelete); + + ClusterServiceUtils.addTemporaryStateListener( + masterClusterService, + clusterState -> SnapshotsInProgress.get(clusterState) + .asStream() + .filter(e -> e.isClone() == false) + .mapToLong( + e -> e.shards() + .entrySet() + .stream() + .filter( + e2 -> e2.getKey().getIndexName().equals(indexToDelete) + && e2.getValue().state() == SnapshotsInProgress.ShardState.SUCCESS + ) + .count() + ) + .sum() == 2 + ).addListener(l.map(v -> null)); + }) + + // delete the target index + .andThen(l -> client.admin().indices().delete(new DeleteIndexRequest(indexToDelete), l.map(acknowledgedResponse -> null))) + + // wait for snapshot-1 to complete + .andThen(l -> { + sequencer.releaseBlock("snapshot-1", "index-1"); + ClusterServiceUtils.addTemporaryStateListener( + masterClusterService, + cs -> SnapshotsInProgress.get(cs).asStream().noneMatch(e -> e.snapshot().getSnapshotId().getName().equals("snapshot-1")) + ).addListener(l.map(v -> null)); + }) + + // wait for all the other snapshots to complete + .andThen(l -> { + // Clone is yet to be finalized + assertTrue(SnapshotsInProgress.get(masterClusterService.state()).asStream().anyMatch(SnapshotsInProgress.Entry::isClone)); + for (int i = 0; i < snapshotCount; i++) { + sequencer.releaseBlock("snapshot-" + i, indexToDelete); + sequencer.releaseBlock("snapshot-" + i, "index-" + i); + } + ClusterServiceUtils.addTemporaryStateListener(masterClusterService, cs -> SnapshotsInProgress.get(cs).isEmpty()) + .addListener(l.map(v -> null)); + }) + .andThen(l -> { + final var snapshotNames = Stream.concat( + Stream.of("clone"), + IntStream.range(0, snapshotCount).mapToObj(i -> "snapshot-" + i) + ).toArray(String[]::new); + + client.admin() + .cluster() + .prepareGetSnapshots(TEST_REQUEST_TIMEOUT, repoName) + .setSnapshots(snapshotNames) + .execute(ActionTestUtils.assertNoFailureListener(getSnapshotsResponse -> { + for (final var snapshot : getSnapshotsResponse.getSnapshots()) { + assertThat(snapshot.state(), is(SnapshotState.SUCCESS)); + final String snapshotName = snapshot.snapshot().getSnapshotId().getName(); + if ("clone".equals(snapshotName)) { + // Clone is not affected by index deletion + assertThat(snapshot.indices(), containsInAnyOrder("index-0", indexToDelete)); + } else { + // Does not contain the deleted index in the snapshot + assertThat(snapshot.indices(), contains("index-" + snapshotName.charAt(snapshotName.length() - 1))); + } + } + l.onResponse(null); + })); + }); + + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue( + "executed all runnable tasks but test steps are still incomplete: " + + Strings.toString(SnapshotsInProgress.get(masterClusterService.state()), true, true), + testListener.isDone() + ); + safeAwait(testListener); // shouldn't throw + assertTrue(cloneFuture.isDone()); + } + @TestLogging(reason = "testing logging at INFO level", value = "org.elasticsearch.snapshots.SnapshotsService:INFO") public void testFullSnapshotUnassignedShards() { setupTestCluster(1, 0); // no data nodes, we want unassigned shards @@ -2554,6 +2781,10 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { TransportCloneSnapshotAction.TYPE, new TransportCloneSnapshotAction(transportService, clusterService, threadPool, snapshotsService, actionFilters) ); + actions.put( + TransportGetSnapshotsAction.TYPE, + new TransportGetSnapshotsAction(transportService, clusterService, threadPool, repositoriesService, actionFilters) + ); actions.put( TransportClusterRerouteAction.TYPE, new TransportClusterRerouteAction( diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index dc500220850e3..63e77cfdc4523 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -38,11 +38,11 @@ import org.elasticsearch.index.IndexVersions; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.FinalizeSnapshotContext; +import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ResolvedRepositories; -import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat; @@ -545,7 +545,7 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map listener) -> repo.finalizeSnapshot( new FinalizeSnapshotContext( - ShardGenerations.EMPTY, + UpdatedShardGenerations.EMPTY, getRepositoryData(repoName).getGenId(), state.metadata(), snapshotInfo, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java index b969e9d2dc4b5..e6f3c3fa54277 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java @@ -102,7 +102,10 @@ public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { new FinalizeSnapshotContext( finalizeSnapshotContext.updatedShardGenerations(), finalizeSnapshotContext.repositoryStateId(), - metadataToSnapshot(finalizeSnapshotContext.updatedShardGenerations().indices(), finalizeSnapshotContext.clusterMetadata()), + metadataToSnapshot( + finalizeSnapshotContext.updatedShardGenerations().liveIndices().indices(), + finalizeSnapshotContext.clusterMetadata() + ), finalizeSnapshotContext.snapshotInfo(), finalizeSnapshotContext.repositoryMetaVersion(), finalizeSnapshotContext, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java index 7f57e9b494aa6..207ac4f38aed6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java @@ -67,6 +67,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.FinalizeSnapshotContext; +import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.ShardGeneration; @@ -372,7 +373,7 @@ public void testRestoreMinimal() throws IOException { .build(); repository.finalizeSnapshot( new FinalizeSnapshotContext( - shardGenerations, + new UpdatedShardGenerations(shardGenerations, ShardGenerations.EMPTY), ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(), Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), new SnapshotInfo( diff --git a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/TransportSLMGetExpiredSnapshotsActionTests.java b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/TransportSLMGetExpiredSnapshotsActionTests.java index e6d7a66a2bdb3..6b420503fd4c0 100644 --- a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/TransportSLMGetExpiredSnapshotsActionTests.java +++ b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/TransportSLMGetExpiredSnapshotsActionTests.java @@ -21,11 +21,11 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryMissingException; -import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; @@ -299,7 +299,7 @@ private static Repository createMockRepository(ThreadPool threadPool, List