Commit 1dfb70e

Do not apply further shard snapshot status updates after shard snapshot is complete (#127250)
Relates ES-11375
1 parent 08552f1 commit 1dfb70e

3 files changed: +229 -20 lines

docs/changelog/127250.yaml

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+pr: 127250
+summary: Do not apply further shard snapshot status updates after shard snapshot is
+  complete
+area: Snapshot/Restore
+type: bug
+issues: []
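
The changelog summary captures the rule the fix enforces: once a shard snapshot has reached a completed state, later status updates for that shard must be dropped rather than applied. Below is a minimal, self-contained sketch of that rule; the ShardState enum, StatusUpdate record, and applyBatch method are illustrative stand-ins, not the Elasticsearch classes this commit touches.

    // Illustrative stand-ins only, not the SnapshotsInProgress types changed by the commit.
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class StaleUpdateGuardSketch {

        enum ShardState {
            INIT, PAUSED_FOR_NODE_REMOVAL, SUCCESS, FAILED;

            boolean completed() {
                return this == SUCCESS || this == FAILED;
            }
        }

        record StatusUpdate(String shardId, ShardState newState) {}

        // Apply a batch of updates in order, dropping any update for a shard whose snapshot
        // already reached a terminal state earlier in the batch.
        static Map<String, ShardState> applyBatch(Map<String, ShardState> current, List<StatusUpdate> batch) {
            Map<String, ShardState> updated = new HashMap<>(current);
            for (StatusUpdate update : batch) {
                ShardState latest = updated.get(update.shardId());
                if (latest != null && latest.completed()) {
                    continue; // e.g. a delayed PAUSED update must not override SUCCESS or FAILED
                }
                updated.put(update.shardId(), update.newState());
            }
            return updated;
        }

        public static void main(String[] args) {
            Map<String, ShardState> result = applyBatch(
                Map.of("shard-0", ShardState.INIT),
                List.of(
                    new StatusUpdate("shard-0", ShardState.SUCCESS),
                    new StatusUpdate("shard-0", ShardState.PAUSED_FOR_NODE_REMOVAL) // arrives late, ignored
                )
            );
            assert result.get("shard-0") == ShardState.SUCCESS;
        }
    }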

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 49 additions & 19 deletions
@@ -3398,16 +3398,31 @@ private <T> void startShardOperation(
             startedCount++;
         }

+        /**
+         * Applies a shard snapshot status update to an updated statuses builder, after a few sanity checks / filtering.
+         *
+         * @param existingShardSnapshotStatuses Maps shard ID to ShardSnapshotStatus
+         * @param newShardSnapshotStatusesSupplier Supplies a builder. Initially maps shard ID to existing ShardSnapshotStatus. As shard
+         *                                         snapshot status updates are applied, the original status entries are replaced with
+         *                                         updated ones.
+         * @param shardSnapshotStatusUpdate The update to apply to build a new updated ShardSnapshotStatus
+         * @param shardSnapshotId The shard snapshot ID of the shard being updated (for type reasons)
+         */
         private <T> void executeShardSnapshotUpdate(
-            Map<T, ShardSnapshotStatus> existingStates,
-            Supplier<ImmutableOpenMap.Builder<T, ShardSnapshotStatus>> newStates,
-            ShardSnapshotUpdate updateSnapshotState,
-            T updatedShard
+            Map<T, ShardSnapshotStatus> existingShardSnapshotStatuses,
+            Supplier<ImmutableOpenMap.Builder<T, ShardSnapshotStatus>> newShardSnapshotStatusesSupplier,
+            ShardSnapshotUpdate shardSnapshotStatusUpdate,
+            T shardSnapshotId
         ) {
-            assert updateSnapshotState.snapshot.equals(entry.snapshot());
-            final ShardSnapshotStatus existing = existingStates.get(updatedShard);
+            assert shardSnapshotStatusUpdate.snapshot.equals(entry.snapshot());
+
+            final ShardSnapshotStatus existing = existingShardSnapshotStatuses.get(shardSnapshotId);
             if (existing == null) {
-                logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]", updatedShard, entry);
+                logger.error(
+                    "Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
+                    shardSnapshotStatusUpdate,
+                    entry
+                );
                 assert false : "This should never happen, should only receive updates for expected shards";
                 return;
             }
@@ -3418,34 +3433,49 @@ private <T> void executeShardSnapshotUpdate(
                 return;
             }

-            final ShardSnapshotStatus updatedState;
+            final var newShardSnapshotStatusesBuilder = newShardSnapshotStatusesSupplier.get();
+            final var newShardSnapshotStatus = newShardSnapshotStatusesBuilder.get(shardSnapshotId);
+            assert newShardSnapshotStatus != null; // builder is seeded with all the original statuses.
+            if (newShardSnapshotStatus.state().completed()) {
+                // An out-of-order status update arrived. It should not be applied because the shard snapshot is already finished.
+                // For example, a delayed/retried PAUSED update should not override a completed shard snapshot.
+                iterator.remove();
+                return;
+            }
+
+            final ShardSnapshotStatus updatedShardSnapshotStatus;
             if (existing.state() == ShardState.ABORTED
-                && updateSnapshotState.updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
+                && shardSnapshotStatusUpdate.updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
                 // concurrently pausing the shard snapshot due to node shutdown and aborting the snapshot - this shard is no longer
                 // actively snapshotting but we don't want it to resume, so mark it as FAILED since it didn't complete
-                updatedState = new ShardSnapshotStatus(
-                    updateSnapshotState.updatedState.nodeId(),
+                updatedShardSnapshotStatus = new ShardSnapshotStatus(
+                    shardSnapshotStatusUpdate.updatedState.nodeId(),
                     ShardState.FAILED,
-                    updateSnapshotState.updatedState.generation(),
+                    shardSnapshotStatusUpdate.updatedState.generation(),
                     "snapshot aborted"
                 );
             } else {
-                updatedState = updateSnapshotState.updatedState;
+                updatedShardSnapshotStatus = shardSnapshotStatusUpdate.updatedState;
             }

-            if (updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
+            if (updatedShardSnapshotStatus.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
                 // leave subsequent entries for this shard alone until this one is unpaused
                 iterator.remove();
             } else {
                 // All other shard updates leave the shard in a complete state, which means we should leave this update in the list so
-                // it can fall through to later entries and start any waiting shard snapshots:
-                assert updatedState.isActive() == false : updatedState;
+                // that it can fall through to later entries and start any waiting shard snapshots:
+                assert updatedShardSnapshotStatus.isActive() == false : updatedShardSnapshotStatus;
             }

-            logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot, updatedShard, updatedState.state());
+            logger.trace(
+                "[{}] Updating shard [{}] with status [{}]",
+                shardSnapshotStatusUpdate.snapshot,
+                shardSnapshotId,
+                updatedShardSnapshotStatus.state()
+            );
             changedCount++;
-            newStates.get().put(updatedShard, updatedState);
-            executedUpdates.add(updateSnapshotState);
+            newShardSnapshotStatusesBuilder.put(shardSnapshotId, updatedShardSnapshotStatus);
+            executedUpdates.add(shardSnapshotStatusUpdate);
         }

         private void tryStartNextTaskAfterCloneUpdated(RepositoryShardId repoShardId, ShardSnapshotStatus updatedState) {
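
The new Javadoc and the completed-state guard above hinge on one detail: the check reads the lazily created, fully seeded builder of updated statuses (the Supplier parameter), because a status applied earlier in the same batched execution is visible only there, not in the statuses taken from the current cluster state. A rough sketch of that shape follows; ShardStatus, ShardUpdate, and the plain HashMap "builder" are hypothetical simplifications, not the ImmutableOpenMap wiring in SnapshotsService.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Supplier;

    class ShardUpdateGuardSketch {

        // Hypothetical simplifications of the shard snapshot status and update types.
        record ShardStatus(String nodeId, boolean completed) {}

        record ShardUpdate(String shardId, ShardStatus newStatus) {}

        private final Map<String, ShardStatus> existingStatuses; // statuses from the current cluster state
        private Map<String, ShardStatus> updatedStatuses;        // lazily created working copy

        ShardUpdateGuardSketch(Map<String, ShardStatus> existingStatuses) {
            this.existingStatuses = existingStatuses;
        }

        // Mirrors the Supplier<ImmutableOpenMap.Builder<...>> parameter: the working copy is created on
        // first use and seeded with every original status, so updates applied earlier in the batch
        // remain visible to later ones.
        private Supplier<Map<String, ShardStatus>> updatedStatusesSupplier() {
            return () -> {
                if (updatedStatuses == null) {
                    updatedStatuses = new HashMap<>(existingStatuses);
                }
                return updatedStatuses;
            };
        }

        void executeUpdate(ShardUpdate update) {
            ShardStatus existing = existingStatuses.get(update.shardId());
            if (existing == null || existing.completed()) {
                return; // unknown shard, or already complete in the persisted cluster state
            }
            Map<String, ShardStatus> builder = updatedStatusesSupplier().get();
            if (builder.get(update.shardId()).completed()) {
                // An earlier update in this same batch already finished the shard snapshot;
                // drop this out-of-order update instead of overwriting the terminal status.
                return;
            }
            builder.put(update.shardId(), update.newStatus());
        }
    }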

server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java

Lines changed: 174 additions & 1 deletion
@@ -49,6 +49,7 @@
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertTrue;

 public class SnapshotsServiceTests extends ESTestCase {

@@ -467,6 +468,164 @@ public void testPauseForNodeRemovalWithQueuedShards() throws Exception {
         );
     }

+    private SnapshotsInProgress.ShardSnapshotStatus successShardSnapshotStatus(
+        String nodeId,
+        ShardId shardId,
+        SnapshotsInProgress.Entry entry
+    ) {
+        return SnapshotsInProgress.ShardSnapshotStatus.success(
+            nodeId,
+            new ShardSnapshotResult(entry.shards().get(shardId).generation(), ByteSizeValue.ofBytes(1L), 1)
+        );
+    }
+
+    private SnapshotsInProgress.ShardSnapshotStatus failedShardSnapshotStatus(
+        String nodeId,
+        ShardId shardId,
+        SnapshotsInProgress.Entry entry
+    ) {
+        return new SnapshotsInProgress.ShardSnapshotStatus(
+            nodeId,
+            SnapshotsInProgress.ShardState.FAILED,
+            entry.shards().get(shardId).generation(),
+            "test injected failure"
+        );
+    }
+
+    private SnapshotsInProgress.ShardSnapshotStatus pausedShardSnapshotStatus(
+        String nodeId,
+        ShardId shardId,
+        SnapshotsInProgress.Entry entry
+    ) {
+        return new SnapshotsInProgress.ShardSnapshotStatus(
+            nodeId,
+            SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL,
+            entry.shards().get(shardId).generation()
+        );
+    }
+
+    /**
+     * Tests that, within the same cluster state batched update execution, a shard snapshot status update of PAUSED_FOR_NODE_REMOVAL will be
+     * ignored after the same shard snapshot has already been updated to a completed state. On the other hand, a PAUSED_FOR_NODE_REMOVAL
+     * update followed by a SUCCESS, or other completed state, update should be applied and result in SUCCESS.
+     */
+    public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterCompleted() throws Exception {
+        final var repoName = "test-repo-name";
+        final var snapshot1 = snapshot(repoName, "test-snap-1");
+        final var snapshot2 = snapshot(repoName, "test-snap-2");
+        final var indexName = "test-index-name";
+        final var shardId = new ShardId(index(indexName), 0);
+        final var repositoryShardId = new RepositoryShardId(indexId(indexName), 0);
+        final var originalNodeId = uuid();
+        final var otherNodeId = uuid();
+
+        final SnapshotsInProgress.Entry runningSnapshotEntry = snapshotEntry(
+            snapshot1,
+            Collections.singletonMap(indexName, repositoryShardId.index()),
+            Map.of(shardId, initShardStatus(originalNodeId))
+        );
+
+        final SnapshotsInProgress.Entry queuedSnapshotEntry = snapshotEntry(
+            snapshot2,
+            Collections.singletonMap(indexName, repositoryShardId.index()),
+            Map.of(shardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)
+        );
+
+        final ClusterState initialState = stateWithSnapshots(
+            ClusterState.builder(ClusterState.EMPTY_STATE)
+                .nodes(
+                    DiscoveryNodes.builder()
+                        .add(DiscoveryNodeUtils.create(originalNodeId))
+                        .localNodeId(originalNodeId)
+                        .masterNodeId(originalNodeId)
+                        .build()
+                )
+                .routingTable(
+                    RoutingTable.builder()
+                        .add(
+                            IndexRoutingTable.builder(shardId.getIndex())
+                                .addShard(TestShardRouting.newShardRouting(shardId, originalNodeId, true, ShardRoutingState.STARTED))
+                        )
+                        .build()
+                )
+                .build(),
+            repoName,
+            runningSnapshotEntry,
+            queuedSnapshotEntry
+        );
+
+        assertEquals(
+            SnapshotsInProgress.ShardState.QUEUED,
+            SnapshotsInProgress.get(initialState).snapshot(snapshot2).shards().get(shardId).state()
+        );
+
+        /**
+         * In this scenario, {@link originalNodeId} is the original shard owner that resends PAUSED, and {@link otherNodeId} is the new
+         * shard owner that completes the shard snapshot. The production code doesn't verify node ownership, but it's helpful for the test.
+         */
+
+        // Ultimately ignored statuses.
+        var pausedOnOriginalNodeStatus = pausedShardSnapshotStatus(originalNodeId, shardId, runningSnapshotEntry);
+        var successfulOnOriginalNodeStatus = successShardSnapshotStatus(originalNodeId, shardId, runningSnapshotEntry);
+
+        // Ultimately applied statuses.
+        var successfulOnOtherNodeStatus = successShardSnapshotStatus(otherNodeId, shardId, runningSnapshotEntry);
+        var failedOnOtherNodeStatus = failedShardSnapshotStatus(otherNodeId, shardId, runningSnapshotEntry);
+
+        var completedUpdateOnOtherNode = new SnapshotsService.ShardSnapshotUpdate(
+            snapshot1,
+            shardId,
+            null,
+            // Success and failure are both completed shard snapshot states, so paused should be ignored when either is set.
+            randomBoolean() ? successfulOnOtherNodeStatus : failedOnOtherNodeStatus,
+            ActionTestUtils.assertNoFailureListener(t -> {})
+        );
+        var pausedUpdateOnOriginalNode = new SnapshotsService.ShardSnapshotUpdate(
+            snapshot1,
+            shardId,
+            null,
+            pausedOnOriginalNodeStatus,
+            ActionTestUtils.assertNoFailureListener(t -> {})
+        );
+        var completedUpdateOnOriginalNode = new SnapshotsService.ShardSnapshotUpdate(
+            snapshot1,
+            shardId,
+            null,
+            successfulOnOriginalNodeStatus,
+            ActionTestUtils.assertNoFailureListener(t -> {})
+        );
+
+        boolean random = randomBoolean();
+        ClusterState updatedState;
+        if (randomBoolean()) {
+            updatedState = applyUpdates(
+                initialState,
+                // Randomize the order of completed and paused updates but make sure that there's one of each. If the paused update comes
+                // after the completed update, paused should be ignored and the shard snapshot remains in a completed state.
+                random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode,
+                random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode
+            );
+        } else {
+            updatedState = applyUpdates(
+                initialState,
+                random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode,
+                random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode,
+                // Randomly add another update that will be ignored because the shard snapshot is complete.
+                // Note: the originalNodeId is used for this update, so we can verify afterward that the update is not applied.
+                randomBoolean() ? completedUpdateOnOriginalNode : pausedUpdateOnOriginalNode
+            );
+        }
+
+        assertTrue(SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).state().completed());
+        assertEquals(otherNodeId, SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).nodeId());
+
+        // Since the first snapshot completed, the second snapshot should be set to proceed with snapshotting the same shard.
+        assertEquals(
+            SnapshotsInProgress.ShardState.INIT,
+            SnapshotsInProgress.get(updatedState).snapshot(snapshot2).shards().get(shardId).state()
+        );
+    }
+
     public void testSnapshottingIndicesExcludesClones() {
         final String repoName = "test-repo";
         final String indexName = "index";
@@ -570,10 +729,21 @@ private static void assertIsNoop(ClusterState state, SnapshotsService.SnapshotTa
         assertSame(applyUpdates(state, shardCompletion), state);
     }

+    /**
+     * Runs the shard snapshot updates through a ClusterStateTaskExecutor that executes the
+     * {@link SnapshotsService.SnapshotShardsUpdateContext}.
+     *
+     * @param state Original cluster state
+     * @param updates List of SnapshotTask tasks to apply to the cluster state
+     * @return An updated cluster state, or, if no changes were made, the original given cluster state.
+     */
     private static ClusterState applyUpdates(ClusterState state, SnapshotsService.SnapshotTask... updates) throws Exception {
         return ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(state, batchExecutionContext -> {
             final SnapshotsInProgress existing = SnapshotsInProgress.get(batchExecutionContext.initialState());
-            final var context = new SnapshotsService.SnapshotShardsUpdateContext(batchExecutionContext, (a, b, c) -> {});
+            final var context = new SnapshotsService.SnapshotShardsUpdateContext(
+                batchExecutionContext,
+                /* on completion handler */ (shardSnapshotUpdateResult, newlyCompletedEntries, updatedRepositories) -> {}
+            );
             final SnapshotsInProgress updated = context.computeUpdatedState();
             context.completeWithUpdatedState(updated);
             if (existing == updated) {
@@ -617,6 +787,9 @@ private static SnapshotsInProgress.Entry cloneEntry(
             .withClones(clones);
     }

+    /**
+     * Helper method to create a shard snapshot status with state {@link SnapshotsInProgress.ShardState#INIT}.
+     */
     private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) {
         return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, ShardGeneration.newGeneration(random()));
     }
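
The new test pins down the observable behaviour: once a completed update has been applied, a stale PAUSED_FOR_NODE_REMOVAL update in the same batch changes neither the shard state nor the owning node, and the queued second snapshot is started. As a purely hypothetical follow-up (not part of this commit), one could append a re-application check at the end of testBatchedShardSnapshotUpdatesCannotApplyPausedAfterCompleted, reusing the applyUpdates helper and the variables defined in the test:

    // Hypothetical extra assertion, not in the commit: re-applying a stale PAUSED update to the
    // already-updated state should leave the completed shard snapshot and its node owner untouched.
    ClusterState reappliedState = applyUpdates(
        updatedState,
        new SnapshotsService.ShardSnapshotUpdate(
            snapshot1,
            shardId,
            null,
            pausedOnOriginalNodeStatus,
            ActionTestUtils.assertNoFailureListener(t -> {})
        )
    );
    assertTrue(SnapshotsInProgress.get(reappliedState).snapshot(snapshot1).shards().get(shardId).state().completed());
    assertEquals(otherNodeId, SnapshotsInProgress.get(reappliedState).snapshot(snapshot1).shards().get(shardId).nodeId());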
