Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.function.Consumer;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -332,7 +332,8 @@ private void handleUpdatedSnapshotsInProgressEntry(String localNodeId, boolean r
ShardState.FAILED,
shard.getValue().reason(),
shard.getValue().generation(),
() -> null
// Shard snapshot never began, so there is no status object to update.
(outcomeInfoString) -> {}
);
}
} else {
Expand Down Expand Up @@ -395,7 +396,6 @@ private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInPr
for (final Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : entry.shards().entrySet()) {
final ShardId shardId = shardEntry.getKey();
final ShardSnapshotStatus masterShardSnapshotStatus = shardEntry.getValue();
IndexShardSnapshotStatus indexShardSnapshotStatus = localShardSnapshots.get(shardId);

if (masterShardSnapshotStatus.state() != ShardState.INIT) {
// shard snapshot not currently scheduled by master
Expand All @@ -416,10 +416,8 @@ private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInPr
ShardState.PAUSED_FOR_NODE_REMOVAL,
"paused",
masterShardSnapshotStatus.generation(),
() -> {
indexShardSnapshotStatus.updateStatusDescription("finished: master notification attempt complete");
return null;
}
// Shard snapshot never began, so there is no status object to update
(outcomeInfoString) -> {}
);
} else {
// shard snapshot currently running, mark for pause
Expand All @@ -436,9 +434,8 @@ private Runnable newShardSnapshotTask(
final IndexVersion entryVersion,
final long entryStartTime
) {
Supplier<Void> postMasterNotificationAction = () -> {
snapshotStatus.updateStatusDescription("finished: master notification attempt complete");
return null;
Consumer<String> postMasterNotificationAction = (outcomeInfoString) -> {
snapshotStatus.updateStatusDescription("Data node shard snapshot finished. Remote master update outcome: " + outcomeInfoString);
};

// Listener that runs on completion of the shard snapshot: it will notify the master node of success or failure.
Expand Down Expand Up @@ -692,7 +689,15 @@ private void syncShardStatsOnNewMaster(List<SnapshotsInProgress.Entry> entries)
snapshot.snapshot(),
shardId,
localShard.getValue().getShardSnapshotResult(),
() -> null
(outcomeInfoString) -> localShard.getValue().updateStatusDescription(
Strings.format("""
Copy link
Contributor Author

@DiannaHohensee DiannaHohensee Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could theoretically grow without bound -- appending the previous status string. I figure it's unlikely to happen, since we'd stop once a remote call successfully updates the cluster state. We'd have to keep rolling over the master, and keep failing to get majority, or persistence in the blob store for serverless.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah could we impose a limit here just in case? Simply truncating the string if it exceeds 1000 characters or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated 👍 I used substring: a bit wordy, let me know if there's some other/better standard way of doing it.

Data node already successfully finished shard snapshot, but a new master needed to be notified. New
remote master notification outcome: [%s]. The prior shard snapshot status description was [%s]
""",
outcomeInfoString,
indexShardSnapshotStatus.getStatusDescription()
)
)
);
} else if (stage == Stage.FAILURE) {
// but we think the shard failed - we need to make new master know that the shard failed
Expand All @@ -708,7 +713,17 @@ private void syncShardStatsOnNewMaster(List<SnapshotsInProgress.Entry> entries)
ShardState.FAILED,
indexShardSnapshotStatus.getFailure(),
localShard.getValue().generation(),
() -> null
// Update the original statusDescription with the latest remote master call outcome, but include the old
// response. This will allow us to see when/whether the information reached the previous and current master.
(outcomeInfoString) -> localShard.getValue().updateStatusDescription(
Strings.format("""
Data node already failed shard snapshot, but a new master needed to be notified. New remote master
notification outcome: [%s]. The prior shard snapshot status description was [%s]
""",
outcomeInfoString,
indexShardSnapshotStatus.getStatusDescription()
)
)
);
} else if (stage == Stage.PAUSED) {
// but we think the shard has paused - we need to make new master know that
Expand All @@ -722,7 +737,16 @@ private void syncShardStatsOnNewMaster(List<SnapshotsInProgress.Entry> entries)
ShardState.PAUSED_FOR_NODE_REMOVAL,
indexShardSnapshotStatus.getFailure(),
localShard.getValue().generation(),
() -> null
(outcomeInfoString) -> localShard.getValue()
.updateStatusDescription(
Strings.format("""
Data node already paused shard snapshot, but a new master needed to be notified. New remote
master notification outcome: [%s]. The prior shard snapshot status description was [%s]
""",
outcomeInfoString,
indexShardSnapshotStatus.getStatusDescription()
)
)
);
}
}
Expand All @@ -739,7 +763,7 @@ private void notifySuccessfulSnapshotShard(
final Snapshot snapshot,
final ShardId shardId,
ShardSnapshotResult shardSnapshotResult,
Supplier<Void> postMasterNotificationAction
Consumer<String> postMasterNotificationAction
) {
assert shardSnapshotResult != null;
assert shardSnapshotResult.getGeneration() != null;
Expand All @@ -760,7 +784,7 @@ private void notifyUnsuccessfulSnapshotShard(
final ShardState shardState,
final String failure,
final ShardGeneration generation,
Supplier<Void> postMasterNotificationAction
Consumer<String> postMasterNotificationAction
) {
assert shardState == ShardState.FAILED || shardState == ShardState.PAUSED_FOR_NODE_REMOVAL : shardState;
sendSnapshotShardUpdate(
Expand All @@ -784,29 +808,31 @@ private void sendSnapshotShardUpdate(
final Snapshot snapshot,
final ShardId shardId,
final ShardSnapshotStatus status,
Supplier<Void> postMasterNotificationAction
Consumer<String> postMasterNotificationAction
) {
ActionListener<Void> updateResultListener = new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId);
postMasterNotificationAction.accept(
Strings.format("successfully sent shard snapshot state [%s] update to the master node", status.state())
);
logger.trace("[{}][{}] updated snapshot state to [{}]", shardId, snapshot, status);
}

@Override
public void onFailure(Exception e) {
snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId);
postMasterNotificationAction.accept(
Strings.format("exception trying to send shard snapshot state [%s] update to the master node [%s]", status.state(), e)
);
logger.warn(() -> format("[%s][%s] failed to update snapshot state to [%s]", shardId, snapshot, status), e);
}
};

snapshotShutdownProgressTracker.trackRequestSentToMaster(snapshot, shardId);
var releaseTrackerRequestRunsBeforeResultListener = ActionListener.runBefore(updateResultListener, () -> {
snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId);
postMasterNotificationAction.get();
});

remoteFailedRequestDeduplicator.executeOnce(
new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status),
releaseTrackerRequestRunsBeforeResultListener,
updateResultListener,
(req, reqListener) -> transportService.sendRequest(
transportService.getLocalNode(),
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
Expand Down