diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java index 72317a7220ec9..26e44ab0e6777 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java @@ -597,7 +597,7 @@ public void testSnapshotShutdownProgressTracker() throws Exception { "SnapshotShutdownProgressTracker index shard snapshot messages", SnapshotShutdownProgressTracker.class.getCanonicalName(), Level.INFO, - "statusDescription='finished: master notification attempt complete'" + "*successfully sent shard snapshot state [PAUSED_FOR_NODE_REMOVAL] update to the master node*" ) ); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 90111c44fbd96..c8a415099f298 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -61,7 +61,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.function.Supplier; +import java.util.function.Consumer; import static java.util.Collections.emptyMap; import static org.elasticsearch.core.Strings.format; @@ -332,7 +332,8 @@ private void handleUpdatedSnapshotsInProgressEntry(String localNodeId, boolean r ShardState.FAILED, shard.getValue().reason(), shard.getValue().generation(), - () -> null + // Shard snapshot never began, so there is no status object to update. + (outcomeInfoString) -> {} ); } } else { @@ -395,7 +396,6 @@ private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInPr for (final Map.Entry shardEntry : entry.shards().entrySet()) { final ShardId shardId = shardEntry.getKey(); final ShardSnapshotStatus masterShardSnapshotStatus = shardEntry.getValue(); - IndexShardSnapshotStatus indexShardSnapshotStatus = localShardSnapshots.get(shardId); if (masterShardSnapshotStatus.state() != ShardState.INIT) { // shard snapshot not currently scheduled by master @@ -416,10 +416,8 @@ private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInPr ShardState.PAUSED_FOR_NODE_REMOVAL, "paused", masterShardSnapshotStatus.generation(), - () -> { - indexShardSnapshotStatus.updateStatusDescription("finished: master notification attempt complete"); - return null; - } + // Shard snapshot never began, so there is no status object to update + (outcomeInfoString) -> {} ); } else { // shard snapshot currently running, mark for pause @@ -436,9 +434,8 @@ private Runnable newShardSnapshotTask( final IndexVersion entryVersion, final long entryStartTime ) { - Supplier postMasterNotificationAction = () -> { - snapshotStatus.updateStatusDescription("finished: master notification attempt complete"); - return null; + Consumer postMasterNotificationAction = (outcomeInfoString) -> { + snapshotStatus.updateStatusDescription("Data node shard snapshot finished. Remote master update outcome: " + outcomeInfoString); }; // Listener that runs on completion of the shard snapshot: it will notify the master node of success or failure. @@ -679,6 +676,8 @@ private void syncShardStatsOnNewMaster(List entries) if (masterShard != null && masterShard.state().completed() == false) { final IndexShardSnapshotStatus.Copy indexShardSnapshotStatus = localShard.getValue().asCopy(); final Stage stage = indexShardSnapshotStatus.getStage(); + final String statusDescription = indexShardSnapshotStatus.getStatusDescription(); + final int maxStatusAppend = 1000; // Master knows about the shard and thinks it has not completed if (stage == Stage.DONE) { // but we think the shard is done - we need to make new master know that the shard is done @@ -692,7 +691,20 @@ private void syncShardStatsOnNewMaster(List entries) snapshot.snapshot(), shardId, localShard.getValue().getShardSnapshotResult(), - () -> null + (outcomeInfoString) -> localShard.getValue() + .updateStatusDescription( + Strings.format( + """ + Data node already successfully finished shard snapshot, but a new master needed to be + notified. New remote master notification outcome: [%s]. The prior shard snapshot status + description was [%s] + """, + outcomeInfoString, + statusDescription.length() < maxStatusAppend + ? statusDescription + : statusDescription.substring(0, maxStatusAppend) + ) + ) ); } else if (stage == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed @@ -708,7 +720,21 @@ private void syncShardStatsOnNewMaster(List entries) ShardState.FAILED, indexShardSnapshotStatus.getFailure(), localShard.getValue().generation(), - () -> null + // Update the original statusDescription with the latest remote master call outcome, but include the old + // response. This will allow us to see when/whether the information reached the previous and current master. + (outcomeInfoString) -> localShard.getValue() + .updateStatusDescription( + Strings.format( + """ + Data node already failed shard snapshot, but a new master needed to be notified. New remote + master notification outcome: [%s]. The prior shard snapshot status description was [%s] + """, + outcomeInfoString, + statusDescription.length() < maxStatusAppend + ? statusDescription + : statusDescription.substring(0, maxStatusAppend) + ) + ) ); } else if (stage == Stage.PAUSED) { // but we think the shard has paused - we need to make new master know that @@ -722,7 +748,19 @@ private void syncShardStatsOnNewMaster(List entries) ShardState.PAUSED_FOR_NODE_REMOVAL, indexShardSnapshotStatus.getFailure(), localShard.getValue().generation(), - () -> null + (outcomeInfoString) -> localShard.getValue() + .updateStatusDescription( + Strings.format( + """ + Data node already paused shard snapshot, but a new master needed to be notified. New remote + master notification outcome: [%s]. The prior shard snapshot status description was [%s] + """, + outcomeInfoString, + statusDescription.length() < maxStatusAppend + ? statusDescription + : statusDescription.substring(0, maxStatusAppend) + ) + ) ); } } @@ -739,7 +777,7 @@ private void notifySuccessfulSnapshotShard( final Snapshot snapshot, final ShardId shardId, ShardSnapshotResult shardSnapshotResult, - Supplier postMasterNotificationAction + Consumer postMasterNotificationAction ) { assert shardSnapshotResult != null; assert shardSnapshotResult.getGeneration() != null; @@ -760,7 +798,7 @@ private void notifyUnsuccessfulSnapshotShard( final ShardState shardState, final String failure, final ShardGeneration generation, - Supplier postMasterNotificationAction + Consumer postMasterNotificationAction ) { assert shardState == ShardState.FAILED || shardState == ShardState.PAUSED_FOR_NODE_REMOVAL : shardState; sendSnapshotShardUpdate( @@ -784,29 +822,32 @@ private void sendSnapshotShardUpdate( final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status, - Supplier postMasterNotificationAction + Consumer postMasterNotificationAction ) { + snapshotShutdownProgressTracker.trackRequestSentToMaster(snapshot, shardId); ActionListener updateResultListener = new ActionListener<>() { @Override public void onResponse(Void aVoid) { + snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId); + postMasterNotificationAction.accept( + Strings.format("successfully sent shard snapshot state [%s] update to the master node", status.state()) + ); logger.trace("[{}][{}] updated snapshot state to [{}]", shardId, snapshot, status); } @Override public void onFailure(Exception e) { + snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId); + postMasterNotificationAction.accept( + Strings.format("exception trying to send shard snapshot state [%s] update to the master node [%s]", status.state(), e) + ); logger.warn(() -> format("[%s][%s] failed to update snapshot state to [%s]", shardId, snapshot, status), e); } }; - snapshotShutdownProgressTracker.trackRequestSentToMaster(snapshot, shardId); - var releaseTrackerRequestRunsBeforeResultListener = ActionListener.runBefore(updateResultListener, () -> { - snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId); - postMasterNotificationAction.get(); - }); - remoteFailedRequestDeduplicator.executeOnce( new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status), - releaseTrackerRequestRunsBeforeResultListener, + updateResultListener, (req, reqListener) -> transportService.sendRequest( transportService.getLocalNode(), SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,