Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
"SnapshotShutdownProgressTracker index shard snapshot messages",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
"statusDescription='finished: master notification attempt complete'"
"*successfully sent shard snapshot state [PAUSED_FOR_NODE_REMOVAL] update to the master node*"
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.function.Consumer;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -332,7 +332,8 @@ private void handleUpdatedSnapshotsInProgressEntry(String localNodeId, boolean r
ShardState.FAILED,
shard.getValue().reason(),
shard.getValue().generation(),
() -> null
// Shard snapshot never began, so there is no status object to update.
(outcomeInfoString) -> {}
);
}
} else {
Expand Down Expand Up @@ -395,7 +396,6 @@ private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInPr
for (final Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : entry.shards().entrySet()) {
final ShardId shardId = shardEntry.getKey();
final ShardSnapshotStatus masterShardSnapshotStatus = shardEntry.getValue();
IndexShardSnapshotStatus indexShardSnapshotStatus = localShardSnapshots.get(shardId);

if (masterShardSnapshotStatus.state() != ShardState.INIT) {
// shard snapshot not currently scheduled by master
Expand All @@ -416,10 +416,8 @@ private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInPr
ShardState.PAUSED_FOR_NODE_REMOVAL,
"paused",
masterShardSnapshotStatus.generation(),
() -> {
indexShardSnapshotStatus.updateStatusDescription("finished: master notification attempt complete");
return null;
}
// Shard snapshot never began, so there is no status object to update
(outcomeInfoString) -> {}
);
} else {
// shard snapshot currently running, mark for pause
Expand All @@ -436,9 +434,8 @@ private Runnable newShardSnapshotTask(
final IndexVersion entryVersion,
final long entryStartTime
) {
Supplier<Void> postMasterNotificationAction = () -> {
snapshotStatus.updateStatusDescription("finished: master notification attempt complete");
return null;
Consumer<String> postMasterNotificationAction = (outcomeInfoString) -> {
snapshotStatus.updateStatusDescription("Data node shard snapshot finished. Remote master update outcome: " + outcomeInfoString);
};

// Listener that runs on completion of the shard snapshot: it will notify the master node of success or failure.
Expand Down Expand Up @@ -679,6 +676,8 @@ private void syncShardStatsOnNewMaster(List<SnapshotsInProgress.Entry> entries)
if (masterShard != null && masterShard.state().completed() == false) {
final IndexShardSnapshotStatus.Copy indexShardSnapshotStatus = localShard.getValue().asCopy();
final Stage stage = indexShardSnapshotStatus.getStage();
final String statusDescription = indexShardSnapshotStatus.getStatusDescription();
final int maxStatusAppend = 1000;
// Master knows about the shard and thinks it has not completed
if (stage == Stage.DONE) {
// but we think the shard is done - we need to make new master know that the shard is done
Expand All @@ -692,7 +691,20 @@ private void syncShardStatsOnNewMaster(List<SnapshotsInProgress.Entry> entries)
snapshot.snapshot(),
shardId,
localShard.getValue().getShardSnapshotResult(),
() -> null
(outcomeInfoString) -> localShard.getValue()
.updateStatusDescription(
Strings.format(
"""
Data node already successfully finished shard snapshot, but a new master needed to be
notified. New remote master notification outcome: [%s]. The prior shard snapshot status
description was [%s]
""",
outcomeInfoString,
statusDescription.length() < maxStatusAppend
? statusDescription
: statusDescription.substring(0, maxStatusAppend)
)
)
);
} else if (stage == Stage.FAILURE) {
// but we think the shard failed - we need to make new master know that the shard failed
Expand All @@ -708,7 +720,21 @@ private void syncShardStatsOnNewMaster(List<SnapshotsInProgress.Entry> entries)
ShardState.FAILED,
indexShardSnapshotStatus.getFailure(),
localShard.getValue().generation(),
() -> null
// Update the original statusDescription with the latest remote master call outcome, but include the old
// response. This will allow us to see when/whether the information reached the previous and current master.
(outcomeInfoString) -> localShard.getValue()
.updateStatusDescription(
Strings.format(
"""
Data node already failed shard snapshot, but a new master needed to be notified. New remote
master notification outcome: [%s]. The prior shard snapshot status description was [%s]
""",
outcomeInfoString,
statusDescription.length() < maxStatusAppend
? statusDescription
: statusDescription.substring(0, maxStatusAppend)
)
)
);
} else if (stage == Stage.PAUSED) {
// but we think the shard has paused - we need to make new master know that
Expand All @@ -722,7 +748,19 @@ private void syncShardStatsOnNewMaster(List<SnapshotsInProgress.Entry> entries)
ShardState.PAUSED_FOR_NODE_REMOVAL,
indexShardSnapshotStatus.getFailure(),
localShard.getValue().generation(),
() -> null
(outcomeInfoString) -> localShard.getValue()
.updateStatusDescription(
Strings.format(
"""
Data node already paused shard snapshot, but a new master needed to be notified. New remote
master notification outcome: [%s]. The prior shard snapshot status description was [%s]
""",
outcomeInfoString,
statusDescription.length() < maxStatusAppend
? statusDescription
: statusDescription.substring(0, maxStatusAppend)
)
)
);
}
}
Expand All @@ -739,7 +777,7 @@ private void notifySuccessfulSnapshotShard(
final Snapshot snapshot,
final ShardId shardId,
ShardSnapshotResult shardSnapshotResult,
Supplier<Void> postMasterNotificationAction
Consumer<String> postMasterNotificationAction
) {
assert shardSnapshotResult != null;
assert shardSnapshotResult.getGeneration() != null;
Expand All @@ -760,7 +798,7 @@ private void notifyUnsuccessfulSnapshotShard(
final ShardState shardState,
final String failure,
final ShardGeneration generation,
Supplier<Void> postMasterNotificationAction
Consumer<String> postMasterNotificationAction
) {
assert shardState == ShardState.FAILED || shardState == ShardState.PAUSED_FOR_NODE_REMOVAL : shardState;
sendSnapshotShardUpdate(
Expand All @@ -784,29 +822,32 @@ private void sendSnapshotShardUpdate(
final Snapshot snapshot,
final ShardId shardId,
final ShardSnapshotStatus status,
Supplier<Void> postMasterNotificationAction
Consumer<String> postMasterNotificationAction
) {
snapshotShutdownProgressTracker.trackRequestSentToMaster(snapshot, shardId);
ActionListener<Void> updateResultListener = new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
    // Release the request from the shutdown progress tracker before reporting the outcome.
    snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId);

    // Hand a human-readable success outcome to the caller-supplied post-notification action.
    final String outcomeInfo = Strings.format(
        "successfully sent shard snapshot state [%s] update to the master node",
        status.state()
    );
    postMasterNotificationAction.accept(outcomeInfo);

    logger.trace("[{}][{}] updated snapshot state to [{}]", shardId, snapshot, status);
}

@Override
public void onFailure(Exception e) {
    // Release the request from the shutdown progress tracker before reporting the outcome.
    snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId);

    // Hand a human-readable failure outcome (including the exception) to the caller-supplied
    // post-notification action.
    final String outcomeInfo = Strings.format(
        "exception trying to send shard snapshot state [%s] update to the master node [%s]",
        status.state(),
        e
    );
    postMasterNotificationAction.accept(outcomeInfo);

    logger.warn(() -> format("[%s][%s] failed to update snapshot state to [%s]", shardId, snapshot, status), e);
}
};

snapshotShutdownProgressTracker.trackRequestSentToMaster(snapshot, shardId);
var releaseTrackerRequestRunsBeforeResultListener = ActionListener.runBefore(updateResultListener, () -> {
snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId);
postMasterNotificationAction.get();
});

remoteFailedRequestDeduplicator.executeOnce(
new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status),
releaseTrackerRequestRunsBeforeResultListener,
updateResultListener,
(req, reqListener) -> transportService.sendRequest(
transportService.getLocalNode(),
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
Expand Down