Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/127250.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 127250
summary: Do not apply further shard snapshot status updates after shard snapshot is
complete
area: Snapshot/Restore
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -3398,16 +3398,29 @@ private <T> void startShardOperation(
startedCount++;
}

/**
* Applies a shard snapshot status update to an updated statuses builder, after a few sanity checks / filtering.
*
* @param existingShardSnapshotStatuses Maps shard ID to ShardSnapshotStatus
* @param newShardSnapshotStatusesSupplier Supplies a builder mapping shard ID to updated ShardSnapshotStatus
* @param shardSnapshotStatusUpdate The update to apply to build a new updated ShardSnapshotStatus
* @param shardSnapshotId The shard snapshot ID of the shard being updated (for type reasons)
*/
private <T> void executeShardSnapshotUpdate(
Map<T, ShardSnapshotStatus> existingStates,
Supplier<ImmutableOpenMap.Builder<T, ShardSnapshotStatus>> newStates,
ShardSnapshotUpdate updateSnapshotState,
T updatedShard
Map<T, ShardSnapshotStatus> existingShardSnapshotStatuses,
Supplier<ImmutableOpenMap.Builder<T, ShardSnapshotStatus>> newShardSnapshotStatusesSupplier,
ShardSnapshotUpdate shardSnapshotStatusUpdate,
T shardSnapshotId
) {
assert updateSnapshotState.snapshot.equals(entry.snapshot());
final ShardSnapshotStatus existing = existingStates.get(updatedShard);
assert shardSnapshotStatusUpdate.snapshot.equals(entry.snapshot());

final ShardSnapshotStatus existing = existingShardSnapshotStatuses.get(shardSnapshotId);
if (existing == null) {
logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]", updatedShard, entry);
logger.error(
"Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
shardSnapshotStatusUpdate,
entry
);
assert false : "This should never happen, should only receive updates for expected shards";
return;
}
Expand All @@ -3418,34 +3431,48 @@ private <T> void executeShardSnapshotUpdate(
return;
}

final ShardSnapshotStatus updatedState;
final var newShardSnapshotStatusesBuilder = newShardSnapshotStatusesSupplier.get();
final var newShardSnapshotStatus = newShardSnapshotStatusesBuilder.get(shardSnapshotId);
if (newShardSnapshotStatus != null && newShardSnapshotStatus.state().completed()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should always be non-null - the builder starts with a copy of existingShardSnapshotStatuses and just overwrites the bits that get updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh, that took an unexpected turn. Thanks, updated.

// An out-of-order status update arrived. It should not be applied because the shard snapshot is already finished.
// For example, a delayed/retried PAUSED update should not override a completed shard snapshot.
iterator.remove();
return;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actual bug fix


final ShardSnapshotStatus updatedShardSnapshotStatus;
if (existing.state() == ShardState.ABORTED
&& updateSnapshotState.updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
&& shardSnapshotStatusUpdate.updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
// concurrently pausing the shard snapshot due to node shutdown and aborting the snapshot - this shard is no longer
// actively snapshotting but we don't want it to resume, so mark it as FAILED since it didn't complete
updatedState = new ShardSnapshotStatus(
updateSnapshotState.updatedState.nodeId(),
updatedShardSnapshotStatus = new ShardSnapshotStatus(
shardSnapshotStatusUpdate.updatedState.nodeId(),
ShardState.FAILED,
updateSnapshotState.updatedState.generation(),
shardSnapshotStatusUpdate.updatedState.generation(),
"snapshot aborted"
);
} else {
updatedState = updateSnapshotState.updatedState;
updatedShardSnapshotStatus = shardSnapshotStatusUpdate.updatedState;
}

if (updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
if (updatedShardSnapshotStatus.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
// leave subsequent entries for this shard alone until this one is unpaused
iterator.remove();
} else {
// All other shard updates leave the shard in a complete state, which means we should leave this update in the list so
// it can fall through to later entries and start any waiting shard snapshots:
assert updatedState.isActive() == false : updatedState;
// that it can fall through to later entries and start any waiting shard snapshots:
assert updatedShardSnapshotStatus.isActive() == false : updatedShardSnapshotStatus;
}

logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot, updatedShard, updatedState.state());
logger.trace(
"[{}] Updating shard [{}] with status [{}]",
shardSnapshotStatusUpdate.snapshot,
shardSnapshotId,
updatedShardSnapshotStatus.state()
);
changedCount++;
newStates.get().put(updatedShard, updatedState);
executedUpdates.add(updateSnapshotState);
newShardSnapshotStatusesBuilder.put(shardSnapshotId, updatedShardSnapshotStatus);
executedUpdates.add(shardSnapshotStatusUpdate);
}

private void tryStartNextTaskAfterCloneUpdated(RepositoryShardId repoShardId, ShardSnapshotStatus updatedState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,108 @@ public void testPauseForNodeRemovalWithQueuedShards() throws Exception {
);
}

/**
* Tests that, within the same cluster state batched update execution, a shard snapshot status update of PAUSED_FOR_NODE_REMOVAL will be
* ignored after the same shard snapshot has already been updated to SUCCESS (a completed state). On the other hand, a
* PAUSED_FOR_NODE_REMOVAL update follow by a SUCCESS update should be applied and result in SUCCESS.
*/
public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterSuccess() throws Exception {
final var repoName = "test-repo-name";
final var snapshot1 = snapshot(repoName, "test-snap-1");
final var snapshot2 = snapshot(repoName, "test-snap-2");
final var indexName = "test-index";
final var shardId = new ShardId(index(indexName), 0);
final var repositoryShardId = new RepositoryShardId(indexId(indexName), 0);
final var nodeId = uuid();
final var otherNodeId = uuid();

final SnapshotsInProgress.Entry firstSnapshotEntry = snapshotEntry(
snapshot1,
Collections.singletonMap(indexName, repositoryShardId.index()),
Map.of(shardId, initShardStatus(nodeId))
);

final SnapshotsInProgress.Entry secondSnapshotEntry = snapshotEntry(
snapshot2,
Collections.singletonMap(indexName, repositoryShardId.index()),
Map.of(shardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)
);

final ClusterState initialState = stateWithSnapshots(
ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create(nodeId)).localNodeId(nodeId).masterNodeId(nodeId).build())
.routingTable(
RoutingTable.builder()
.add(
IndexRoutingTable.builder(shardId.getIndex())
.addShard(TestShardRouting.newShardRouting(shardId, nodeId, true, ShardRoutingState.STARTED))
)
.build()
)
.build(),
repoName,
firstSnapshotEntry,
secondSnapshotEntry
);

