Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3398,16 +3398,29 @@ private <T> void startShardOperation(
startedCount++;
}

/**
* Applies a shard snapshot status update to an updated statuses builder, after a few sanity checks / filtering.
*
* @param existingShardSnapshotStatuses Maps shard ID to ShardSnapshotStatus
* @param newShardSnapshotStatusesSupplier Supplies a builder mapping shard ID to updated ShardSnapshotStatus
* @param shardSnapshotStatusUpdate The update to apply to build a new updated ShardSnapshotStatus
* @param shardSnapshotId The shard snapshot ID of the shard being updated (for type reasons)
*/
private <T> void executeShardSnapshotUpdate(
Map<T, ShardSnapshotStatus> existingStates,
Supplier<ImmutableOpenMap.Builder<T, ShardSnapshotStatus>> newStates,
ShardSnapshotUpdate updateSnapshotState,
T updatedShard
Map<T, ShardSnapshotStatus> existingShardSnapshotStatuses,
Supplier<ImmutableOpenMap.Builder<T, ShardSnapshotStatus>> newShardSnapshotStatusesSupplier,
ShardSnapshotUpdate shardSnapshotStatusUpdate,
T shardSnapshotId
) {
assert updateSnapshotState.snapshot.equals(entry.snapshot());
final ShardSnapshotStatus existing = existingStates.get(updatedShard);
assert shardSnapshotStatusUpdate.snapshot.equals(entry.snapshot());

final ShardSnapshotStatus existing = existingShardSnapshotStatuses.get(shardSnapshotId);
if (existing == null) {
logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]", updatedShard, entry);
logger.error(
"Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
shardSnapshotStatusUpdate,
entry
);
assert false : "This should never happen, should only receive updates for expected shards";
return;
}
Expand All @@ -3418,34 +3431,48 @@ private <T> void executeShardSnapshotUpdate(
return;
}

final ShardSnapshotStatus updatedState;
final var newShardSnapshotStatusesBuilder = newShardSnapshotStatusesSupplier.get();
final var newShardSnapshotStatus = newShardSnapshotStatusesBuilder.get(shardSnapshotId);
if (newShardSnapshotStatus != null && newShardSnapshotStatus.state().completed()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should always be non-null - the builder starts with a copy of existingShardSnapshotStatuses and just overwrites the bits that get updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh, that took an unexpected turn. Thanks, updated.

// An out-of-order status update arrived. It should not be applied because the shard snapshot is already finished.
// For example, a delayed/retried PAUSED update should not override a completed shard snapshot.
iterator.remove();
return;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actual bug fix


final ShardSnapshotStatus updatedShardSnapshotStatus;
if (existing.state() == ShardState.ABORTED
&& updateSnapshotState.updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
&& shardSnapshotStatusUpdate.updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
// concurrently pausing the shard snapshot due to node shutdown and aborting the snapshot - this shard is no longer
// actively snapshotting but we don't want it to resume, so mark it as FAILED since it didn't complete
updatedState = new ShardSnapshotStatus(
updateSnapshotState.updatedState.nodeId(),
updatedShardSnapshotStatus = new ShardSnapshotStatus(
shardSnapshotStatusUpdate.updatedState.nodeId(),
ShardState.FAILED,
updateSnapshotState.updatedState.generation(),
shardSnapshotStatusUpdate.updatedState.generation(),
"snapshot aborted"
);
} else {
updatedState = updateSnapshotState.updatedState;
updatedShardSnapshotStatus = shardSnapshotStatusUpdate.updatedState;
}

if (updatedState.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
if (updatedShardSnapshotStatus.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) {
// leave subsequent entries for this shard alone until this one is unpaused
iterator.remove();
} else {
// All other shard updates leave the shard in a complete state, which means we should leave this update in the list so
// it can fall through to later entries and start any waiting shard snapshots:
assert updatedState.isActive() == false : updatedState;
// that it can fall through to later entries and start any waiting shard snapshots:
assert updatedShardSnapshotStatus.isActive() == false : updatedShardSnapshotStatus;
}

logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot, updatedShard, updatedState.state());
logger.trace(
"[{}] Updating shard [{}] with status [{}]",
shardSnapshotStatusUpdate.snapshot,
shardSnapshotId,
updatedShardSnapshotStatus.state()
);
changedCount++;
newStates.get().put(updatedShard, updatedState);
executedUpdates.add(updateSnapshotState);
newShardSnapshotStatusesBuilder.put(shardSnapshotId, updatedShardSnapshotStatus);
executedUpdates.add(shardSnapshotStatusUpdate);
}

private void tryStartNextTaskAfterCloneUpdated(RepositoryShardId repoShardId, ShardSnapshotStatus updatedState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,108 @@ public void testPauseForNodeRemovalWithQueuedShards() throws Exception {
);
}

/**
* Tests that, within the same cluster state batched update execution, a shard snapshot status update of PAUSED_FOR_NODE_REMOVAL will be
* ignored after the same shard snapshot has already been updated to SUCCESS (a completed state). On the other hand, a
* PAUSED_FOR_NODE_REMOVAL update follow by a SUCCESS update should be applied and result in SUCCESS.
*/
public void testBatchedShardSnapshotUpdatesCannotApplyPausedAfterSuccess() throws Exception {
final var repoName = "test-repo-name";
final var snapshot1 = snapshot(repoName, "test-snap-1");
final var snapshot2 = snapshot(repoName, "test-snap-2");
final var indexName = "test-index";
final var shardId = new ShardId(index(indexName), 0);
final var repositoryShardId = new RepositoryShardId(indexId(indexName), 0);
final var nodeId = uuid();
final var otherNodeId = uuid();

final SnapshotsInProgress.Entry firstSnapshotEntry = snapshotEntry(
snapshot1,
Collections.singletonMap(indexName, repositoryShardId.index()),
Map.of(shardId, initShardStatus(nodeId))
);

final SnapshotsInProgress.Entry secondSnapshotEntry = snapshotEntry(
snapshot2,
Collections.singletonMap(indexName, repositoryShardId.index()),
Map.of(shardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)
);

final ClusterState initialState = stateWithSnapshots(
ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create(nodeId)).localNodeId(nodeId).masterNodeId(nodeId).build())
.routingTable(
RoutingTable.builder()
.add(
IndexRoutingTable.builder(shardId.getIndex())
.addShard(TestShardRouting.newShardRouting(shardId, nodeId, true, ShardRoutingState.STARTED))
)
.build()
)
.build(),
repoName,
firstSnapshotEntry,
secondSnapshotEntry
);

