diff --git a/docs/changelog/127250.yaml b/docs/changelog/127250.yaml
new file mode 100644
index 0000000000000..44a41645bd6c3
--- /dev/null
+++ b/docs/changelog/127250.yaml
@@ -0,0 +1,6 @@
+pr: 127250
+summary: Do not apply further shard snapshot status updates after shard snapshot is
+  complete
+area: Snapshot/Restore
+type: bug
+issues: []
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index 4b24e4706269a..a801a7067ccfe 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -3398,16 +3398,31 @@ private void startShardOperation(
             startedCount++;
         }
 
+        /**
+         * Applies a shard snapshot status update to an updated statuses builder, after a few sanity checks / filtering.
+         *
+         * @param existingShardSnapshotStatuses Maps shard ID to ShardSnapshotStatus
+         * @param newShardSnapshotStatusesSupplier Supplies a builder. Initially maps shard ID to the existing ShardSnapshotStatus. As
+         *                                         shard snapshot status updates are applied, the original status entries are replaced
+         *                                         with updated ones.
+         * @param shardSnapshotStatusUpdate The update to apply to build a new updated ShardSnapshotStatus
+         * @param shardSnapshotId The ID of the shard being updated; generic because shard snapshots are keyed by ShardId and clones
+         *                        by RepositoryShardId
+         */
         private <T> void executeShardSnapshotUpdate(
-            Map<T, ShardSnapshotStatus> existingStates,
-            Supplier<ImmutableOpenMap.Builder<T, ShardSnapshotStatus>> newStates,
-            ShardSnapshotUpdate updateSnapshotState,
-            T updatedShard
+            Map<T, ShardSnapshotStatus> existingShardSnapshotStatuses,
+            Supplier<ImmutableOpenMap.Builder<T, ShardSnapshotStatus>> newShardSnapshotStatusesSupplier,
+            ShardSnapshotUpdate shardSnapshotStatusUpdate,
+            T shardSnapshotId
         ) {
-            assert updateSnapshotState.snapshot.equals(entry.snapshot());
-            final ShardSnapshotStatus existing = existingStates.get(updatedShard);
+            assert shardSnapshotStatusUpdate.snapshot.equals(entry.snapshot());
+
+            final ShardSnapshotStatus existing = existingShardSnapshotStatuses.get(shardSnapshotId);
             if (existing == null) {
-                logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]", updatedShard, entry);
+                logger.error(
+                    "Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
+                    shardSnapshotStatusUpdate,
+                    entry
+                );
                 assert false : "This should never happen, should only receive updates for expected shards";
                 return;
             }
@@ -3418,34 +3433,49 @@ private <T> void executeShardSnapshotUpdate(
                 return;
             }
 
-            final ShardSnapshotStatus updatedState;
+            final var newShardSnapshotStatusesBuilder = newShardSnapshotStatusesSupplier.get();
+            final var newShardSnapshotStatus = newShardSnapshotStatusesBuilder.get(shardSnapshotId);
+            assert newShardSnapshotStatus != null; // builder is seeded with all the original statuses.
+            if (newShardSnapshotStatus.state().completed()) {
+                // An out-of-order status update arrived. It should not be applied because the shard snapshot is already finished.
+                // For example, a delayed/retried PAUSED update should not override a completed shard snapshot.
+                iterator.remove();
+                return;
+            }
+
+            final ShardSnapshotStatus updatedShardSnapshotStatus;
             if (existing.state() == ShardState.ABORTED
-                && updateSnapshotState.updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
+                && shardSnapshotStatusUpdate.updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
                 // concurrently pausing the shard snapshot due to node shutdown and aborting the snapshot - this shard is no longer
                 // actively snapshotting but we don't want it to resume, so mark it as FAILED since it didn't complete
-                updatedState = new ShardSnapshotStatus(
-                    updateSnapshotState.updatedState.nodeId(),
+                updatedShardSnapshotStatus = new ShardSnapshotStatus(
+                    shardSnapshotStatusUpdate.updatedState.nodeId(),
                     ShardState.FAILED,
-                    updateSnapshotState.updatedState.generation(),
+                    shardSnapshotStatusUpdate.updatedState.generation(),
                     "snapshot aborted"
                 );
             } else {
-                updatedState = updateSnapshotState.updatedState;
+                updatedShardSnapshotStatus = shardSnapshotStatusUpdate.updatedState;
             }
 
-            if (updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
+            if (updatedShardSnapshotStatus.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
                 // leave subsequent entries for this shard alone until this one is unpaused
                 iterator.remove();
             } else {
                 // All other shard updates leave the shard in a complete state, which means we should leave this update in the list so
-                // it can fall through to later entries and start any waiting shard snapshots:
-                assert updatedState.isActive() == false : updatedState;
+                // that it can fall through to later entries and start any waiting shard snapshots:
+                assert updatedShardSnapshotStatus.isActive() == false : updatedShardSnapshotStatus;
             }
-            logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot, updatedShard, updatedState.state());
+            logger.trace(
+                "[{}] Updating shard [{}] with status [{}]",
+                shardSnapshotStatusUpdate.snapshot,
+                shardSnapshotId,
+                updatedShardSnapshotStatus.state()
+            );
             changedCount++;
-            newStates.get().put(updatedShard, updatedState);
-            executedUpdates.add(updateSnapshotState);
+            newShardSnapshotStatusesBuilder.put(shardSnapshotId, updatedShardSnapshotStatus);
+            executedUpdates.add(shardSnapshotStatusUpdate);
         }
 
         private void tryStartNextTaskAfterCloneUpdated(RepositoryShardId repoShardId, ShardSnapshotStatus updatedState) {
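The guard added above enforces one rule: once a shard snapshot status in the builder is terminal (completed() returns true), any further update for that shard in the same batch is dropped instead of applied. Here is a minimal, self-contained sketch of that rule; the names below (OutOfOrderUpdateDemo, ToyShardState, applyUpdate) are illustrative stand-ins, not the Elasticsearch types:

import java.util.HashMap;
import java.util.Map;

public class OutOfOrderUpdateDemo {
    // Toy states: SUCCESS and FAILED are terminal ("completed"); the others are not.
    enum ToyShardState {
        INIT, PAUSED_FOR_NODE_REMOVAL, SUCCESS, FAILED;

        boolean completed() {
            return this == SUCCESS || this == FAILED;
        }
    }

    public static void main(String[] args) {
        // Seed the builder with the current statuses, like the updated-statuses builder above.
        Map<String, ToyShardState> builder = new HashMap<>(Map.of("shard-0", ToyShardState.INIT));

        // One batch: the shard snapshot completes, then a delayed/retried PAUSED update arrives.
        applyUpdate(builder, "shard-0", ToyShardState.SUCCESS);
        applyUpdate(builder, "shard-0", ToyShardState.PAUSED_FOR_NODE_REMOVAL); // dropped by the guard

        System.out.println(builder.get("shard-0")); // prints SUCCESS, not PAUSED_FOR_NODE_REMOVAL
    }

    static void applyUpdate(Map<String, ToyShardState> builder, String shardId, ToyShardState update) {
        if (builder.get(shardId).completed()) {
            return; // mirrors the early return above: never overwrite a completed shard snapshot status
        }
        builder.put(shardId, update);
    }
}

Without the guard, the stale PAUSED_FOR_NODE_REMOVAL update would overwrite SUCCESS and leave the shard looking resumable even though its snapshot already finished.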
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java
index b95695bc68df2..e51df3521ee9b 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java
@@ -49,6 +49,7 @@
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertTrue;
 
 public class SnapshotsServiceTests extends ESTestCase {
 
@@ -467,6 +468,164 @@ public void testPauseForNodeRemovalWithQueuedShards() throws Exception {
         );
     }
 
+    private SnapshotsInProgress.ShardSnapshotStatus successShardSnapshotStatus(
+        String nodeId,
+        ShardId shardId,
+        SnapshotsInProgress.Entry entry
+    ) {
+        return SnapshotsInProgress.ShardSnapshotStatus.success(
+            nodeId,
+            new ShardSnapshotResult(entry.shards().get(shardId).generation(), ByteSizeValue.ofBytes(1L), 1)
+        );
+    }
+
+    private SnapshotsInProgress.ShardSnapshotStatus failedShardSnapshotStatus(
+        String nodeId,
+        ShardId shardId,
+        SnapshotsInProgress.Entry entry
+    ) {
+        return new SnapshotsInProgress.ShardSnapshotStatus(
+            nodeId,
+            SnapshotsInProgress.ShardState.FAILED,
+            entry.shards().get(shardId).generation(),
+            "test injected failure"
+        );
+    }
+
+    private SnapshotsInProgress.ShardSnapshotStatus pausedShardSnapshotStatus(
+        String nodeId,
+        ShardId shardId,
+        SnapshotsInProgress.Entry entry
+    ) {
+        return new SnapshotsInProgress.ShardSnapshotStatus(
+            nodeId,
+            SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL,
+            entry.shards().get(shardId).generation()
+        );
+    }
+
+    /**
+     * Tests that, within the same batched cluster state update execution, a PAUSED_FOR_NODE_REMOVAL shard snapshot status update is
+     * ignored after the same shard snapshot has already been updated to a completed state. On the other hand, a
+     * PAUSED_FOR_NODE_REMOVAL update followed by a SUCCESS (or other completed state) update should be applied and result in SUCCESS.
+     */
+    public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterCompleted() throws Exception {
+        final var repoName = "test-repo-name";
+        final var snapshot1 = snapshot(repoName, "test-snap-1");
+        final var snapshot2 = snapshot(repoName, "test-snap-2");
+        final var indexName = "test-index-name";
+        final var shardId = new ShardId(index(indexName), 0);
+        final var repositoryShardId = new RepositoryShardId(indexId(indexName), 0);
+        final var originalNodeId = uuid();
+        final var otherNodeId = uuid();
+
+        final SnapshotsInProgress.Entry runningSnapshotEntry = snapshotEntry(
+            snapshot1,
+            Collections.singletonMap(indexName, repositoryShardId.index()),
+            Map.of(shardId, initShardStatus(originalNodeId))
+        );
+
+        final SnapshotsInProgress.Entry queuedSnapshotEntry = snapshotEntry(
+            snapshot2,
+            Collections.singletonMap(indexName, repositoryShardId.index()),
+            Map.of(shardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)
+        );
+
+        final ClusterState initialState = stateWithSnapshots(
+            ClusterState.builder(ClusterState.EMPTY_STATE)
+                .nodes(
+                    DiscoveryNodes.builder()
+                        .add(DiscoveryNodeUtils.create(originalNodeId))
+                        .localNodeId(originalNodeId)
+                        .masterNodeId(originalNodeId)
+                        .build()
+                )
+                .routingTable(
+                    RoutingTable.builder()
+                        .add(
+                            IndexRoutingTable.builder(shardId.getIndex())
+                                .addShard(TestShardRouting.newShardRouting(shardId, originalNodeId, true, ShardRoutingState.STARTED))
+                        )
+                        .build()
+                )
+                .build(),
+            repoName,
+            runningSnapshotEntry,
+            queuedSnapshotEntry
+        );
+
+        assertEquals(
+            SnapshotsInProgress.ShardState.QUEUED,
+            SnapshotsInProgress.get(initialState).snapshot(snapshot2).shards().get(shardId).state()
+        );
+
+        /**
+         * In this scenario, {@link originalNodeId} is the original shard owner that resends PAUSED, and {@link otherNodeId} is the
+         * new shard owner that completes the shard snapshot. The production code doesn't verify node ownership, but it's helpful for
+         * the test.
+         */
+
+        // Ultimately ignored statuses.
+        var pausedOnOriginalNodeStatus = pausedShardSnapshotStatus(originalNodeId, shardId, runningSnapshotEntry);
+        var successfulOnOriginalNodeStatus = successShardSnapshotStatus(originalNodeId, shardId, runningSnapshotEntry);
+
+        // Ultimately applied statuses.
+        var successfulOnOtherNodeStatus = successShardSnapshotStatus(otherNodeId, shardId, runningSnapshotEntry);
+        var failedOnOtherNodeStatus = failedShardSnapshotStatus(otherNodeId, shardId, runningSnapshotEntry);
+
+        var completedUpdateOnOtherNode = new SnapshotsService.ShardSnapshotUpdate(
+            snapshot1,
+            shardId,
+            null,
+            // Success and failure are both completed shard snapshot states, so paused should be ignored when either is set.
+            randomBoolean() ? successfulOnOtherNodeStatus : failedOnOtherNodeStatus,
+            ActionTestUtils.assertNoFailureListener(t -> {})
+        );
+        var pausedUpdateOnOriginalNode = new SnapshotsService.ShardSnapshotUpdate(
+            snapshot1,
+            shardId,
+            null,
+            pausedOnOriginalNodeStatus,
+            ActionTestUtils.assertNoFailureListener(t -> {})
+        );
+        var completedUpdateOnOriginalNode = new SnapshotsService.ShardSnapshotUpdate(
+            snapshot1,
+            shardId,
+            null,
+            successfulOnOriginalNodeStatus,
+            ActionTestUtils.assertNoFailureListener(t -> {})
+        );
+
+        boolean random = randomBoolean();
+        ClusterState updatedState;
+        if (randomBoolean()) {
+            updatedState = applyUpdates(
+                initialState,
+                // Randomize the order of completed and paused updates but make sure that there's one of each. If the paused update
+                // comes after the completed update, paused should be ignored and the shard snapshot remains in a completed state.
+                random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode,
+                random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode
+            );
+        } else {
+            updatedState = applyUpdates(
+                initialState,
+                random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode,
+                random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode,
+                // Randomly add another update that will be ignored because the shard snapshot is complete.
+                // Note: the originalNodeId is used for this update, so we can verify afterward that the update is not applied.
+                randomBoolean() ? completedUpdateOnOriginalNode : pausedUpdateOnOriginalNode
+            );
+        }
+
+        assertTrue(SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).state().completed());
+        assertEquals(otherNodeId, SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).nodeId());
+
+        // Since the first snapshot completed, the second snapshot should be set to proceed with snapshotting the same shard.
+        assertEquals(
+            SnapshotsInProgress.ShardState.INIT,
+            SnapshotsInProgress.get(updatedState).snapshot(snapshot2).shards().get(shardId).state()
+        );
+    }
+
     public void testSnapshottingIndicesExcludesClones() {
         final String repoName = "test-repo";
         final String indexName = "index";
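The randomized orderings in the test work because the guard makes the batch outcome order-independent: once any completed update is present, the batch converges on a completed state however the updates are interleaved. A self-contained sketch of that property, again with toy names (OrderIndependenceDemo, ToyShardState) standing in for the Elasticsearch types:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class OrderIndependenceDemo {
    enum ToyShardState {
        INIT, PAUSED_FOR_NODE_REMOVAL, SUCCESS, FAILED;

        boolean completed() {
            return this == SUCCESS || this == FAILED;
        }
    }

    public static void main(String[] args) {
        // One paused update and one completed update, in a random order, as in the test above.
        List<ToyShardState> updates = new ArrayList<>(List.of(ToyShardState.PAUSED_FOR_NODE_REMOVAL, ToyShardState.SUCCESS));
        Collections.shuffle(updates);

        ToyShardState state = ToyShardState.INIT;
        for (ToyShardState update : updates) {
            if (state.completed()) {
                continue; // the guard: updates arriving after a terminal state are dropped
            }
            state = update;
        }

        // PAUSED-then-SUCCESS applies both; SUCCESS-then-PAUSED drops PAUSED. Either way the shard ends completed.
        System.out.println(state + " (completed=" + state.completed() + ")");
    }
}

This is the invariant the assertions above verify: the shard snapshot ends in a completed state attributed to otherNodeId, regardless of the ordering.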
@@ -570,10 +729,21 @@ private static void assertIsNoop(ClusterState state, SnapshotsService.SnapshotTa
         assertSame(applyUpdates(state, shardCompletion), state);
     }
 
+    /**
+     * Runs the shard snapshot updates through a ClusterStateTaskExecutor that executes the
+     * {@link SnapshotsService.SnapshotShardsUpdateContext}.
+     *
+     * @param state Original cluster state
+     * @param updates List of SnapshotTask tasks to apply to the cluster state
+     * @return An updated cluster state, or, if no changes were made, the original cluster state.
+     */
     private static ClusterState applyUpdates(ClusterState state, SnapshotsService.SnapshotTask... updates) throws Exception {
         return ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(state, batchExecutionContext -> {
             final SnapshotsInProgress existing = SnapshotsInProgress.get(batchExecutionContext.initialState());
-            final var context = new SnapshotsService.SnapshotShardsUpdateContext(batchExecutionContext, (a, b, c) -> {});
+            final var context = new SnapshotsService.SnapshotShardsUpdateContext(
+                batchExecutionContext,
+                /* on completion handler */ (shardSnapshotUpdateResult, newlyCompletedEntries, updatedRepositories) -> {}
+            );
             final SnapshotsInProgress updated = context.computeUpdatedState();
             context.completeWithUpdatedState(updated);
             if (existing == updated) {
@@ -617,6 +787,9 @@ private static SnapshotsInProgress.Entry cloneEntry(
             .withClones(clones);
     }
 
+    /**
+     * Helper method to create a shard snapshot status with state {@link SnapshotsInProgress.ShardState#INIT}.
+     */
     private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) {
         return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, ShardGeneration.newGeneration(random()));
     }
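One design note on the applyUpdates helper: cluster state is immutable, so "no change" is detected by reference equality (the existing == updated check), which is also what assertIsNoop's assertSame relies on. A toy sketch of that pattern, independent of the Elasticsearch APIs (NoopBatchDemo and applyBatch are illustrative names):

import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class NoopBatchDemo {
    public static void main(String[] args) {
        Map<String, String> state = Map.of("shard-0", "INIT");

        // A task that changes nothing must return the original instance, not an equal copy.
        UnaryOperator<Map<String, String>> noop = s -> s;

        Map<String, String> updated = applyBatch(state, List.of(noop, noop));
        System.out.println(updated == state); // true: callers detect no-ops cheaply, without deep equality
    }

    static Map<String, String> applyBatch(Map<String, String> state, List<UnaryOperator<Map<String, String>>> tasks) {
        Map<String, String> updated = state;
        for (UnaryOperator<Map<String, String>> task : tasks) {
            updated = task.apply(updated);
        }
        return updated; // same reference as the input when no task changed anything
    }
}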