From aba17166032e9400a900d5393c158a81f91944f4 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 23 Apr 2025 09:26:24 -0400 Subject: [PATCH 01/10] Do not apply further shard snapshot status updates after shard snapshot is complete --- .../snapshots/SnapshotsService.java | 65 +++++++--- .../snapshots/SnapshotsServiceTests.java | 118 +++++++++++++++++- 2 files changed, 163 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 4b24e4706269a..3b391e545af66 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -3398,16 +3398,29 @@ private void startShardOperation( startedCount++; } + /** + * Applies a shard snapshot status update to an updated statuses builder, after a few sanity checks / filtering. + * + * @param existingShardSnapshotStatuses Maps shard ID to ShardSnapshotStatus + * @param newShardSnapshotStatusesSupplier Supplies a builder mapping shard ID to updated ShardSnapshotStatus + * @param shardSnapshotStatusUpdate The update to apply to build a new updated ShardSnapshotStatus + * @param shardSnapshotId The shard snapshot ID of the shard being updated (for type reasons) + */ private void executeShardSnapshotUpdate( - Map existingStates, - Supplier> newStates, - ShardSnapshotUpdate updateSnapshotState, - T updatedShard + Map existingShardSnapshotStatuses, + Supplier> newShardSnapshotStatusesSupplier, + ShardSnapshotUpdate shardSnapshotStatusUpdate, + T shardSnapshotId ) { - assert updateSnapshotState.snapshot.equals(entry.snapshot()); - final ShardSnapshotStatus existing = existingStates.get(updatedShard); + assert shardSnapshotStatusUpdate.snapshot.equals(entry.snapshot()); + + final ShardSnapshotStatus existing = existingShardSnapshotStatuses.get(shardSnapshotId); if (existing == null) { - logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]", updatedShard, entry); + logger.error( + "Received shard snapshot status update [{}] but this shard is not tracked in [{}]", + shardSnapshotStatusUpdate, + entry + ); assert false : "This should never happen, should only receive updates for expected shards"; return; } @@ -3418,34 +3431,48 @@ private void executeShardSnapshotUpdate( return; } - final ShardSnapshotStatus updatedState; + final var newShardSnapshotStatusesBuilder = newShardSnapshotStatusesSupplier.get(); + final var newShardSnapshotStatus = newShardSnapshotStatusesBuilder.get(shardSnapshotId); + if (newShardSnapshotStatus != null && newShardSnapshotStatus.state().completed()) { + // An out-of-order status update arrived. It should not be applied because the shard snapshot is already finished. + // For example, a delayed/retried PAUSED update should not override a completed shard snapshot. + iterator.remove(); + return; + } + + final ShardSnapshotStatus updatedShardSnapshotStatus; if (existing.state() == ShardState.ABORTED - && updateSnapshotState.updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) { + && shardSnapshotStatusUpdate.updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) { // concurrently pausing the shard snapshot due to node shutdown and aborting the snapshot - this shard is no longer // actively snapshotting but we don't want it to resume, so mark it as FAILED since it didn't complete - updatedState = new ShardSnapshotStatus( - updateSnapshotState.updatedState.nodeId(), + updatedShardSnapshotStatus = new ShardSnapshotStatus( + shardSnapshotStatusUpdate.updatedState.nodeId(), ShardState.FAILED, - updateSnapshotState.updatedState.generation(), + shardSnapshotStatusUpdate.updatedState.generation(), "snapshot aborted" ); } else { - updatedState = updateSnapshotState.updatedState; + updatedShardSnapshotStatus = shardSnapshotStatusUpdate.updatedState; } - if (updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) { + if (updatedShardSnapshotStatus.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) { // leave subsequent entries for this shard alone until this one is unpaused iterator.remove(); } else { // All other shard updates leave the shard in a complete state, which means we should leave this update in the list so - // it can fall through to later entries and start any waiting shard snapshots: - assert updatedState.isActive() == false : updatedState; + // that it can fall through to later entries and start any waiting shard snapshots: + assert updatedShardSnapshotStatus.isActive() == false : updatedShardSnapshotStatus; } - logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot, updatedShard, updatedState.state()); + logger.trace( + "[{}] Updating shard [{}] with status [{}]", + shardSnapshotStatusUpdate.snapshot, + shardSnapshotId, + updatedShardSnapshotStatus.state() + ); changedCount++; - newStates.get().put(updatedShard, updatedState); - executedUpdates.add(updateSnapshotState); + newShardSnapshotStatusesBuilder.put(shardSnapshotId, updatedShardSnapshotStatus); + executedUpdates.add(shardSnapshotStatusUpdate); } private void tryStartNextTaskAfterCloneUpdated(RepositoryShardId repoShardId, ShardSnapshotStatus updatedState) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index b95695bc68df2..8817bfcb004af 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -467,6 +467,108 @@ public void testPauseForNodeRemovalWithQueuedShards() throws Exception { ); } + /** + * Tests that, within the same cluster state batched update execution, a shard snapshot status update of PAUSED_FOR_NODE_REMOVAL will be + * ignored after the same shard snapshot has already been updated to SUCCESS (a completed state). On the other hand, a + * PAUSED_FOR_NODE_REMOVAL update follow by a SUCCESS update should be applied and result in SUCCESS. + */ + public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterSuccess() throws Exception { + final var repoName = "test-repo-name"; + final var snapshot1 = snapshot(repoName, "test-snap-1"); + final var snapshot2 = snapshot(repoName, "test-snap-2"); + final var indexName = "test-index"; + final var shardId = new ShardId(index(indexName), 0); + final var repositoryShardId = new RepositoryShardId(indexId(indexName), 0); + final var nodeId = uuid(); + final var otherNodeId = uuid(); + + final SnapshotsInProgress.Entry firstSnapshotEntry = snapshotEntry( + snapshot1, + Collections.singletonMap(indexName, repositoryShardId.index()), + Map.of(shardId, initShardStatus(nodeId)) + ); + + final SnapshotsInProgress.Entry secondSnapshotEntry = snapshotEntry( + snapshot2, + Collections.singletonMap(indexName, repositoryShardId.index()), + Map.of(shardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED) + ); + + final ClusterState initialState = stateWithSnapshots( + ClusterState.builder(ClusterState.EMPTY_STATE) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create(nodeId)).localNodeId(nodeId).masterNodeId(nodeId).build()) + .routingTable( + RoutingTable.builder() + .add( + IndexRoutingTable.builder(shardId.getIndex()) + .addShard(TestShardRouting.newShardRouting(shardId, nodeId, true, ShardRoutingState.STARTED)) + ) + .build() + ) + .build(), + repoName, + firstSnapshotEntry, + secondSnapshotEntry + ); + + final var updatedState = applyUpdates( + initialState, + // Snapshot 1 will apply SUCCESS and then ignore the PAUSED updated, and the final status will be SUCCESS. + new SnapshotsService.ShardSnapshotUpdate( + snapshot1, + shardId, + null, + SnapshotsInProgress.ShardSnapshotStatus.success( + otherNodeId, + new ShardSnapshotResult(firstSnapshotEntry.shards().get(shardId).generation(), ByteSizeValue.ofBytes(1L), 1) + ), + ActionTestUtils.assertNoFailureListener(t -> {}) + ), + new SnapshotsService.ShardSnapshotUpdate( + snapshot1, + shardId, + null, + new SnapshotsInProgress.ShardSnapshotStatus( + nodeId, + SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL, + firstSnapshotEntry.shards().get(shardId).generation() + ), + ActionTestUtils.assertNoFailureListener(t -> {}) + ), + // Snapshot 2 will apply PAUSED and then SUCCESS, and the final status will be SUCCESS. + new SnapshotsService.ShardSnapshotUpdate( + snapshot2, + shardId, + null, + new SnapshotsInProgress.ShardSnapshotStatus( + nodeId, + SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL, + firstSnapshotEntry.shards().get(shardId).generation() + ), + ActionTestUtils.assertNoFailureListener(t -> {}) + ), + new SnapshotsService.ShardSnapshotUpdate( + snapshot2, + shardId, + null, + SnapshotsInProgress.ShardSnapshotStatus.success( + otherNodeId, + new ShardSnapshotResult(firstSnapshotEntry.shards().get(shardId).generation(), ByteSizeValue.ofBytes(1L), 1) + ), + ActionTestUtils.assertNoFailureListener(t -> {}) + ) + ); + + assertEquals( + SnapshotsInProgress.ShardState.SUCCESS, + SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).state() + ); + assertEquals( + SnapshotsInProgress.ShardState.SUCCESS, + SnapshotsInProgress.get(updatedState).snapshot(snapshot2).shards().get(shardId).state() + ); + } + public void testSnapshottingIndicesExcludesClones() { final String repoName = "test-repo"; final String indexName = "index"; @@ -570,10 +672,21 @@ private static void assertIsNoop(ClusterState state, SnapshotsService.SnapshotTa assertSame(applyUpdates(state, shardCompletion), state); } + /** + * Runs the shard snapshot updates through a ClusterStateTaskExecutor that executes the + * {@link SnapshotsService.SnapshotShardsUpdateContext}. + * + * @param state Original cluster state + * @param updates List of SnapshotTask tasks to apply to the cluster state + * @return An updated cluster state, or, if no change were made, the original given cluster state. + */ private static ClusterState applyUpdates(ClusterState state, SnapshotsService.SnapshotTask... updates) throws Exception { return ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(state, batchExecutionContext -> { final SnapshotsInProgress existing = SnapshotsInProgress.get(batchExecutionContext.initialState()); - final var context = new SnapshotsService.SnapshotShardsUpdateContext(batchExecutionContext, (a, b, c) -> {}); + final var context = new SnapshotsService.SnapshotShardsUpdateContext( + batchExecutionContext, + /* on completion handler */ (a, b, c) -> {} + ); final SnapshotsInProgress updated = context.computeUpdatedState(); context.completeWithUpdatedState(updated); if (existing == updated) { @@ -617,6 +730,9 @@ private static SnapshotsInProgress.Entry cloneEntry( .withClones(clones); } + /** + * Helper method to create a shard snapshot status with state {@link SnapshotsInProgress.ShardState#INIT}. + */ private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) { return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, ShardGeneration.newGeneration(random())); } From d97d047a01aa5ac7c4eb9f045c0f7a26834f3225 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 23 Apr 2025 10:03:32 -0400 Subject: [PATCH 02/10] Update docs/changelog/127250.yaml --- docs/changelog/127250.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/127250.yaml diff --git a/docs/changelog/127250.yaml b/docs/changelog/127250.yaml new file mode 100644 index 0000000000000..44a41645bd6c3 --- /dev/null +++ b/docs/changelog/127250.yaml @@ -0,0 +1,6 @@ +pr: 127250 +summary: Do not apply further shard snapshot status updates after shard snapshot is + complete +area: Snapshot/Restore +type: bug +issues: [] From 114411e0b637bf065d252d24336d499ef9064bb6 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 23 Apr 2025 10:33:11 -0400 Subject: [PATCH 03/10] expect non-null builder entries --- .../java/org/elasticsearch/snapshots/SnapshotsService.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 3b391e545af66..a801a7067ccfe 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -3402,7 +3402,9 @@ private void startShardOperation( * Applies a shard snapshot status update to an updated statuses builder, after a few sanity checks / filtering. * * @param existingShardSnapshotStatuses Maps shard ID to ShardSnapshotStatus - * @param newShardSnapshotStatusesSupplier Supplies a builder mapping shard ID to updated ShardSnapshotStatus + * @param newShardSnapshotStatusesSupplier Supplies a builder. Initially maps shard ID to existing ShardSnapshotStatus. As shard + * snapshot status updates are applied, the original status entries are replaced with + * updated ones. * @param shardSnapshotStatusUpdate The update to apply to build a new updated ShardSnapshotStatus * @param shardSnapshotId The shard snapshot ID of the shard being updated (for type reasons) */ @@ -3433,7 +3435,8 @@ private void executeShardSnapshotUpdate( final var newShardSnapshotStatusesBuilder = newShardSnapshotStatusesSupplier.get(); final var newShardSnapshotStatus = newShardSnapshotStatusesBuilder.get(shardSnapshotId); - if (newShardSnapshotStatus != null && newShardSnapshotStatus.state().completed()) { + assert newShardSnapshotStatus != null; // builder is seeded with all the original statuses. + if (newShardSnapshotStatus.state().completed()) { // An out-of-order status update arrived. It should not be applied because the shard snapshot is already finished. // For example, a delayed/retried PAUSED update should not override a completed shard snapshot. iterator.remove(); From fe54ad46a7c3666fcba308988366115baa9bcc4a Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 23 Apr 2025 10:34:44 -0400 Subject: [PATCH 04/10] Update server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java Co-authored-by: David Turner --- .../java/org/elasticsearch/snapshots/SnapshotsServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index 8817bfcb004af..8a9c74ab2e247 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -685,7 +685,7 @@ private static ClusterState applyUpdates(ClusterState state, SnapshotsService.Sn final SnapshotsInProgress existing = SnapshotsInProgress.get(batchExecutionContext.initialState()); final var context = new SnapshotsService.SnapshotShardsUpdateContext( batchExecutionContext, - /* on completion handler */ (a, b, c) -> {} + /* on completion handler */ (shardSnapshotUpdateResult, newlyCompletedEntries, updatedRepositories) -> {} ); final SnapshotsInProgress updated = context.computeUpdatedState(); context.completeWithUpdatedState(updated); From f7332bd79441d90f9c4a2f9b57d6a7ec5298af3c Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 23 Apr 2025 12:49:03 -0400 Subject: [PATCH 05/10] randomize testing --- .../snapshots/SnapshotsServiceTests.java | 164 +++++++++++------- 1 file changed, 103 insertions(+), 61 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index 8a9c74ab2e247..b687d520875ca 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -467,28 +467,64 @@ public void testPauseForNodeRemovalWithQueuedShards() throws Exception { ); } + private SnapshotsInProgress.ShardSnapshotStatus successShardSnapshotStatus( + String nodeId, + ShardId shardId, + SnapshotsInProgress.Entry entry + ) { + return SnapshotsInProgress.ShardSnapshotStatus.success( + nodeId, + new ShardSnapshotResult(entry.shards().get(shardId).generation(), ByteSizeValue.ofBytes(1L), 1) + ); + } + + private SnapshotsInProgress.ShardSnapshotStatus failedShardSnapshotStatus( + String nodeId, + ShardId shardId, + SnapshotsInProgress.Entry entry + ) { + return new SnapshotsInProgress.ShardSnapshotStatus( + nodeId, + SnapshotsInProgress.ShardState.FAILED, + entry.shards().get(shardId).generation(), + "test injected failure" + ); + } + + private SnapshotsInProgress.ShardSnapshotStatus pausedShardSnapshotStatus( + String nodeId, + ShardId shardId, + SnapshotsInProgress.Entry entry + ) { + return new SnapshotsInProgress.ShardSnapshotStatus( + nodeId, + SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL, + entry.shards().get(shardId).generation() + ); + } + /** * Tests that, within the same cluster state batched update execution, a shard snapshot status update of PAUSED_FOR_NODE_REMOVAL will be - * ignored after the same shard snapshot has already been updated to SUCCESS (a completed state). On the other hand, a - * PAUSED_FOR_NODE_REMOVAL update follow by a SUCCESS update should be applied and result in SUCCESS. + * ignored after the same shard snapshot has already been updated to a completed state. On the other hand, a PAUSED_FOR_NODE_REMOVAL + * update follow by a SUCCESS, or other completed state, update should be applied and result in SUCCESS. */ - public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterSuccess() throws Exception { + public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterCompleted() throws Exception { final var repoName = "test-repo-name"; final var snapshot1 = snapshot(repoName, "test-snap-1"); final var snapshot2 = snapshot(repoName, "test-snap-2"); - final var indexName = "test-index"; + final var indexName = "test-index-name"; final var shardId = new ShardId(index(indexName), 0); final var repositoryShardId = new RepositoryShardId(indexId(indexName), 0); - final var nodeId = uuid(); + final var originalNodeId = uuid(); final var otherNodeId = uuid(); - final SnapshotsInProgress.Entry firstSnapshotEntry = snapshotEntry( + final SnapshotsInProgress.Entry runningSnapshotEntry = snapshotEntry( snapshot1, Collections.singletonMap(indexName, repositoryShardId.index()), - Map.of(shardId, initShardStatus(nodeId)) + Map.of(shardId, initShardStatus(originalNodeId)) ); - final SnapshotsInProgress.Entry secondSnapshotEntry = snapshotEntry( + final SnapshotsInProgress.Entry queuedSnapshotEntry = snapshotEntry( snapshot2, Collections.singletonMap(indexName, repositoryShardId.index()), Map.of(shardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED) @@ -496,75 +532,81 @@ public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterSuccess() throw final ClusterState initialState = stateWithSnapshots( ClusterState.builder(ClusterState.EMPTY_STATE) - .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create(nodeId)).localNodeId(nodeId).masterNodeId(nodeId).build()) + .nodes( + DiscoveryNodes.builder() + .add(DiscoveryNodeUtils.create(originalNodeId)) + .localNodeId(originalNodeId) + .masterNodeId(originalNodeId) + .build() + ) .routingTable( RoutingTable.builder() .add( IndexRoutingTable.builder(shardId.getIndex()) - .addShard(TestShardRouting.newShardRouting(shardId, nodeId, true, ShardRoutingState.STARTED)) + .addShard(TestShardRouting.newShardRouting(shardId, originalNodeId, true, ShardRoutingState.STARTED)) ) .build() ) .build(), repoName, - firstSnapshotEntry, - secondSnapshotEntry + runningSnapshotEntry, + queuedSnapshotEntry + ); + + /** + * In this scenario, {@link originalNodeId} is the original shard owner that resends PAUSED, and {@link otherNodeId} is the new + * shard owner that completes the shard snapshot. The production code doesn't verify node ownership, but it's helpful for the test. + */ + + // Ultimately ignored statuses. + var pausedOnOriginalNodeStatus = pausedShardSnapshotStatus(originalNodeId, shardId, runningSnapshotEntry); + var successfulOnOriginalNodeStatus = successShardSnapshotStatus(originalNodeId, shardId, runningSnapshotEntry); + + // Ultimately applied statuses. + var successfulOnOtherNodeStatus = successShardSnapshotStatus(otherNodeId, shardId, runningSnapshotEntry); + var failedOnOtherNodeStatus = failedShardSnapshotStatus(otherNodeId, shardId, runningSnapshotEntry); + + var completedUpdateOnOtherNode = new SnapshotsService.ShardSnapshotUpdate( + snapshot1, + shardId, + null, + // Success and failure are both completed shard snapshot states, so paused should be ignored when either is set. + randomBoolean() ? successfulOnOtherNodeStatus : failedOnOtherNodeStatus, + ActionTestUtils.assertNoFailureListener(t -> {}) + ); + var pausedUpdateOnOriginalNode = new SnapshotsService.ShardSnapshotUpdate( + snapshot1, + shardId, + null, + pausedOnOriginalNodeStatus, + ActionTestUtils.assertNoFailureListener(t -> {}) + ); + var completedUpdateOnOriginalNode = new SnapshotsService.ShardSnapshotUpdate( + snapshot1, + shardId, + null, + successfulOnOriginalNodeStatus, + ActionTestUtils.assertNoFailureListener(t -> {}) ); + boolean random = randomBoolean(); final var updatedState = applyUpdates( initialState, - // Snapshot 1 will apply SUCCESS and then ignore the PAUSED updated, and the final status will be SUCCESS. - new SnapshotsService.ShardSnapshotUpdate( - snapshot1, - shardId, - null, - SnapshotsInProgress.ShardSnapshotStatus.success( - otherNodeId, - new ShardSnapshotResult(firstSnapshotEntry.shards().get(shardId).generation(), ByteSizeValue.ofBytes(1L), 1) - ), - ActionTestUtils.assertNoFailureListener(t -> {}) - ), - new SnapshotsService.ShardSnapshotUpdate( - snapshot1, - shardId, - null, - new SnapshotsInProgress.ShardSnapshotStatus( - nodeId, - SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL, - firstSnapshotEntry.shards().get(shardId).generation() - ), - ActionTestUtils.assertNoFailureListener(t -> {}) - ), - // Snapshot 2 will apply PAUSED and then SUCCESS, and the final status will be SUCCESS. - new SnapshotsService.ShardSnapshotUpdate( - snapshot2, - shardId, - null, - new SnapshotsInProgress.ShardSnapshotStatus( - nodeId, - SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL, - firstSnapshotEntry.shards().get(shardId).generation() - ), - ActionTestUtils.assertNoFailureListener(t -> {}) - ), - new SnapshotsService.ShardSnapshotUpdate( - snapshot2, - shardId, - null, - SnapshotsInProgress.ShardSnapshotStatus.success( - otherNodeId, - new ShardSnapshotResult(firstSnapshotEntry.shards().get(shardId).generation(), ByteSizeValue.ofBytes(1L), 1) - ), - ActionTestUtils.assertNoFailureListener(t -> {}) - ) + // Randomize the order of completed and paused updates but make sure that there's one of each. If the paused update comes after + // the completed update, paused should be ignored and the shard snapshot remains in a completed state. + random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode, + random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode, + // Randomly add another update that will be ignored because the shard snapshot is complete. + // Note: the originalNodeId is used for this update, so we can verify afterward that the update is not applied. + randomBoolean() ? completedUpdateOnOriginalNode : pausedUpdateOnOriginalNode ); + assertTrue(SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).state().completed()); + assertEquals(otherNodeId, SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).nodeId()); + + // Since the first snapshot completed, the second snapshot should be set to proceed with snapshotting the same shard. assertEquals( - SnapshotsInProgress.ShardState.SUCCESS, - SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).state() - ); - assertEquals( - SnapshotsInProgress.ShardState.SUCCESS, + SnapshotsInProgress.ShardState.INIT, SnapshotsInProgress.get(updatedState).snapshot(snapshot2).shards().get(shardId).state() ); } From 0c9b6fb9af3bef5b967a976dbedd021b1b961c10 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 23 Apr 2025 12:52:27 -0400 Subject: [PATCH 06/10] verify queued --- .../org/elasticsearch/snapshots/SnapshotsServiceTests.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index b687d520875ca..ad1b9b64ae248 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -553,6 +553,11 @@ public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterCompleted() thr queuedSnapshotEntry ); + assertEquals( + SnapshotsInProgress.ShardState.QUEUED, + SnapshotsInProgress.get(initialState).snapshot(snapshot2).shards().get(shardId).state() + ); + /** * In this scenario, {@link originalNodeId} is the original shard owner that resends PAUSED, and {@link otherNodeId} is the new * shard owner that completes the shard snapshot. The production code doesn't verify node ownership, but it's helpful for the test. From ce684dc897f8713ff654a3432fdd21a087f1a977 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 23 Apr 2025 13:46:05 -0400 Subject: [PATCH 07/10] randomize last entry --- .../elasticsearch/index/IndexVersions.java | 2 +- .../snapshots/SnapshotsServiceTests.java | 30 ++++++++++++------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexVersions.java b/server/src/main/java/org/elasticsearch/index/IndexVersions.java index 8af1caf8c20de..588018a27b2d0 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexVersions.java +++ b/server/src/main/java/org/elasticsearch/index/IndexVersions.java @@ -160,7 +160,7 @@ private static Version parseUnchecked(String version) { public static final IndexVersion SYNTHETIC_SOURCE_STORE_ARRAYS_NATIVELY_SCALED_FLOAT = def(9_020_0_00, Version.LUCENE_10_1_0); public static final IndexVersion USE_LUCENE101_POSTINGS_FORMAT = def(9_021_0_00, Version.LUCENE_10_1_0); public static final IndexVersion UPGRADE_TO_LUCENE_10_2_0 = def(9_022_00_0, Version.LUCENE_10_2_0); - public static final IndexVersion UPGRADE_TO_LUCENE_10_2_1 = def(9_023_00_0, Version.LUCENE_10_2_1); + // public static final IndexVersion UPGRADE_TO_LUCENE_10_2_1 = def(9_023_00_0, Version.LUCENE_10_2_1); /* * STOP! READ THIS FIRST! No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index ad1b9b64ae248..127f71495b33b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -49,6 +49,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertTrue; public class SnapshotsServiceTests extends ESTestCase { @@ -595,16 +596,25 @@ public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterCompleted() thr ); boolean random = randomBoolean(); - final var updatedState = applyUpdates( - initialState, - // Randomize the order of completed and paused updates but make sure that there's one of each. If the paused update comes after - // the completed update, paused should be ignored and the shard snapshot remains in a completed state. - random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode, - random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode, - // Randomly add another update that will be ignored because the shard snapshot is complete. - // Note: the originalNodeId is used for this update, so we can verify afterward that the update is not applied. - randomBoolean() ? completedUpdateOnOriginalNode : pausedUpdateOnOriginalNode - ); + ClusterState updatedState; + if (randomBoolean()) { + updatedState = applyUpdates( + initialState, + // Randomize the order of completed and paused updates but make sure that there's one of each. If the paused update comes after + // the completed update, paused should be ignored and the shard snapshot remains in a completed state. + random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode, + random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode + ); + } else { + updatedState = applyUpdates( + initialState, + random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode, + random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode, + // Randomly add another update that will be ignored because the shard snapshot is complete. + // Note: the originalNodeId is used for this update, so we can verify afterward that the update is not applied. + randomBoolean() ? completedUpdateOnOriginalNode : pausedUpdateOnOriginalNode + ); + } assertTrue(SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).state().completed()); assertEquals(otherNodeId, SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).nodeId()); From e28879d842fe495b32f16e188cd582b6960b0647 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 23 Apr 2025 18:02:23 +0000 Subject: [PATCH 08/10] [CI] Auto commit changes from spotless --- .../org/elasticsearch/snapshots/SnapshotsServiceTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index 127f71495b33b..c610ac054ec39 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -600,7 +600,8 @@ public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterCompleted() thr if (randomBoolean()) { updatedState = applyUpdates( initialState, - // Randomize the order of completed and paused updates but make sure that there's one of each. If the paused update comes after + // Randomize the order of completed and paused updates but make sure that there's one of each. If the paused update comes + // after // the completed update, paused should be ignored and the shard snapshot remains in a completed state. random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode, random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode From ba7dded974fd145979d3edd3ce9bcf50a794f64a Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 23 Apr 2025 14:06:33 -0400 Subject: [PATCH 09/10] undo version comment out --- .../src/main/java/org/elasticsearch/index/IndexVersions.java | 4 ++-- .../org/elasticsearch/snapshots/SnapshotsServiceTests.java | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexVersions.java b/server/src/main/java/org/elasticsearch/index/IndexVersions.java index 588018a27b2d0..9e2524eeb865a 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexVersions.java +++ b/server/src/main/java/org/elasticsearch/index/IndexVersions.java @@ -160,10 +160,10 @@ private static Version parseUnchecked(String version) { public static final IndexVersion SYNTHETIC_SOURCE_STORE_ARRAYS_NATIVELY_SCALED_FLOAT = def(9_020_0_00, Version.LUCENE_10_1_0); public static final IndexVersion USE_LUCENE101_POSTINGS_FORMAT = def(9_021_0_00, Version.LUCENE_10_1_0); public static final IndexVersion UPGRADE_TO_LUCENE_10_2_0 = def(9_022_00_0, Version.LUCENE_10_2_0); - // public static final IndexVersion UPGRADE_TO_LUCENE_10_2_1 = def(9_023_00_0, Version.LUCENE_10_2_1); + public static final IndexVersion UPGRADE_TO_LUCENE_10_2_1 = def(9_023_00_0, Version.LUCENE_10_2_1); /* * STOP! READ THIS FIRST! No, really, - * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ + * ____ _____ ___ ____ _ ____ _____ _ ____ g_____ _ _ ___ ____ _____ ___ ____ ____ _____ _ * / ___|_ _/ _ \| _ \| | | _ \| ____| / \ | _ \ |_ _| | | |_ _/ ___| | ___|_ _| _ \/ ___|_ _| | * \___ \ | || | | | |_) | | | |_) | _| / _ \ | | | | | | | |_| || |\___ \ | |_ | || |_) \___ \ | | | | * ___) || || |_| | __/|_| | _ <| |___ / ___ \| |_| | | | | _ || | ___) | | _| | || _ < ___) || | |_| diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index c610ac054ec39..e51df3521ee9b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -601,8 +601,7 @@ public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterCompleted() thr updatedState = applyUpdates( initialState, // Randomize the order of completed and paused updates but make sure that there's one of each. If the paused update comes - // after - // the completed update, paused should be ignored and the shard snapshot remains in a completed state. + // after the completed update, paused should be ignored and the shard snapshot remains in a completed state. random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode, random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode ); From e02802d0561487a69fb160ce193860a536658c2d Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 23 Apr 2025 14:08:00 -0400 Subject: [PATCH 10/10] another typo... --- server/src/main/java/org/elasticsearch/index/IndexVersions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexVersions.java b/server/src/main/java/org/elasticsearch/index/IndexVersions.java index 9e2524eeb865a..8af1caf8c20de 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexVersions.java +++ b/server/src/main/java/org/elasticsearch/index/IndexVersions.java @@ -163,7 +163,7 @@ private static Version parseUnchecked(String version) { public static final IndexVersion UPGRADE_TO_LUCENE_10_2_1 = def(9_023_00_0, Version.LUCENE_10_2_1); /* * STOP! READ THIS FIRST! No, really, - * ____ _____ ___ ____ _ ____ _____ _ ____ g_____ _ _ ___ ____ _____ ___ ____ ____ _____ _ + * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ * / ___|_ _/ _ \| _ \| | | _ \| ____| / \ | _ \ |_ _| | | |_ _/ ___| | ___|_ _| _ \/ ___|_ _| | * \___ \ | || | | | |_) | | | |_) | _| / _ \ | | | | | | | |_| || |\___ \ | |_ | || |_) \___ \ | | | | * ___) || || |_| | __/|_| | _ <| |___ / ___ \| |_| | | | | _ || | ___) | | _| | || _ < ___) || | |_|