final var updatedState = applyUpdates(
initialState,
// Snapshot 1 will apply SUCCESS and then ignore the PAUSED updated, and the final status will be SUCCESS.
new SnapshotsService.ShardSnapshotUpdate(
snapshot1,
shardId,
null,
SnapshotsInProgress.ShardSnapshotStatus.success(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also apply to PAUSED_FOR_NODE_REMOVAL either side of a FAILED right? Can we randomly choose between SUCCESS and FAILED?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done 👍

otherNodeId,
new ShardSnapshotResult(firstSnapshotEntry.shards().get(shardId).generation(), ByteSizeValue.ofBytes(1L), 1)
),
ActionTestUtils.assertNoFailureListener(t -> {})
),
new SnapshotsService.ShardSnapshotUpdate(
snapshot1,
shardId,
null,
new SnapshotsInProgress.ShardSnapshotStatus(
nodeId,
SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL,
firstSnapshotEntry.shards().get(shardId).generation()
),
ActionTestUtils.assertNoFailureListener(t -> {})
),
// Snapshot 2 will apply PAUSED and then SUCCESS, and the final status will be SUCCESS.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not realistic as a single batch of updates for a single shard - snapshot2 won't start snapshotting this shard until snapshot1 has applied a state update which completes it.

Can we instead randomize the order of the updates? Or randomly add a PAUSED_FOR_NODE_REMOVAL before and/or after the SUCCESS one and verify that the outcome is SUCCESS in every case (and the next snapshot moves from QUEUED to INIT)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've randomized the order of PAUSED and COMPLETE updates, and randomly add a third PAUSED/COMPLETE update (with a different nodeId to verify it's ignored). And checked the QUEUED -> INIT transition.

new SnapshotsService.ShardSnapshotUpdate(
snapshot2,
shardId,
null,
new SnapshotsInProgress.ShardSnapshotStatus(
nodeId,
SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL,
firstSnapshotEntry.shards().get(shardId).generation()
),
ActionTestUtils.assertNoFailureListener(t -> {})
),
new SnapshotsService.ShardSnapshotUpdate(
snapshot2,
shardId,
null,
SnapshotsInProgress.ShardSnapshotStatus.success(
otherNodeId,
new ShardSnapshotResult(firstSnapshotEntry.shards().get(shardId).generation(), ByteSizeValue.ofBytes(1L), 1)
),
ActionTestUtils.assertNoFailureListener(t -> {})
)
);

assertEquals(
SnapshotsInProgress.ShardState.SUCCESS,
SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).state()
);
assertEquals(
SnapshotsInProgress.ShardState.SUCCESS,
SnapshotsInProgress.get(updatedState).snapshot(snapshot2).shards().get(shardId).state()
);
}

public void testSnapshottingIndicesExcludesClones() {
final String repoName = "test-repo";
final String indexName = "index";
Expand Down Expand Up @@ -570,10 +672,21 @@ private static void assertIsNoop(ClusterState state, SnapshotsService.SnapshotTa
assertSame(applyUpdates(state, shardCompletion), state);
}

/**
* Runs the shard snapshot updates through a ClusterStateTaskExecutor that executes the
* {@link SnapshotsService.SnapshotShardsUpdateContext}.
*
* @param state Original cluster state
* @param updates List of SnapshotTask tasks to apply to the cluster state
* @return An updated cluster state, or, if no change were made, the original given cluster state.
*/
private static ClusterState applyUpdates(ClusterState state, SnapshotsService.SnapshotTask... updates) throws Exception {
return ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(state, batchExecutionContext -> {
final SnapshotsInProgress existing = SnapshotsInProgress.get(batchExecutionContext.initialState());
final var context = new SnapshotsService.SnapshotShardsUpdateContext(batchExecutionContext, (a, b, c) -> {});
final var context = new SnapshotsService.SnapshotShardsUpdateContext(
batchExecutionContext,
/* on completion handler */ (a, b, c) -> {}
);
final SnapshotsInProgress updated = context.computeUpdatedState();
context.completeWithUpdatedState(updated);
if (existing == updated) {
Expand Down Expand Up @@ -617,6 +730,9 @@ private static SnapshotsInProgress.Entry cloneEntry(
.withClones(clones);
}

/**
* Helper method to create a shard snapshot status with state {@link SnapshotsInProgress.ShardState#INIT}.
*/
private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) {
return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, ShardGeneration.newGeneration(random()));
}
Expand Down
Loading