docs/changelog/127250.yaml (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
pr: 127250
summary: Do not apply further shard snapshot status updates after shard snapshot is
complete
area: Snapshot/Restore
type: bug
issues: []
@@ -3398,16 +3398,31 @@ private <T> void startShardOperation(
startedCount++;
}

/**
* Applies a shard snapshot status update to an updated statuses builder, after a few sanity checks / filtering.
*
* @param existingShardSnapshotStatuses Maps shard ID to ShardSnapshotStatus
* @param newShardSnapshotStatusesSupplier Supplies a builder. Initially maps shard ID to existing ShardSnapshotStatus. As shard
* snapshot status updates are applied, the original status entries are replaced with
* updated ones.
* @param shardSnapshotStatusUpdate The update to apply to build a new updated ShardSnapshotStatus
* @param shardSnapshotId The shard snapshot ID of the shard being updated (for type reasons)
*/
private <T> void executeShardSnapshotUpdate(
- Map<T, ShardSnapshotStatus> existingStates,
- Supplier<ImmutableOpenMap.Builder<T, ShardSnapshotStatus>> newStates,
- ShardSnapshotUpdate updateSnapshotState,
- T updatedShard
+ Map<T, ShardSnapshotStatus> existingShardSnapshotStatuses,
+ Supplier<ImmutableOpenMap.Builder<T, ShardSnapshotStatus>> newShardSnapshotStatusesSupplier,
+ ShardSnapshotUpdate shardSnapshotStatusUpdate,
+ T shardSnapshotId
) {
- assert updateSnapshotState.snapshot.equals(entry.snapshot());
- final ShardSnapshotStatus existing = existingStates.get(updatedShard);
+ assert shardSnapshotStatusUpdate.snapshot.equals(entry.snapshot());
+
+ final ShardSnapshotStatus existing = existingShardSnapshotStatuses.get(shardSnapshotId);
if (existing == null) {
- logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]", updatedShard, entry);
+ logger.error(
+ "Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
+ shardSnapshotStatusUpdate,
+ entry
+ );
assert false : "This should never happen, should only receive updates for expected shards";
return;
}
@@ -3418,34 +3433,49 @@ private <T> void executeShardSnapshotUpdate(
return;
}

- final ShardSnapshotStatus updatedState;
+ final var newShardSnapshotStatusesBuilder = newShardSnapshotStatusesSupplier.get();
+ final var newShardSnapshotStatus = newShardSnapshotStatusesBuilder.get(shardSnapshotId);
+ assert newShardSnapshotStatus != null; // builder is seeded with all the original statuses.
+ if (newShardSnapshotStatus.state().completed()) {
+ // An out-of-order status update arrived. It should not be applied because the shard snapshot is already finished.
+ // For example, a delayed/retried PAUSED update should not override a completed shard snapshot.
+ iterator.remove();
+ return;
+ }
Review comment (Contributor Author): actual bug fix
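To make the intent of this guard concrete, here is a minimal standalone sketch of the pattern. The names (ShardStatusTable, State) are hypothetical stand-ins rather than the real SnapshotsService or SnapshotsInProgress types: once a shard snapshot reaches a completed state, later out-of-order updates are dropped instead of applied.

import java.util.HashMap;
import java.util.Map;

public class ShardStatusTable {

    // Stand-in for SnapshotsInProgress.ShardState; only the states relevant here.
    enum State {
        INIT, PAUSED_FOR_NODE_REMOVAL, SUCCESS, FAILED;

        boolean completed() {
            return this == SUCCESS || this == FAILED;
        }
    }

    private final Map<String, State> statuses = new HashMap<>();

    // Applies the update unless the shard snapshot already completed;
    // returns whether the update was applied.
    boolean apply(String shardId, State update) {
        State current = statuses.getOrDefault(shardId, State.INIT);
        if (current.completed()) {
            // The guard: a delayed or retried PAUSED_FOR_NODE_REMOVAL update must
            // not overwrite an already-final SUCCESS or FAILED status.
            return false;
        }
        statuses.put(shardId, update);
        return true;
    }

    public static void main(String[] args) {
        ShardStatusTable table = new ShardStatusTable();
        System.out.println(table.apply("shard-0", State.SUCCESS));                  // true: applied
        System.out.println(table.apply("shard-0", State.PAUSED_FOR_NODE_REMOVAL)); // false: dropped
    }
}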
+ final ShardSnapshotStatus updatedShardSnapshotStatus;
if (existing.state() == ShardState.ABORTED
- && updateSnapshotState.updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
+ && shardSnapshotStatusUpdate.updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
// concurrently pausing the shard snapshot due to node shutdown and aborting the snapshot - this shard is no longer
// actively snapshotting but we don't want it to resume, so mark it as FAILED since it didn't complete
- updatedState = new ShardSnapshotStatus(
- updateSnapshotState.updatedState.nodeId(),
+ updatedShardSnapshotStatus = new ShardSnapshotStatus(
+ shardSnapshotStatusUpdate.updatedState.nodeId(),
ShardState.FAILED,
- updateSnapshotState.updatedState.generation(),
+ shardSnapshotStatusUpdate.updatedState.generation(),
"snapshot aborted"
);
} else {
- updatedState = updateSnapshotState.updatedState;
+ updatedShardSnapshotStatus = shardSnapshotStatusUpdate.updatedState;
}

- if (updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
+ if (updatedShardSnapshotStatus.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
// leave subsequent entries for this shard alone until this one is unpaused
iterator.remove();
} else {
// All other shard updates leave the shard in a complete state, which means we should leave this update in the list so
- // it can fall through to later entries and start any waiting shard snapshots:
- assert updatedState.isActive() == false : updatedState;
+ // that it can fall through to later entries and start any waiting shard snapshots:
+ assert updatedShardSnapshotStatus.isActive() == false : updatedShardSnapshotStatus;
}

- logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot, updatedShard, updatedState.state());
+ logger.trace(
+ "[{}] Updating shard [{}] with status [{}]",
+ shardSnapshotStatusUpdate.snapshot,
+ shardSnapshotId,
+ updatedShardSnapshotStatus.state()
+ );
changedCount++;
- newStates.get().put(updatedShard, updatedState);
- executedUpdates.add(updateSnapshotState);
+ newShardSnapshotStatusesBuilder.put(shardSnapshotId, updatedShardSnapshotStatus);
+ executedUpdates.add(shardSnapshotStatusUpdate);
}

private void tryStartNextTaskAfterCloneUpdated(RepositoryShardId repoShardId, ShardSnapshotStatus updatedState) {
@@ -467,6 +467,155 @@ public void testPauseForNodeRemovalWithQueuedShards() throws Exception {
);
}

private SnapshotsInProgress.ShardSnapshotStatus successShardSnapshotStatus(
String nodeId,
ShardId shardId,
SnapshotsInProgress.Entry entry
) {
return SnapshotsInProgress.ShardSnapshotStatus.success(
nodeId,
new ShardSnapshotResult(entry.shards().get(shardId).generation(), ByteSizeValue.ofBytes(1L), 1)
);
}

private SnapshotsInProgress.ShardSnapshotStatus failedShardSnapshotStatus(
String nodeId,
ShardId shardId,
SnapshotsInProgress.Entry entry
) {
return new SnapshotsInProgress.ShardSnapshotStatus(
nodeId,
SnapshotsInProgress.ShardState.FAILED,
entry.shards().get(shardId).generation(),
"test injected failure"
);
}

private SnapshotsInProgress.ShardSnapshotStatus pausedShardSnapshotStatus(
String nodeId,
ShardId shardId,
SnapshotsInProgress.Entry entry
) {
return new SnapshotsInProgress.ShardSnapshotStatus(
nodeId,
SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL,
entry.shards().get(shardId).generation()
);
}

/**
* Tests that, within the same cluster state batched update execution, a shard snapshot status update of PAUSED_FOR_NODE_REMOVAL will be
* ignored after the same shard snapshot has already been updated to a completed state. On the other hand, a PAUSED_FOR_NODE_REMOVAL
* update followed by a SUCCESS, or other completed state, update should be applied and result in SUCCESS.
*/
public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterCompleted() throws Exception {
final var repoName = "test-repo-name";
final var snapshot1 = snapshot(repoName, "test-snap-1");
final var snapshot2 = snapshot(repoName, "test-snap-2");
final var indexName = "test-index-name";
final var shardId = new ShardId(index(indexName), 0);
final var repositoryShardId = new RepositoryShardId(indexId(indexName), 0);
final var originalNodeId = uuid();
final var otherNodeId = uuid();

final SnapshotsInProgress.Entry runningSnapshotEntry = snapshotEntry(
snapshot1,
Collections.singletonMap(indexName, repositoryShardId.index()),
Map.of(shardId, initShardStatus(originalNodeId))
);

final SnapshotsInProgress.Entry queuedSnapshotEntry = snapshotEntry(
snapshot2,
Collections.singletonMap(indexName, repositoryShardId.index()),
Map.of(shardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)
);

final ClusterState initialState = stateWithSnapshots(
ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(
DiscoveryNodes.builder()
.add(DiscoveryNodeUtils.create(originalNodeId))
.localNodeId(originalNodeId)
.masterNodeId(originalNodeId)
.build()
)
.routingTable(
RoutingTable.builder()
.add(
IndexRoutingTable.builder(shardId.getIndex())
.addShard(TestShardRouting.newShardRouting(shardId, originalNodeId, true, ShardRoutingState.STARTED))
)
.build()
)
.build(),
repoName,
runningSnapshotEntry,
queuedSnapshotEntry
);

assertEquals(
SnapshotsInProgress.ShardState.QUEUED,
SnapshotsInProgress.get(initialState).snapshot(snapshot2).shards().get(shardId).state()
);

/**
* In this scenario, {@link originalNodeId} is the original shard owner that resends PAUSED, and {@link otherNodeId} is the new
* shard owner that completes the shard snapshot. The production code doesn't verify node ownership, but it's helpful for the test.
*/

// Ultimately ignored statuses.
var pausedOnOriginalNodeStatus = pausedShardSnapshotStatus(originalNodeId, shardId, runningSnapshotEntry);
var successfulOnOriginalNodeStatus = successShardSnapshotStatus(originalNodeId, shardId, runningSnapshotEntry);

// Ultimately applied statuses.
var successfulOnOtherNodeStatus = successShardSnapshotStatus(otherNodeId, shardId, runningSnapshotEntry);
var failedOnOtherNodeStatus = failedShardSnapshotStatus(otherNodeId, shardId, runningSnapshotEntry);

var completedUpdateOnOtherNode = new SnapshotsService.ShardSnapshotUpdate(
snapshot1,
shardId,
null,
// Success and failure are both completed shard snapshot states, so paused should be ignored when either is set.
randomBoolean() ? successfulOnOtherNodeStatus : failedOnOtherNodeStatus,
ActionTestUtils.assertNoFailureListener(t -> {})
);
var pausedUpdateOnOriginalNode = new SnapshotsService.ShardSnapshotUpdate(
snapshot1,
shardId,
null,
pausedOnOriginalNodeStatus,
ActionTestUtils.assertNoFailureListener(t -> {})
);
var completedUpdateOnOriginalNode = new SnapshotsService.ShardSnapshotUpdate(
snapshot1,
shardId,
null,
successfulOnOriginalNodeStatus,
ActionTestUtils.assertNoFailureListener(t -> {})
);

boolean random = randomBoolean();
final var updatedState = applyUpdates(
initialState,
// Randomize the order of completed and paused updates but make sure that there's one of each. If the paused update comes after
// the completed update, paused should be ignored and the shard snapshot remains in a completed state.
random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode,
random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode,
// Randomly add another update that will be ignored because the shard snapshot is complete.
// Note: the originalNodeId is used for this update, so we can verify afterward that the update is not applied.
randomBoolean() ? completedUpdateOnOriginalNode : pausedUpdateOnOriginalNode
Review comment (Contributor): Ideally I'd like us to only sometimes have this third update.

Reply (Contributor Author): Sure. I've if-else'd it, for lack of a better notion.
);
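// An illustrative guess at the "if-else'd" version mentioned in the review thread
// above; not the merged code. The third, to-be-ignored update would only sometimes
// be included in the batch, reusing the variables defined earlier in this test:
//
//   final ClusterState updatedState;
//   if (randomBoolean()) {
//       updatedState = applyUpdates(
//           initialState,
//           random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode,
//           random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode,
//           randomBoolean() ? completedUpdateOnOriginalNode : pausedUpdateOnOriginalNode
//       );
//   } else {
//       updatedState = applyUpdates(
//           initialState,
//           random ? completedUpdateOnOtherNode : pausedUpdateOnOriginalNode,
//           random ? pausedUpdateOnOriginalNode : completedUpdateOnOtherNode
//       );
//   }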

assertTrue(SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).state().completed());
assertEquals(otherNodeId, SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).nodeId());

// Since the first snapshot completed, the second snapshot should be set to proceed with snapshotting the same shard.
assertEquals(
SnapshotsInProgress.ShardState.INIT,
SnapshotsInProgress.get(updatedState).snapshot(snapshot2).shards().get(shardId).state()
);
}

public void testSnapshottingIndicesExcludesClones() {
final String repoName = "test-repo";
final String indexName = "index";
@@ -570,10 +719,21 @@ private static void assertIsNoop(ClusterState state, SnapshotsService.SnapshotTa
assertSame(applyUpdates(state, shardCompletion), state);
}

/**
* Runs the shard snapshot updates through a ClusterStateTaskExecutor that executes the
* {@link SnapshotsService.SnapshotShardsUpdateContext}.
*
* @param state Original cluster state
* @param updates List of SnapshotTask tasks to apply to the cluster state
* @return An updated cluster state or, if no changes were made, the original cluster state.
*/
private static ClusterState applyUpdates(ClusterState state, SnapshotsService.SnapshotTask... updates) throws Exception {
return ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(state, batchExecutionContext -> {
final SnapshotsInProgress existing = SnapshotsInProgress.get(batchExecutionContext.initialState());
- final var context = new SnapshotsService.SnapshotShardsUpdateContext(batchExecutionContext, (a, b, c) -> {});
+ final var context = new SnapshotsService.SnapshotShardsUpdateContext(
+ batchExecutionContext,
+ /* on completion handler */ (shardSnapshotUpdateResult, newlyCompletedEntries, updatedRepositories) -> {}
+ );
final SnapshotsInProgress updated = context.computeUpdatedState();
context.completeWithUpdatedState(updated);
if (existing == updated) {
Expand Down Expand Up @@ -617,6 +777,9 @@ private static SnapshotsInProgress.Entry cloneEntry(
.withClones(clones);
}

/**
* Helper method to create a shard snapshot status with state {@link SnapshotsInProgress.ShardState#INIT}.
*/
private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) {
return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, ShardGeneration.newGeneration(random()));
}