diff --git a/server/src/main/java/org/elasticsearch/snapshots/RegisteredPolicySnapshots.java b/server/src/main/java/org/elasticsearch/snapshots/RegisteredPolicySnapshots.java index 811bc6872fb79..e133882d03c6f 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RegisteredPolicySnapshots.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RegisteredPolicySnapshots.java @@ -44,7 +44,7 @@ * cluster state as custom metadata. When a snapshot is started by SLM, it is added to this set. Upon completion, * is it removed. If a snapshot does not record its failure in SnapshotLifecycleStats, likely due to a master shutdown, * it will not be removed from the registered set. A subsequent snapshot will then find that a registered snapshot - * is no longer running and will infer that it failed, updating SnapshotLifecycleStats accordingly. + * is no longer running and update SnapshotLifecycleStats based on the status of the snapshot. */ public class RegisteredPolicySnapshots implements Metadata.ProjectCustom { diff --git a/x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java b/x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java index e5171a7c51650..6930e39b2ab8c 100644 --- a/x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java +++ b/x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.SnapshotsInProgress; import 
org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.routing.UnassignedInfo; @@ -72,6 +73,7 @@ public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase private static final String NEVER_EXECUTE_CRON_SCHEDULE = "* * * 31 FEB ? *"; static final String REPO = "my-repo"; + List masterNodeNames = null; List dataNodeNames = null; @Override @@ -85,7 +87,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { @Before public void ensureClusterNodes() { logger.info("--> starting enough nodes to ensure we have enough to safely stop for tests"); - internalCluster().startMasterOnlyNodes(2); + masterNodeNames = internalCluster().startMasterOnlyNodes(2); dataNodeNames = internalCluster().startDataOnlyNodes(2); ensureGreen(); } @@ -329,6 +331,185 @@ public void testRetentionWithMultipleRepositories() throws Exception { testUnsuccessfulSnapshotRetention(randomBoolean()); } + // Test that SLM stats and lastSuccess/lastFailure are correctly updated with master shutdown + public void testSLMWithMasterShutdown() throws Exception { + final String indexName = "test"; + final String policyName = "test-policy"; + int clusterSize = masterNodeNames.size() + dataNodeNames.size(); + indexRandomDocs(indexName, 20); + createRepository(REPO, "mock"); + + createSnapshotPolicy( + policyName, + "snap", + NEVER_EXECUTE_CRON_SCHEDULE, + REPO, + indexName, + true, + false, + new SnapshotRetentionConfiguration(TimeValue.ZERO, null, null) + ); + + // block snapshot from completing + blockMasterFromFinalizingSnapshotOnIndexFile(REPO); + + // first SLM execution + final String snapshotName = executePolicy(policyName); + final String initialMaster = internalCluster().getMasterName(); + waitForBlock(initialMaster, REPO); + + // restart master + internalCluster().restartNode(initialMaster); + ensureStableCluster(clusterSize); + awaitNoMoreRunningOperations(); + + // ensure snapshot is completed successfully after master failover 
+ assertBusy(() -> { + final SnapshotInfo snapshotInfo; + try { + GetSnapshotsResponse snapshotsStatusResponse = clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, REPO) + .setSnapshots(snapshotName) + .get(); + snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + } catch (SnapshotMissingException sme) { + throw new AssertionError(sme); + } + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + }, 30L, TimeUnit.SECONDS); + assertSnapshotSuccessful(snapshotName); + + // the SLM policy metadata has not been updated due to master shutdown + assertBusy(() -> { + SnapshotLifecyclePolicyItem policy = client().execute( + GetSnapshotLifecycleAction.INSTANCE, + new GetSnapshotLifecycleAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policyName) + ).get().getPolicies().getFirst(); + assertNull(policy.getLastSuccess()); + assertNull(policy.getLastFailure()); + assertEquals(0, policy.getPolicyStats().getSnapshotFailedCount()); + assertEquals(0, policy.getPolicyStats().getSnapshotTakenCount()); + }); + + // 2nd SLM execution, it should pick up the last missing stats + String snapshotSecond = executePolicy(policyName); + + awaitNoMoreRunningOperations(); + assertSnapshotSuccessful(snapshotSecond); + + // stats should have 2 successful snapshots, 1 from the new snapshot and 1 from previous success + assertBusy(() -> { + SnapshotLifecyclePolicyItem policy = client().execute( + GetSnapshotLifecycleAction.INSTANCE, + new GetSnapshotLifecycleAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policyName) + ).get().getPolicies().getFirst(); + assertNull(policy.getLastFailure()); + assertNotNull(policy.getLastSuccess()); + assertEquals(snapshotSecond, policy.getLastSuccess().getSnapshotName()); + assertEquals(0, policy.getPolicyStats().getSnapshotFailedCount()); + assertEquals(2, policy.getPolicyStats().getSnapshotTakenCount()); + }); + } + + public void testSLMWithMasterShutdownAndDeletedSnapshot() throws Exception { + final String indexName = 
"test"; + final String policyName = "test-policy"; + int clusterSize = masterNodeNames.size() + dataNodeNames.size(); + indexRandomDocs(indexName, 20); + createRepository(REPO, "mock"); + + createSnapshotPolicy( + policyName, + "snap", + NEVER_EXECUTE_CRON_SCHEDULE, + REPO, + indexName, + true, + false, + new SnapshotRetentionConfiguration(TimeValue.ZERO, null, null) + ); + + // block snapshot from completing + blockMasterFromFinalizingSnapshotOnIndexFile(REPO); + + // first SLM execution + final String snapshotName = executePolicy(policyName); + final String initialMaster = internalCluster().getMasterName(); + waitForBlock(initialMaster, REPO); + + // restart master + internalCluster().restartNode(initialMaster); + ensureStableCluster(clusterSize); + awaitNoMoreRunningOperations(); + + // ensure snapshot is completed successfully after master failover + assertBusy(() -> { + final SnapshotInfo snapshotInfo; + try { + GetSnapshotsResponse snapshotsStatusResponse = clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, REPO) + .setSnapshots(snapshotName) + .get(); + snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + } catch (SnapshotMissingException sme) { + throw new AssertionError(sme); + } + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + }, 30L, TimeUnit.SECONDS); + assertSnapshotSuccessful(snapshotName); + + // the SLM policy metadata has not been updated due to master shutdown + assertBusy(() -> { + SnapshotLifecyclePolicyItem policy = client().execute( + GetSnapshotLifecycleAction.INSTANCE, + new GetSnapshotLifecycleAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policyName) + ).get().getPolicies().getFirst(); + assertNull(policy.getLastSuccess()); + assertNull(policy.getLastFailure()); + assertEquals(0, policy.getPolicyStats().getSnapshotFailedCount()); + assertEquals(0, policy.getPolicyStats().getSnapshotTakenCount()); + }); + + // delete the snapshot, simulate missing snapshot from repo + assertBusy(() -> { + 
AcknowledgedResponse response = clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, REPO, snapshotName).get(); + assertTrue(response.isAcknowledged()); + }); + + // 2nd SLM execution, it should pick up the last missing stats + String snapshotSecond = executePolicy(policyName); + + awaitNoMoreRunningOperations(); + assertSnapshotSuccessful(snapshotSecond); + + // stats should have 1 successful and 1 failed snapshot, the deleted snapshot is inferred failure + assertBusy(() -> { + SnapshotLifecyclePolicyItem policy = client().execute( + GetSnapshotLifecycleAction.INSTANCE, + new GetSnapshotLifecycleAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policyName) + ).get().getPolicies().getFirst(); + assertNotNull(policy.getLastSuccess()); + assertEquals(snapshotSecond, policy.getLastSuccess().getSnapshotName()); + assertNotNull(policy.getLastFailure()); + assertEquals(snapshotName, policy.getLastFailure().getSnapshotName()); + assertEquals(1, policy.getPolicyStats().getSnapshotFailedCount()); + assertEquals(1, policy.getPolicyStats().getSnapshotTakenCount()); + }); + } + + private void assertSnapshotSuccessful(String snapshotName) throws Exception { + assertBusy(() -> { + final SnapshotInfo snapshotInfo; + try { + GetSnapshotsResponse snapshotsStatusResponse = clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, REPO) + .setSnapshots(snapshotName) + .get(); + snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + } catch (SnapshotMissingException sme) { + throw new AssertionError(sme); + } + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + }); + } + private void testUnsuccessfulSnapshotRetention(boolean partialSuccess) throws Exception { final String indexName = "test-idx"; final String policyId = "test-policy"; diff --git a/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java b/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java index 
053116d37e223..4f551a0ee8f21 100644 --- a/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java +++ b/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java @@ -232,6 +232,17 @@ public static String getJobId(SnapshotLifecyclePolicyMetadata policyMeta) { return policyMeta.getPolicy().getId() + "-" + policyMeta.getVersion(); } + /** + * Gets the policy id from a job id, which is expected to be in the format + * {@code <policyId>-<version>}. This method extracts the policy id by + * removing the version part (the last part after the last dash). + */ + public static String getPolicyId(String jobId) { + int lastDashIndex = jobId.lastIndexOf('-'); + assert lastDashIndex != -1 : "Invalid job id format: " + jobId; + return jobId.substring(0, lastDashIndex); + } + /** * Cancel all scheduled snapshot jobs */ diff --git a/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java b/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java index d886b5d6fb7c1..73ecd53f637a9 100644 --- a/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java +++ b/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java @@ -13,9 +13,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; +import org.elasticsearch.action.admin.cluster.snapshots.get.TransportGetSnapshotsAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.SnapshotsInProgress; import 
org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; @@ -35,6 +38,7 @@ import org.elasticsearch.xpack.core.ilm.LifecyclePolicySecurityClient; import org.elasticsearch.xpack.core.slm.SnapshotInvocationRecord; import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata; +import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata; import org.elasticsearch.xpack.core.slm.SnapshotLifecycleStats; import org.elasticsearch.xpack.slm.history.SnapshotHistoryItem; @@ -43,6 +47,7 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -50,9 +55,12 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.currentSLMMode; +import static org.elasticsearch.xpack.slm.SnapshotLifecycleService.getJobId; public class SnapshotLifecycleTask implements SchedulerEngine.Listener { @@ -75,11 +83,28 @@ public SnapshotLifecycleTask( this.historyStore = historyStore; } + static List findCompletedRegisteredSnapshotIds(ProjectState projectState, String policyId) { + Set runningSnapshots = currentlyRunningSnapshots(projectState.cluster()); + + RegisteredPolicySnapshots registeredSnapshots = projectState.metadata() + .custom(RegisteredPolicySnapshots.TYPE, RegisteredPolicySnapshots.EMPTY); + + return registeredSnapshots.getSnapshots() + .stream() + // look for snapshots of this SLM policy, leave the rest to the policy that owns it + .filter(policySnapshot -> policySnapshot.getPolicy().equals(policyId)) + // look for snapshots that are no longer running + .filter(policySnapshot -> 
runningSnapshots.contains(policySnapshot.getSnapshotId()) == false) + .map(policySnapshot -> policySnapshot.getSnapshotId().getName()) + .toList(); + } + @Override public void triggered(SchedulerEngine.Event event) { logger.debug("snapshot lifecycle policy task triggered from job [{}]", event.jobName()); - ProjectMetadata projectMetadata = clusterService.state().getMetadata().getProject(projectId); - final Optional snapshotName = maybeTakeSnapshot(projectMetadata, event.jobName(), client, clusterService, historyStore); + ProjectState projectState = clusterService.state().projectState(projectId); + ProjectMetadata metadata = projectState.metadata(); + final Optional snapshotName = maybeTakeSnapshot(metadata, event.jobName(), client, clusterService, historyStore); // Would be cleaner if we could use Optional#ifPresentOrElse snapshotName.ifPresent( @@ -95,6 +120,48 @@ public void triggered(SchedulerEngine.Event event) { } } + /** + * Find SLM registered snapshots that are no longer running, and fetch their snapshot info. These snapshots should have been removed + * from the registered set by WriteJobStatus when they were completed. However, they were not removed, likely due to the master being + * shut down during an SLM run, causing WriteJobStatus to fail. These registered snapshots will be cleaned up in the next SLM + * run and their stats will be retroactively recorded in SLM cluster state based on their status. 
+ */ + private static void findCompletedRegisteredSnapshotInfo( + final ProjectState projectState, + final String policyId, + final Client client, + final ActionListener> listener + ) { + var snapshotIds = findCompletedRegisteredSnapshotIds(projectState, policyId); + + if (snapshotIds.isEmpty() == false) { + var policyMetadata = getSnapPolicyMetadataById(projectState.metadata(), policyId); + if (policyMetadata.isPresent() == false) { + listener.onFailure(new IllegalStateException(format("snapshot lifecycle policy [%s] no longer exists", policyId))); + return; + } + SnapshotLifecyclePolicy policy = policyMetadata.get().getPolicy(); + + GetSnapshotsRequest request = new GetSnapshotsRequest( + TimeValue.MAX_VALUE, // do not time out internal request in case of slow master node + new String[] { policy.getRepository() }, + snapshotIds.toArray(new String[0]) + ); + request.ignoreUnavailable(true); + request.includeIndexNames(false); + + client.admin() + .cluster() + .execute( + TransportGetSnapshotsAction.TYPE, + request, + ActionListener.wrap(response -> listener.onResponse(response.getSnapshots()), listener::onFailure) + ); + } else { + listener.onResponse(Collections.emptyList()); + } + } + /** * For the given job id (a combination of policy id and version), issue a create snapshot * request. 
On a successful or failed create snapshot issuing the state is stored in the cluster @@ -111,6 +178,7 @@ public static Optional maybeTakeSnapshot( ProjectId projectId = projectMetadata.id(); Optional maybeMetadata = getSnapPolicyMetadata(projectMetadata, jobId); String snapshotName = maybeMetadata.map(policyMetadata -> { + String policyId = policyMetadata.getPolicy().getId(); // don't time out on this request to not produce failed SLM runs in case of a temporarily slow master node CreateSnapshotRequest request = policyMetadata.getPolicy().toRequest(TimeValue.MAX_VALUE); final SnapshotId snapshotId = new SnapshotId(request.snapshot(), request.uuid()); @@ -133,20 +201,48 @@ public void onResponse(CreateSnapshotResponse createSnapshotResponse) { policyMetadata.getPolicy().getId(), Strings.toString(createSnapshotResponse) ); + final SnapshotInfo snapInfo = createSnapshotResponse.getSnapshotInfo(); + assert snapInfo != null : "completed snapshot info is null"; // Check that there are no failed shards, since the request may not entirely // fail, but may still have failures (such as in the case of an aborted snapshot) if (snapInfo.failedShards() == 0) { long snapshotStartTime = snapInfo.startTime(); final long timestamp = Instant.now().toEpochMilli(); - submitUnbatchedTask( - clusterService, - "slm-record-success-" + policyMetadata.getPolicy().getId(), - WriteJobStatus.success(projectId, policyMetadata.getPolicy().getId(), snapshotId, snapshotStartTime, timestamp) - ); historyStore.putAsync( SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(), request.snapshot()) ); + + // retrieve the current project state after snapshot is completed, since snapshotting can take a while + ProjectState currentProjectState = clusterService.state().projectState(projectId); + findCompletedRegisteredSnapshotInfo(currentProjectState, policyId, client, new ActionListener<>() { + @Override + public void onResponse(List snapshotInfo) { + submitUnbatchedTask( + 
clusterService, + "slm-record-success-" + policyId, + WriteJobStatus.success(projectId, policyId, snapshotId, snapshotStartTime, timestamp, snapshotInfo) + ); + } + + @Override + public void onFailure(Exception e) { + logger.warn(() -> format("failed to retrieve stale registered snapshots for job [%s]", jobId), e); + // still record the successful snapshot + submitUnbatchedTask( + clusterService, + "slm-record-success-" + policyId, + WriteJobStatus.success( + projectId, + policyId, + snapshotId, + snapshotStartTime, + timestamp, + Collections.emptyList() + ) + ); + } + }); } else { int failures = snapInfo.failedShards(); int total = snapInfo.totalShards(); @@ -168,11 +264,45 @@ public void onFailure(Exception e) { e ); final long timestamp = Instant.now().toEpochMilli(); - submitUnbatchedTask( - clusterService, - "slm-record-failure-" + policyMetadata.getPolicy().getId(), - WriteJobStatus.failure(projectId, policyMetadata.getPolicy().getId(), snapshotId, timestamp, e) - ); + + // retrieve the current project state after snapshot is completed, since snapshotting can take a while + ProjectState currentProjectState = clusterService.state().projectState(projectId); + findCompletedRegisteredSnapshotInfo(currentProjectState, policyId, client, new ActionListener<>() { + @Override + public void onResponse(List snapshotInfo) { + submitUnbatchedTask( + clusterService, + "slm-record-failure-" + policyMetadata.getPolicy().getId(), + WriteJobStatus.failure( + projectId, + policyMetadata.getPolicy().getId(), + snapshotId, + timestamp, + snapshotInfo, + e + ) + ); + } + + @Override + public void onFailure(Exception ex) { + logger.warn(() -> format("failed to retrieve stale registered snapshots for job [%s]", jobId), ex); + // still record the failed snapshot, reporting the original failure (e) rather than the lookup failure + submitUnbatchedTask( + clusterService, + "slm-record-failure-" + policyMetadata.getPolicy().getId(), + WriteJobStatus.failure( + projectId, + policyMetadata.getPolicy().getId(), + snapshotId, + timestamp, + 
Collections.emptyList(), + e + ) + ); + } + }); + final SnapshotHistoryItem failureRecord; try { failureRecord = SnapshotHistoryItem.creationFailureRecord( @@ -216,12 +346,18 @@ private static void submitUnbatchedTask( static Optional getSnapPolicyMetadata(final ProjectMetadata projectMetadata, final String jobId) { return Optional.ofNullable((SnapshotLifecycleMetadata) projectMetadata.custom(SnapshotLifecycleMetadata.TYPE)) .map(SnapshotLifecycleMetadata::getSnapshotConfigurations) - .flatMap( - configMap -> configMap.values() - .stream() - .filter(policyMeta -> jobId.equals(SnapshotLifecycleService.getJobId(policyMeta))) - .findFirst() - ); + .flatMap(configMap -> configMap.values().stream().filter(policyMeta -> jobId.equals(getJobId(policyMeta))).findFirst()); + } + + /** + * For the given policy id, return an optional policy metadata object, if one exists + */ + static Optional getSnapPolicyMetadataById( + final ProjectMetadata projectMetadata, + final String policyId + ) { + return Optional.ofNullable((SnapshotLifecycleMetadata) projectMetadata.custom(SnapshotLifecycleMetadata.TYPE)) + .map(metadata -> metadata.getSnapshotConfigurations().get(policyId)); } public static String exceptionToString(Exception ex) { @@ -253,6 +389,10 @@ static SnapshotInvocationRecord buildFailedSnapshotRecord(SnapshotId snapshot) { ); } + static boolean isSnapshotSuccessful(SnapshotInfo snapshotInfo) { + return snapshotInfo.state() != null && snapshotInfo.state().completed() && snapshotInfo.failedShards() == 0; + } + /** * A cluster state update task to write the result of a snapshot job to the cluster metadata for the associated policy. 
*/ @@ -264,6 +404,8 @@ static class WriteJobStatus extends ClusterStateUpdateTask { private final long snapshotStartTime; private final long snapshotFinishTime; private final Optional exception; + // preloaded snapshot info for registered snapshots that are no longer running + private final List registeredSnapshotInfo; private WriteJobStatus( ProjectId projectId, @@ -271,6 +413,7 @@ private WriteJobStatus( SnapshotId snapshotId, long snapshotStartTime, long snapshotFinishTime, + List registeredSnapshotInfo, Optional exception ) { this.projectId = projectId; @@ -279,6 +422,7 @@ private WriteJobStatus( this.exception = exception; this.snapshotStartTime = snapshotStartTime; this.snapshotFinishTime = snapshotFinishTime; + this.registeredSnapshotInfo = registeredSnapshotInfo; } static WriteJobStatus success( @@ -286,13 +430,37 @@ static WriteJobStatus success( String policyId, SnapshotId snapshotId, long snapshotStartTime, - long snapshotFinishTime + long snapshotFinishTime, + List registeredSnapshotInfo ) { - return new WriteJobStatus(projectId, policyId, snapshotId, snapshotStartTime, snapshotFinishTime, Optional.empty()); + return new WriteJobStatus( + projectId, + policyId, + snapshotId, + snapshotStartTime, + snapshotFinishTime, + registeredSnapshotInfo, + Optional.empty() + ); } - static WriteJobStatus failure(ProjectId projectId, String policyId, SnapshotId snapshotId, long timestamp, Exception exception) { - return new WriteJobStatus(projectId, policyId, snapshotId, timestamp, timestamp, Optional.of(exception)); + static WriteJobStatus failure( + ProjectId projectId, + String policyId, + SnapshotId snapshotId, + long timestamp, + List registeredSnapshotInfo, + Exception exception + ) { + return new WriteJobStatus( + projectId, + policyId, + snapshotId, + timestamp, + timestamp, + registeredSnapshotInfo, + Optional.of(exception) + ); } @Override @@ -313,6 +481,9 @@ public ClusterState execute(ClusterState currentState) throws Exception { return currentState; } + 
Map snapshotInfoById = registeredSnapshotInfo.stream() + .collect(Collectors.toMap(SnapshotInfo::snapshotId, Function.identity())); + final SnapshotLifecyclePolicyMetadata.Builder newPolicyMetadata = SnapshotLifecyclePolicyMetadata.builder(policyMetadata); SnapshotLifecycleStats newStats = snapMeta.getStats(); @@ -326,22 +497,58 @@ public ClusterState execute(ClusterState currentState) throws Exception { final Set runningSnapshots = currentlyRunningSnapshots(currentState); final List newRegistered = new ArrayList<>(); - for (PolicySnapshot snapshot : registeredSnapshots.getSnapshots()) { - if (snapshot.getSnapshotId().equals(snapshotId) == false) { - if (snapshot.getPolicy().equals(policyName)) { - if (runningSnapshots.contains(snapshot.getSnapshotId())) { - // Snapshot is for this policy and is still running so keep it in registered set - newRegistered.add(snapshot); + + // go through the registered set to find stale snapshots and calculate stats + for (PolicySnapshot registeredSnapshot : registeredSnapshots.getSnapshots()) { + SnapshotId registeredSnapshotId = registeredSnapshot.getSnapshotId(); + if (registeredSnapshotId.equals(snapshotId)) { + // skip the snapshot just completed, it will be handled later + continue; + } + if (snapLifecycles.containsKey(registeredSnapshot.getPolicy()) == false) { + // the SLM policy no longer exists, just remove the snapshot from registered set, no need to record stats + continue; + } + if (registeredSnapshot.getPolicy().equals(policyName) == false || runningSnapshots.contains(registeredSnapshotId)) { + // the snapshot is for another policy, leave it to that policy to clean up, or the snapshot is still running + newRegistered.add(registeredSnapshot); + } else { + // the snapshot was completed and should be removed from registered snapshots, update state accordingly + SnapshotInfo snapshotInfo = snapshotInfoById.get(registeredSnapshotId); + if (snapshotInfo != null) { + if (isSnapshotSuccessful(snapshotInfo)) { + newStats = 
newStats.withTakenIncremented(policyName); + newPolicyMetadata.setLastSuccess( + new SnapshotInvocationRecord( + snapshotInfo.snapshotId().getName(), + snapshotInfo.startTime(), + snapshotInfo.endTime(), + null + ) + ); + newPolicyMetadata.setInvocationsSinceLastSuccess(0L); } else { - // Snapshot is for this policy but is not running so infer failure, update stats accordingly, - // and remove from registered set newStats = newStats.withFailedIncremented(policyName); - newPolicyMetadata.incrementInvocationsSinceLastSuccess() - .setLastFailure(buildFailedSnapshotRecord(snapshot.getSnapshotId())); + newPolicyMetadata.setLastFailure( + new SnapshotInvocationRecord( + snapshotInfo.snapshotId().getName(), + snapshotInfo.startTime(), + snapshotInfo.endTime(), + String.format( + Locale.ROOT, + "found failed registered snapshot [%s], the master node was likely shutdown during SLM execution", + snapshotInfo.snapshotId().getName() + ) + ) + ); + newPolicyMetadata.incrementInvocationsSinceLastSuccess(); } - } else if (snapLifecycles.containsKey(snapshot.getPolicy())) { - // Snapshot is for another policy so keep in the registered set and that policy deal with it - newRegistered.add(snapshot); + } else { + // either the snapshot no longer exists in the repo or its info failed to be retrieved, assume failure to clean it + // up so it is not stuck in the registered set forever + newPolicyMetadata.incrementInvocationsSinceLastSuccess() + .setLastFailure(buildFailedSnapshotRecord(registeredSnapshotId)); + newStats = newStats.withFailedIncremented(policyName); } } } diff --git a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java index 93cfbeedb013e..c75e5eaf14cab 100644 --- a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java +++ 
b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java @@ -346,8 +346,22 @@ public void testDeletedPoliciesHaveRegisteredRemoved() throws Exception { final SnapshotId snapForDeletedPolicy = randSnapshotId(); SnapshotLifecycleTask.WriteJobStatus writeJobStatus = randomBoolean() - ? SnapshotLifecycleTask.WriteJobStatus.success(projectId, policyId, initiatingSnap, randomLong(), randomLong()) - : SnapshotLifecycleTask.WriteJobStatus.failure(projectId, policyId, initiatingSnap, randomLong(), new RuntimeException()); + ? SnapshotLifecycleTask.WriteJobStatus.success( + projectId, + policyId, + initiatingSnap, + randomLong(), + randomLong(), + Collections.emptyList() + ) + : SnapshotLifecycleTask.WriteJobStatus.failure( + projectId, + policyId, + initiatingSnap, + randomLong(), + Collections.emptyList(), + new RuntimeException() + ); // deletedPolicy is no longer defined var definedSlmPolicies = List.of(policyId); @@ -373,8 +387,22 @@ public void testOtherDefinedPoliciesUneffected() throws Exception { final SnapshotId otherSnapNotRunning = randSnapshotId(); SnapshotLifecycleTask.WriteJobStatus writeJobStatus = randomBoolean() - ? SnapshotLifecycleTask.WriteJobStatus.success(projectId, policyId, initiatingSnap, randomLong(), randomLong()) - : SnapshotLifecycleTask.WriteJobStatus.failure(projectId, policyId, initiatingSnap, randomLong(), new RuntimeException()); + ? 
SnapshotLifecycleTask.WriteJobStatus.success( + projectId, + policyId, + initiatingSnap, + randomLong(), + randomLong(), + Collections.emptyList() + ) + : SnapshotLifecycleTask.WriteJobStatus.failure( + projectId, + policyId, + initiatingSnap, + randomLong(), + Collections.emptyList(), + new RuntimeException() + ); var definedSlmPolicies = List.of(policyId, otherPolicy); var registeredSnapshots = Map.of(policyId, List.of(initiatingSnap), otherPolicy, List.of(otherSnapRunning, otherSnapNotRunning)); @@ -395,8 +423,22 @@ public void testInitiatingSnapRemovedButStillRunningRemains() throws Exception { final SnapshotId initiatingSnap = randSnapshotId(); SnapshotLifecycleTask.WriteJobStatus writeJobStatus = randomBoolean() - ? SnapshotLifecycleTask.WriteJobStatus.success(projectId, policyId, initiatingSnap, randomLong(), randomLong()) - : SnapshotLifecycleTask.WriteJobStatus.failure(projectId, policyId, initiatingSnap, randomLong(), new RuntimeException()); + ? SnapshotLifecycleTask.WriteJobStatus.success( + projectId, + policyId, + initiatingSnap, + randomLong(), + randomLong(), + Collections.emptyList() + ) + : SnapshotLifecycleTask.WriteJobStatus.failure( + projectId, + policyId, + initiatingSnap, + randomLong(), + Collections.emptyList(), + new RuntimeException() + ); final SnapshotId stillRunning = randSnapshotId(); @@ -414,15 +456,21 @@ public void testInitiatingSnapRemovedButStillRunningRemains() throws Exception { assertEquals(List.of(stillRunning), newRegisteredPolicySnapshots.getSnapshotsByPolicy(policyId)); } - public void testInferFailureInitiatedBySuccess() throws Exception { + public void testCleanUpRegisteredInitiatedBySuccess() throws Exception { final String policyId = randomAlphaOfLength(10); final SnapshotId initiatingSnapshot = randSnapshotId(); - final SnapshotId previousFailedSnapshot = randSnapshotId(); + final SnapshotId inferredFailureSnapshot = randSnapshotId(); // currently running snapshots final SnapshotId stillRunning = randSnapshotId(); + 
final SnapshotInfo snapshotInfoSuccess = randomSnapshotInfoSuccess(projectId); + final SnapshotInfo snapshotInfoFailure = randomSnapshotInfoFailure(projectId); + var definedSlmPolicies = List.of(policyId); - var registeredSnapshots = Map.of(policyId, List.of(stillRunning, previousFailedSnapshot)); + var registeredSnapshots = Map.of( + policyId, + List.of(stillRunning, inferredFailureSnapshot, snapshotInfoSuccess.snapshotId(), snapshotInfoFailure.snapshotId()) + ); var inProgress = Map.of(policyId, List.of(stillRunning)); ClusterState clusterState = buildClusterState(projectId, definedSlmPolicies, registeredSnapshots, inProgress); @@ -431,24 +479,31 @@ public void testInferFailureInitiatedBySuccess() throws Exception { policyId, initiatingSnapshot, randomLong(), - randomLong() + randomLong(), + List.of(snapshotInfoSuccess, snapshotInfoFailure) ); ClusterState newClusterState = writeJobTask.execute(clusterState); - // previous failure is now recorded in stats and metadata + // snapshotInfoSuccess, initiatingSnapshot + int expectedSuccessCount = 2; + // inferredFailureSnapshot, snapshotInfoFailure + int expectedFailureCount = 2; + // the last snapshot (initiatingSnapshot) was successful + int expectedInvocationsSinceLastSuccess = 0; + // registered snapshots state is now recorded in stats and metadata SnapshotLifecycleMetadata newSlmMetadata = newClusterState.metadata().getProject(projectId).custom(SnapshotLifecycleMetadata.TYPE); SnapshotLifecycleStats newStats = newSlmMetadata.getStats(); SnapshotLifecycleStats.SnapshotPolicyStats snapshotPolicyStats = newStats.getMetrics().get(policyId); - assertEquals(1, snapshotPolicyStats.getSnapshotFailedCount()); - assertEquals(1, snapshotPolicyStats.getSnapshotTakenCount()); + assertEquals(expectedFailureCount, snapshotPolicyStats.getSnapshotFailedCount()); + assertEquals(expectedSuccessCount, snapshotPolicyStats.getSnapshotTakenCount()); SnapshotLifecyclePolicyMetadata newSlmPolicyMetadata = 
newSlmMetadata.getSnapshotConfigurations().get(policyId); - assertEquals(previousFailedSnapshot.getName(), newSlmPolicyMetadata.getLastFailure().getSnapshotName()); + assertEquals(snapshotInfoFailure.snapshotId().getName(), newSlmPolicyMetadata.getLastFailure().getSnapshotName()); assertEquals(initiatingSnapshot.getName(), newSlmPolicyMetadata.getLastSuccess().getSnapshotName()); - assertEquals(0, newSlmPolicyMetadata.getInvocationsSinceLastSuccess()); + assertEquals(expectedInvocationsSinceLastSuccess, newSlmPolicyMetadata.getInvocationsSinceLastSuccess()); - // failed snapshot no longer in registeredSnapshot set + // completed snapshot no longer in registeredSnapshot set RegisteredPolicySnapshots newRegisteredPolicySnapshots = newClusterState.metadata() .getProject(projectId) .custom(RegisteredPolicySnapshots.TYPE); @@ -456,14 +511,26 @@ public void testInferFailureInitiatedBySuccess() throws Exception { assertEquals(List.of(stillRunning), newRegisteredSnapIds); } - public void testInferFailureInitiatedByFailure() throws Exception { + public void testCleanUpRegisteredInitiatedByFailure() throws Exception { final String policyId = randomAlphaOfLength(10); final SnapshotId initiatingSnapshot = randSnapshotId(); - final SnapshotId previousFailedSnapshot = randSnapshotId(); + final SnapshotId inferredFailureSnapshot = randSnapshotId(); final SnapshotId stillRunning = randSnapshotId(); + final SnapshotInfo snapshotInfoSuccess = randomSnapshotInfoSuccess(projectId); + final SnapshotInfo snapshotInfoFailure1 = randomSnapshotInfoFailure(projectId); + final SnapshotInfo snapshotInfoFailure2 = randomSnapshotInfoFailure(projectId); var definedSlmPolicies = List.of(policyId); - var registeredSnapshots = Map.of(policyId, List.of(stillRunning, previousFailedSnapshot)); + var registeredSnapshots = Map.of( + policyId, + List.of( + stillRunning, + inferredFailureSnapshot, + snapshotInfoSuccess.snapshotId(), + snapshotInfoFailure1.snapshotId(), + snapshotInfoFailure2.snapshotId() 
+ ) + ); var inProgress = Map.of(policyId, List.of(stillRunning)); ClusterState clusterState = buildClusterState(projectId, definedSlmPolicies, registeredSnapshots, inProgress); @@ -472,24 +539,31 @@ public void testInferFailureInitiatedByFailure() throws Exception { policyId, initiatingSnapshot, randomLong(), + List.of(snapshotInfoSuccess, snapshotInfoFailure1, snapshotInfoFailure2), new RuntimeException() ); ClusterState newClusterState = writeJobTask.execute(clusterState); - // previous failure is now recorded in stats and metadata + // snapshotInfoSuccess + int expectedSuccessCount = 1; + // inferredFailureSnapshot, snapshotInfoFailure1, snapshotInfoFailure2, initiatingSnapshot + int expectedFailureCount = 4; + // snapshotInfoFailure1, snapshotInfoFailure2, initiatingSnapshot + int expectedInvocationsSinceLastSuccess = 3; + // registered snapshots state is now recorded in stats and metadata SnapshotLifecycleMetadata newSlmMetadata = newClusterState.metadata().getProject(projectId).custom(SnapshotLifecycleMetadata.TYPE); SnapshotLifecycleStats newStats = newSlmMetadata.getStats(); SnapshotLifecycleStats.SnapshotPolicyStats snapshotPolicyStats = newStats.getMetrics().get(policyId); - assertEquals(2, snapshotPolicyStats.getSnapshotFailedCount()); - assertEquals(0, snapshotPolicyStats.getSnapshotTakenCount()); + assertEquals(expectedFailureCount, snapshotPolicyStats.getSnapshotFailedCount()); + assertEquals(expectedSuccessCount, snapshotPolicyStats.getSnapshotTakenCount()); SnapshotLifecyclePolicyMetadata newSlmPolicyMetadata = newSlmMetadata.getSnapshotConfigurations().get(policyId); assertEquals(initiatingSnapshot.getName(), newSlmPolicyMetadata.getLastFailure().getSnapshotName()); - assertNull(newSlmPolicyMetadata.getLastSuccess()); - assertEquals(2, newSlmPolicyMetadata.getInvocationsSinceLastSuccess()); + assertEquals(snapshotInfoSuccess.snapshotId().getName(), newSlmPolicyMetadata.getLastSuccess().getSnapshotName()); + 
assertEquals(expectedInvocationsSinceLastSuccess, newSlmPolicyMetadata.getInvocationsSinceLastSuccess()); - // failed snapshot no longer in registeredSnapshot set + // completed snapshot no longer in registeredSnapshot set RegisteredPolicySnapshots newRegisteredPolicySnapshots = newClusterState.metadata() .getProject(projectId) .custom(RegisteredPolicySnapshots.TYPE); @@ -659,4 +733,42 @@ public void putAsync(SnapshotHistoryItem item) { verifier.accept(item); } } + + private static SnapshotInfo randomSnapshotInfoSuccess(ProjectId projectId) { + long startTime = randomNonNegativeLong(); + long endTime = randomLongBetween(startTime, Long.MAX_VALUE); + return new SnapshotInfo( + new Snapshot(projectId, "repo", randSnapshotId()), + List.of("index1", "index2"), + List.of(), + List.of(), + null, + endTime, + 2, + List.of(), + randomBoolean(), + Map.of(), + startTime, + Map.of() + ); + } + + private static SnapshotInfo randomSnapshotInfoFailure(ProjectId projectId) { + long startTime = randomNonNegativeLong(); + long endTime = randomLongBetween(startTime, Long.MAX_VALUE); + return new SnapshotInfo( + new Snapshot(projectId, "repo", randSnapshotId()), + List.of("index1", "index2"), + List.of(), + List.of(), + "failed snapshot", + endTime, + 2, + List.of(new SnapshotShardFailure("nodeId", new ShardId("index", "uuid", 0), "forced failure")), + randomBoolean(), + Map.of(), + startTime, + Map.of() + ); + } }