final var updatedState = applyUpdates(
initialState,
// Snapshot 1 will apply SUCCESS and then ignore the PAUSED updated, and the final status will be SUCCESS.
new SnapshotsService.ShardSnapshotUpdate(
snapshot1,
shardId,
null,
SnapshotsInProgress.ShardSnapshotStatus.success(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also apply to PAUSED_FOR_NODE_REMOVAL either side of a FAILED right? Can we randomly choose between SUCCESS and FAILED?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done 👍

otherNodeId,
new ShardSnapshotResult(firstSnapshotEntry.shards().get(shardId).generation(), ByteSizeValue.ofBytes(1L), 1)
),
ActionTestUtils.assertNoFailureListener(t -> {})
),
new SnapshotsService.ShardSnapshotUpdate(
snapshot1,
shardId,
null,
new SnapshotsInProgress.ShardSnapshotStatus(
nodeId,
SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL,
firstSnapshotEntry.shards().get(shardId).generation()
),
ActionTestUtils.assertNoFailureListener(t -> {})
),
// Snapshot 2 will apply PAUSED and then SUCCESS, and the final status will be SUCCESS.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not realistic as a single batch of updates for a single shard - snapshot2 won't start snapshotting this shard until snapshot1 has applied a state update which completes it.

Can we instead randomize the order of the updates? Or randomly add a PAUSED_FOR_NODE_REMOVAL before and/or after the SUCCESS one and verify that the outcome is SUCCESS in every case (and the next snapshot moves from QUEUED to INIT)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've randomized the order of PAUSED and COMPLETE updates, and randomly add a third PAUSED/COMPLETE update (with a different nodeId to verify it's ignored). And checked the QUEUED -> INIT transition.

new SnapshotsService.ShardSnapshotUpdate(
snapshot2,
shardId,
null,
new SnapshotsInProgress.ShardSnapshotStatus(
nodeId,
SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL,
firstSnapshotEntry.shards().get(shardId).generation()
),
ActionTestUtils.assertNoFailureListener(t -> {})
),
new SnapshotsService.ShardSnapshotUpdate(
snapshot2,
shardId,
null,
SnapshotsInProgress.ShardSnapshotStatus.success(
otherNodeId,
new ShardSnapshotResult(firstSnapshotEntry.shards().get(shardId).generation(), ByteSizeValue.ofBytes(1L), 1)
),
ActionTestUtils.assertNoFailureListener(t -> {})
)
);

assertEquals(
SnapshotsInProgress.ShardState.SUCCESS,
SnapshotsInProgress.get(updatedState).snapshot(snapshot1).shards().get(shardId).state()
);
assertEquals(
SnapshotsInProgress.ShardState.SUCCESS,
SnapshotsInProgress.get(updatedState).snapshot(snapshot2).shards().get(shardId).state()
);
}

public void testSnapshottingIndicesExcludesClones() {
final String repoName = "test-repo";
final String indexName = "index";
Expand Down Expand Up @@ -570,10 +672,21 @@ private static void assertIsNoop(ClusterState state, SnapshotsService.SnapshotTa
assertSame(applyUpdates(state, shardCompletion), state);
}

/**
* Runs the shard snapshot updates through a ClusterStateTaskExecutor that executes the
* {@link SnapshotsService.SnapshotShardsUpdateContext}.
*
* @param state Original cluster state
* @param updates List of SnapshotTask tasks to apply to the cluster state
* @return An updated cluster state, or, if no change were made, the original given cluster state.
*/
private static ClusterState applyUpdates(ClusterState state, SnapshotsService.SnapshotTask... updates) throws Exception {
return ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(state, batchExecutionContext -> {
final SnapshotsInProgress existing = SnapshotsInProgress.get(batchExecutionContext.initialState());
final var context = new SnapshotsService.SnapshotShardsUpdateContext(batchExecutionContext, (a, b, c) -> {});
final var context = new SnapshotsService.SnapshotShardsUpdateContext(
batchExecutionContext,
/* on completion handler */ (a, b, c) -> {}
);
final SnapshotsInProgress updated = context.computeUpdatedState();
context.completeWithUpdatedState(updated);
if (existing == updated) {
Expand Down Expand Up @@ -617,6 +730,9 @@ private static SnapshotsInProgress.Entry cloneEntry(
.withClones(clones);
}

/**
* Helper method to create a shard snapshot status with state {@link SnapshotsInProgress.ShardState#INIT}.
*/
private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) {
return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, ShardGeneration.newGeneration(random()));
}
Expand Down