From b3c35cefd1d2ca6895b08408dd2d4dfd73185fc3 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 10 Oct 2025 14:10:04 +1100 Subject: [PATCH 01/12] Fix checkstyle --- .../main/java/org/elasticsearch/index/shard/IndexingStats.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java b/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java index 39b664a3bd37e..0f60518e90256 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java @@ -10,7 +10,6 @@ package org.elasticsearch.index.shard; import org.elasticsearch.TransportVersion; -import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; From 42144728e4114211ed10dcb814f6bb17f6edb0b2 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 10 Oct 2025 14:10:32 +1100 Subject: [PATCH 02/12] Generate snapshot metrics from last applied state --- .../snapshots/SnapshotsService.java | 14 +- .../snapshots/SnapshotsServiceTests.java | 130 ++++++++++++++++++ 2 files changed, 142 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index f28d4948f657a..8f79f18ad94f5 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -203,6 +203,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement ); private volatile int maxConcurrentOperations; + private volatile ClusterState lastAppliedClusterState; public SnapshotsService( Settings settings, @@ -677,6 +678,7 @@ private Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadat @Override public void applyClusterState(ClusterChangedEvent event) { + lastAppliedClusterState = event.state(); try { if (event.localNodeMaster()) { // We don't remove old master when master flips anymore. So, we need to check for change in master @@ -3358,7 +3360,11 @@ private SnapshotsInProgress createSnapshot( } private Collection getShardsByState() { - final ClusterState currentState = clusterService.state(); + final ClusterState currentState = lastAppliedClusterState; + // If we haven't seen a state, we can't report metrics + if (currentState == null) { + return List.of(); + } // Only the master should report on shards-by-state if (currentState.nodes().isLocalNodeElectedMaster() == false) { return List.of(); @@ -3367,7 +3373,11 @@ private Collection getShardsByState() { } private Collection getSnapshotsByState() { - final ClusterState currentState = clusterService.state(); + final ClusterState currentState = lastAppliedClusterState; + // If we haven't seen a state, we can't report metrics + if (currentState == null) { + return List.of(); + } // Only the master should report on snapshots-by-state if (currentState.nodes().isLocalNodeElectedMaster() == false) { return List.of(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index 7181cda1b5e0b..83fe601f44ac0 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -10,13 +10,21 @@ package org.elasticsearch.snapshots; import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; @@ -25,30 +33,46 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.ShardSnapshotResult; +import org.elasticsearch.repositories.SnapshotMetrics; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpNodeClient; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.transport.TransportService; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static java.util.Collections.singleton; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class SnapshotsServiceTests extends ESTestCase { @@ -467,6 +491,112 @@ public void testPauseForNodeRemovalWithQueuedShards() throws Exception { ); } + public void testMetricsAreCalculatedFromLastAppliedClusterState() { + final ClusterService clusterService = mock(ClusterService.class); + final Settings settings = Settings.EMPTY; + final ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + final RecordingMeterRegistry recordingMeterRegistry = new RecordingMeterRegistry(); + final SnapshotMetrics snapshotMetrics = new SnapshotMetrics(recordingMeterRegistry); + try ( + TestThreadPool testThreadPool = new TestThreadPool("test"); + SnapshotsService snapshotsService = new SnapshotsService( + settings, + clusterService, + (reason, priority, listener) -> listener.onResponse(null), + new IndexNameExpressionResolver( + new ThreadContext(settings), + EmptySystemIndices.INSTANCE, + TestProjectResolvers.DEFAULT_PROJECT_ONLY + ), + new RepositoriesService( + settings, + clusterService, + Map.of(), + Map.of(), + testThreadPool, + new NoOpNodeClient(testThreadPool), + List.of(), + snapshotMetrics + ), + mock(TransportService.class), + EmptySystemIndices.INSTANCE, + randomBoolean(), + snapshotMetrics + ) + ) { + // No metrics should be recorded before seeing a cluster state + recordingMeterRegistry.getRecorder().collect(); + assertThat( + recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOT_SHARDS_BY_STATE), + empty() + ); + assertThat( + recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOTS_BY_STATE), + empty() + ); + + // Simulate a cluster state being applied + final ClusterState withSnapshotsInProgress = createClusterStateWithSnapshotsInProgress(); + snapshotsService.applyClusterState(new ClusterChangedEvent("test", withSnapshotsInProgress, withSnapshotsInProgress)); + + // Now we should publish some metrics + recordingMeterRegistry.getRecorder().collect(); + assertThat( + recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOT_SHARDS_BY_STATE), + not(empty()) + ); + assertThat( + recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOTS_BY_STATE), + not(empty()) + ); + } + } + + private ClusterState createClusterStateWithSnapshotsInProgress() { + final var indexName = randomIdentifier(); + final var repositoryName = randomIdentifier(); + final ClusterState state = ClusterStateCreationUtils.state(indexName, randomIntBetween(1, 3), randomIntBetween(1, 2)); + final IndexMetadata index = state.projectState(ProjectId.DEFAULT).metadata().index(indexName); + return ClusterState.builder(state) + .nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(state.nodes().getLocalNodeId())) + .putProjectMetadata( + ProjectMetadata.builder(state.getMetadata().getProject(ProjectId.DEFAULT)) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata(List.of(new RepositoryMetadata(repositoryName, "fs", Settings.EMPTY))) + ) + ) + .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY.withAddedEntry(createEntry(index, repositoryName))) + .incrementVersion() + .build(); + } + + private SnapshotsInProgress.Entry createEntry(IndexMetadata indexMetadata, String repositoryName) { + return SnapshotsInProgress.Entry.snapshot( + new Snapshot(ProjectId.DEFAULT, repositoryName, new SnapshotId("", "")), + false, + randomBoolean(), + SnapshotsInProgress.State.STARTED, + Map.of(indexMetadata.getIndex().getName(), new IndexId(indexMetadata.getIndex().getName(), randomIdentifier())), + List.of(), + Collections.emptyList(), + 0, + 1, + IntStream.range(0, indexMetadata.getNumberOfShards()) + .mapToObj(i -> new ShardId(indexMetadata.getIndex(), i)) + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), + shardId -> new SnapshotsInProgress.ShardSnapshotStatus(randomIdentifier(), ShardGeneration.newGeneration()) + ) + ), + null, + null, + null + ); + } + private SnapshotsInProgress.ShardSnapshotStatus successShardSnapshotStatus( String nodeId, ShardId shardId, From 4d7ab3d647c815a493c0056c917c92d24b251fda Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 10 Oct 2025 16:09:35 +1100 Subject: [PATCH 03/12] Update docs/changelog/136350.yaml --- docs/changelog/136350.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/136350.yaml diff --git a/docs/changelog/136350.yaml b/docs/changelog/136350.yaml new file mode 100644 index 0000000000000..12c56dac37f94 --- /dev/null +++ b/docs/changelog/136350.yaml @@ -0,0 +1,5 @@ +pr: 136350 +summary: Generate snapshot metrics from last applied state +area: Snapshot/Restore +type: bug +issues: [] From f1c7b95f2a05c49387d0eb8b327beba1258c9a3b Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 10 Oct 2025 17:18:32 +1100 Subject: [PATCH 04/12] Split out metrics calculation --- .../elasticsearch/node/NodeConstruction.java | 2 + .../snapshots/SnapshotMetricsService.java | 139 ++++++++++++++++++ .../snapshots/SnapshotsService.java | 102 ------------- .../SnapshotMetricsServiceTests.java | 125 ++++++++++++++++ .../snapshots/SnapshotsServiceTests.java | 130 ---------------- 5 files changed, 266 insertions(+), 232 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/snapshots/SnapshotMetricsService.java create mode 100644 server/src/test/java/org/elasticsearch/snapshots/SnapshotMetricsServiceTests.java diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 46216dd94fcba..76dde2447afe7 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -215,6 +215,7 @@ import org.elasticsearch.snapshots.InternalSnapshotsInfoService; import org.elasticsearch.snapshots.RepositoryIntegrityHealthIndicatorService; import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.SnapshotMetricsService; import org.elasticsearch.snapshots.SnapshotShardsService; import org.elasticsearch.snapshots.SnapshotsInfoService; import org.elasticsearch.snapshots.SnapshotsService; @@ -1187,6 +1188,7 @@ public Map queryFields() { transportService, indicesService ); + clusterService.addListener(new SnapshotMetricsService(snapshotMetrics, clusterService)); actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedRepositoryAction(repositoriesService)); actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedPipelineAction()); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotMetricsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotMetricsService.java new file mode 100644 index 0000000000000..0dc5cf3d7a78a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotMetricsService.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.repositories.SnapshotMetrics; +import org.elasticsearch.telemetry.metric.LongWithAttributes; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Generates the snapshots-by-state and shards-by-state metrics when polled. Only produces + * metrics on the master node, and only after it's seen a cluster state applied. + */ +public class SnapshotMetricsService implements ClusterStateListener { + + private final ClusterService clusterService; + private volatile boolean shouldReturnSnapshotMetrics; + private CachedSnapshotStateMetrics cachedSnapshotStateMetrics; + + public SnapshotMetricsService(SnapshotMetrics snapshotMetrics, ClusterService clusterService) { + this.clusterService = clusterService; + snapshotMetrics.createSnapshotShardsByStateMetric(this::getShardsByState); + snapshotMetrics.createSnapshotsByStateMetric(this::getSnapshotsByState); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + final ClusterState clusterState = event.state(); + // Only return metrics when the state is recovered and we are the master + shouldReturnSnapshotMetrics = clusterState.nodes().isLocalNodeElectedMaster() + && clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false; + } + + private Collection getShardsByState() { + if (shouldReturnSnapshotMetrics == false) { + return List.of(); + } + return recalculateIfStale(clusterService.state()).shardStateMetrics(); + } + + private Collection getSnapshotsByState() { + if (shouldReturnSnapshotMetrics == false) { + return List.of(); + } + return recalculateIfStale(clusterService.state()).snapshotStateMetrics(); + } + + private CachedSnapshotStateMetrics recalculateIfStale(ClusterState currentState) { + if (cachedSnapshotStateMetrics == null || cachedSnapshotStateMetrics.isStale(currentState)) { + cachedSnapshotStateMetrics = recalculateSnapshotStats(currentState); + } + return cachedSnapshotStateMetrics; + } + + private CachedSnapshotStateMetrics recalculateSnapshotStats(ClusterState currentState) { + final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState); + final List snapshotStateMetrics = new ArrayList<>(); + final List shardStateMetrics = new ArrayList<>(); + + currentState.metadata().projects().forEach((projectId, project) -> { + final RepositoriesMetadata repositoriesMetadata = RepositoriesMetadata.get(project); + if (repositoriesMetadata != null) { + for (RepositoryMetadata repository : repositoriesMetadata.repositories()) { + final Tuple, Map> stateSummaries = + snapshotsInProgress.shardStateSummaryForRepository(projectId, repository.name()); + final Map attributesMap = SnapshotMetrics.createAttributesMap(projectId, repository); + stateSummaries.v1() + .forEach( + (snapshotState, count) -> snapshotStateMetrics.add( + new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", snapshotState.name())) + ) + ); + stateSummaries.v2() + .forEach( + (shardState, count) -> shardStateMetrics.add( + new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", shardState.name())) + ) + ); + } + } + }); + return new CachedSnapshotStateMetrics(currentState, snapshotStateMetrics, shardStateMetrics); + } + + /** + * A cached copy of the snapshot and shard state metrics + */ + private record CachedSnapshotStateMetrics( + String clusterStateId, + int snapshotsInProgressIdentityHashcode, + Collection snapshotStateMetrics, + Collection shardStateMetrics + ) { + CachedSnapshotStateMetrics( + ClusterState sourceState, + Collection snapshotStateMetrics, + Collection shardStateMetrics + ) { + this( + sourceState.stateUUID(), + System.identityHashCode(SnapshotsInProgress.get(sourceState)), + snapshotStateMetrics, + shardStateMetrics + ); + } + + /** + * Are these metrics stale? + * + * @param currentClusterState The current cluster state + * @return true if these metrics were calculated from a prior cluster state and need to be recalculated, false otherwise + */ + public boolean isStale(ClusterState currentClusterState) { + return (Objects.equals(clusterStateId, currentClusterState.stateUUID()) == false + && System.identityHashCode(SnapshotsInProgress.get(currentClusterState)) != snapshotsInProgressIdentityHashcode); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 8f79f18ad94f5..45aef41ce4466 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -90,7 +90,6 @@ import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.ShardSnapshotResult; import org.elasticsearch.repositories.SnapshotMetrics; -import org.elasticsearch.telemetry.metric.LongWithAttributes; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -187,8 +186,6 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement private final ShardSnapshotUpdateCompletionHandler shardSnapshotUpdateCompletionHandler; - private CachedSnapshotStateMetrics cachedSnapshotStateMetrics; - /** * Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the * cluster state. The number of concurrent operations in a cluster state is defined as the sum of @@ -222,8 +219,6 @@ public SnapshotsService( this.repositoriesService = repositoriesService; this.threadPool = transportService.getThreadPool(); this.snapshotMetrics = snapshotMetrics; - snapshotMetrics.createSnapshotShardsByStateMetric(this::getShardsByState); - snapshotMetrics.createSnapshotsByStateMetric(this::getSnapshotsByState); if (DiscoveryNode.isMasterNode(settings)) { // addLowPriorityApplier to make sure that Repository will be created before snapshot @@ -3359,69 +3354,6 @@ private SnapshotsInProgress createSnapshot( } } - private Collection getShardsByState() { - final ClusterState currentState = lastAppliedClusterState; - // If we haven't seen a state, we can't report metrics - if (currentState == null) { - return List.of(); - } - // Only the master should report on shards-by-state - if (currentState.nodes().isLocalNodeElectedMaster() == false) { - return List.of(); - } - return recalculateIfStale(currentState).shardStateMetrics(); - } - - private Collection getSnapshotsByState() { - final ClusterState currentState = lastAppliedClusterState; - // If we haven't seen a state, we can't report metrics - if (currentState == null) { - return List.of(); - } - // Only the master should report on snapshots-by-state - if (currentState.nodes().isLocalNodeElectedMaster() == false) { - return List.of(); - } - return recalculateIfStale(currentState).snapshotStateMetrics(); - } - - private CachedSnapshotStateMetrics recalculateIfStale(ClusterState currentState) { - if (cachedSnapshotStateMetrics == null || cachedSnapshotStateMetrics.isStale(currentState)) { - cachedSnapshotStateMetrics = recalculateSnapshotStats(currentState); - } - return cachedSnapshotStateMetrics; - } - - private CachedSnapshotStateMetrics recalculateSnapshotStats(ClusterState currentState) { - final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState); - final List snapshotStateMetrics = new ArrayList<>(); - final List shardStateMetrics = new ArrayList<>(); - - currentState.metadata().projects().forEach((projectId, project) -> { - final RepositoriesMetadata repositoriesMetadata = RepositoriesMetadata.get(project); - if (repositoriesMetadata != null) { - for (RepositoryMetadata repository : repositoriesMetadata.repositories()) { - final Tuple, Map> stateSummaries = snapshotsInProgress - .shardStateSummaryForRepository(projectId, repository.name()); - final Map attributesMap = SnapshotMetrics.createAttributesMap(projectId, repository); - stateSummaries.v1() - .forEach( - (snapshotState, count) -> snapshotStateMetrics.add( - new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", snapshotState.name())) - ) - ); - stateSummaries.v2() - .forEach( - (shardState, count) -> shardStateMetrics.add( - new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", shardState.name())) - ) - ); - } - } - }); - return new CachedSnapshotStateMetrics(currentState, snapshotStateMetrics, shardStateMetrics); - } - private record UpdateNodeIdsForRemovalTask() implements ClusterStateTaskListener { @Override public void onFailure(Exception e) { @@ -3449,38 +3381,4 @@ static ClusterState executeBatch( } private final MasterServiceTaskQueue updateNodeIdsToRemoveQueue; - - /** - * A cached copy of the snapshot and shard state metrics - */ - private record CachedSnapshotStateMetrics( - String clusterStateId, - int snapshotsInProgressIdentityHashcode, - Collection snapshotStateMetrics, - Collection shardStateMetrics - ) { - CachedSnapshotStateMetrics( - ClusterState sourceState, - Collection snapshotStateMetrics, - Collection shardStateMetrics - ) { - this( - sourceState.stateUUID(), - System.identityHashCode(SnapshotsInProgress.get(sourceState)), - snapshotStateMetrics, - shardStateMetrics - ); - } - - /** - * Are these metrics stale? - * - * @param currentClusterState The current cluster state - * @return true if these metrics were calculated from a prior cluster state and need to be recalculated, false otherwise - */ - public boolean isStale(ClusterState currentClusterState) { - return (Objects.equals(clusterStateId, currentClusterState.stateUUID()) == false - && System.identityHashCode(SnapshotsInProgress.get(currentClusterState)) != snapshotsInProgressIdentityHashcode); - } - } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotMetricsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotMetricsServiceTests.java new file mode 100644 index 0000000000000..35885412e8d81 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotMetricsServiceTests.java @@ -0,0 +1,125 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.ShardGeneration; +import org.elasticsearch.repositories.SnapshotMetrics; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.RecordingMeterRegistry; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.not; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +public class SnapshotMetricsServiceTests extends ESTestCase { + + public void testMetricsAreOnlyCalculatedAfterAValidClusterStateHasBeenSeen() { + final ClusterService clusterService = mock(ClusterService.class); + final RecordingMeterRegistry recordingMeterRegistry = new RecordingMeterRegistry(); + final SnapshotMetrics snapshotMetrics = new SnapshotMetrics(recordingMeterRegistry); + final SnapshotMetricsService snapshotsService = new SnapshotMetricsService(snapshotMetrics, clusterService); + + // No metrics should be recorded before seeing a cluster state + recordingMeterRegistry.getRecorder().collect(); + assertThat( + recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOT_SHARDS_BY_STATE), + empty() + ); + assertThat( + recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOTS_BY_STATE), + empty() + ); + verifyNoInteractions(clusterService); + + // Simulate a cluster state being applied + final ClusterState withSnapshotsInProgress = createClusterStateWithSnapshotsInProgress(); + when(clusterService.state()).thenReturn(withSnapshotsInProgress); + snapshotsService.clusterChanged(new ClusterChangedEvent("test", withSnapshotsInProgress, withSnapshotsInProgress)); + + // This time we should publish some metrics + recordingMeterRegistry.getRecorder().collect(); + assertThat( + recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOT_SHARDS_BY_STATE), + not(empty()) + ); + assertThat( + recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOTS_BY_STATE), + not(empty()) + ); + } + + private ClusterState createClusterStateWithSnapshotsInProgress() { + final var indexName = randomIdentifier(); + final var repositoryName = randomIdentifier(); + final ClusterState state = ClusterStateCreationUtils.state(indexName, randomIntBetween(1, 3), randomIntBetween(1, 2)); + final IndexMetadata index = state.projectState(ProjectId.DEFAULT).metadata().index(indexName); + return ClusterState.builder(state) + .nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(state.nodes().getLocalNodeId())) + .putProjectMetadata( + ProjectMetadata.builder(state.getMetadata().getProject(ProjectId.DEFAULT)) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata(List.of(new RepositoryMetadata(repositoryName, "fs", Settings.EMPTY))) + ) + ) + .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY.withAddedEntry(createEntry(index, repositoryName))) + .incrementVersion() + .build(); + } + + private SnapshotsInProgress.Entry createEntry(IndexMetadata indexMetadata, String repositoryName) { + return SnapshotsInProgress.Entry.snapshot( + new Snapshot(ProjectId.DEFAULT, repositoryName, new SnapshotId("", "")), + false, + randomBoolean(), + SnapshotsInProgress.State.STARTED, + Map.of(indexMetadata.getIndex().getName(), new IndexId(indexMetadata.getIndex().getName(), randomIdentifier())), + List.of(), + Collections.emptyList(), + 0, + 1, + IntStream.range(0, indexMetadata.getNumberOfShards()) + .mapToObj(i -> new ShardId(indexMetadata.getIndex(), i)) + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), + shardId -> new SnapshotsInProgress.ShardSnapshotStatus(randomIdentifier(), ShardGeneration.newGeneration()) + ) + ), + null, + null, + null + ); + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index 83fe601f44ac0..7181cda1b5e0b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -10,21 +10,13 @@ package org.elasticsearch.snapshots; import org.elasticsearch.action.support.ActionTestUtils; -import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.metadata.RepositoriesMetadata; -import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; @@ -33,46 +25,30 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.ShardSnapshotResult; -import org.elasticsearch.repositories.SnapshotMetrics; -import org.elasticsearch.telemetry.InstrumentType; -import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.client.NoOpNodeClient; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.transport.TransportService; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.IntStream; import static java.util.Collections.singleton; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class SnapshotsServiceTests extends ESTestCase { @@ -491,112 +467,6 @@ public void testPauseForNodeRemovalWithQueuedShards() throws Exception { ); } - public void testMetricsAreCalculatedFromLastAppliedClusterState() { - final ClusterService clusterService = mock(ClusterService.class); - final Settings settings = Settings.EMPTY; - final ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - final RecordingMeterRegistry recordingMeterRegistry = new RecordingMeterRegistry(); - final SnapshotMetrics snapshotMetrics = new SnapshotMetrics(recordingMeterRegistry); - try ( - TestThreadPool testThreadPool = new TestThreadPool("test"); - SnapshotsService snapshotsService = new SnapshotsService( - settings, - clusterService, - (reason, priority, listener) -> listener.onResponse(null), - new IndexNameExpressionResolver( - new ThreadContext(settings), - EmptySystemIndices.INSTANCE, - TestProjectResolvers.DEFAULT_PROJECT_ONLY - ), - new RepositoriesService( - settings, - clusterService, - Map.of(), - Map.of(), - testThreadPool, - new NoOpNodeClient(testThreadPool), - List.of(), - snapshotMetrics - ), - mock(TransportService.class), - EmptySystemIndices.INSTANCE, - randomBoolean(), - snapshotMetrics - ) - ) { - // No metrics should be recorded before seeing a cluster state - recordingMeterRegistry.getRecorder().collect(); - assertThat( - recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOT_SHARDS_BY_STATE), - empty() - ); - assertThat( - recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOTS_BY_STATE), - empty() - ); - - // Simulate a cluster state being applied - final ClusterState withSnapshotsInProgress = createClusterStateWithSnapshotsInProgress(); - snapshotsService.applyClusterState(new ClusterChangedEvent("test", withSnapshotsInProgress, withSnapshotsInProgress)); - - // Now we should publish some metrics - recordingMeterRegistry.getRecorder().collect(); - assertThat( - recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOT_SHARDS_BY_STATE), - not(empty()) - ); - assertThat( - recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOTS_BY_STATE), - not(empty()) - ); - } - } - - private ClusterState createClusterStateWithSnapshotsInProgress() { - final var indexName = randomIdentifier(); - final var repositoryName = randomIdentifier(); - final ClusterState state = ClusterStateCreationUtils.state(indexName, randomIntBetween(1, 3), randomIntBetween(1, 2)); - final IndexMetadata index = state.projectState(ProjectId.DEFAULT).metadata().index(indexName); - return ClusterState.builder(state) - .nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(state.nodes().getLocalNodeId())) - .putProjectMetadata( - ProjectMetadata.builder(state.getMetadata().getProject(ProjectId.DEFAULT)) - .putCustom( - RepositoriesMetadata.TYPE, - new RepositoriesMetadata(List.of(new RepositoryMetadata(repositoryName, "fs", Settings.EMPTY))) - ) - ) - .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY.withAddedEntry(createEntry(index, repositoryName))) - .incrementVersion() - .build(); - } - - private SnapshotsInProgress.Entry createEntry(IndexMetadata indexMetadata, String repositoryName) { - return SnapshotsInProgress.Entry.snapshot( - new Snapshot(ProjectId.DEFAULT, repositoryName, new SnapshotId("", "")), - false, - randomBoolean(), - SnapshotsInProgress.State.STARTED, - Map.of(indexMetadata.getIndex().getName(), new IndexId(indexMetadata.getIndex().getName(), randomIdentifier())), - List.of(), - Collections.emptyList(), - 0, - 1, - IntStream.range(0, indexMetadata.getNumberOfShards()) - .mapToObj(i -> new ShardId(indexMetadata.getIndex(), i)) - .collect( - Collectors.toUnmodifiableMap( - Function.identity(), - shardId -> new SnapshotsInProgress.ShardSnapshotStatus(randomIdentifier(), ShardGeneration.newGeneration()) - ) - ), - null, - null, - null - ); - } - private SnapshotsInProgress.ShardSnapshotStatus successShardSnapshotStatus( String nodeId, ShardId shardId, From 8c72da52436219e206baffb482180f6ea48f6ed0 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 10 Oct 2025 17:22:03 +1100 Subject: [PATCH 05/12] Fix changelog --- docs/changelog/136350.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/136350.yaml b/docs/changelog/136350.yaml index 12c56dac37f94..994a9c6fda0e0 100644 --- a/docs/changelog/136350.yaml +++ b/docs/changelog/136350.yaml @@ -1,5 +1,5 @@ pr: 136350 -summary: Generate snapshot metrics from last applied state +summary: Prevent NPE when generating snapshot metrics before initial cluster state is set area: Snapshot/Restore type: bug issues: [] From 3c5bb9e0fedc67e78a5c429754c5416705243ea7 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 10 Oct 2025 17:26:36 +1100 Subject: [PATCH 06/12] Clean up remnants --- .../main/java/org/elasticsearch/snapshots/SnapshotsService.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 45aef41ce4466..a25908d0acc6d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -200,7 +200,6 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement ); private volatile int maxConcurrentOperations; - private volatile ClusterState lastAppliedClusterState; public SnapshotsService( Settings settings, @@ -673,7 +672,6 @@ private Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadat @Override public void applyClusterState(ClusterChangedEvent event) { - lastAppliedClusterState = event.state(); try { if (event.localNodeMaster()) { // We don't remove old master when master flips anymore. So, we need to check for change in master From 3684b57d87a5bd3f69af2c881768e9900ad79a09 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 13 Oct 2025 09:50:42 +1100 Subject: [PATCH 07/12] Remove redundant staleness check --- .../elasticsearch/snapshots/SnapshotMetricsService.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotMetricsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotMetricsService.java index 0dc5cf3d7a78a..9bc645cace860 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotMetricsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotMetricsService.java @@ -26,7 +26,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Objects; /** * Generates the snapshots-by-state and shards-by-state metrics when polled. Only produces @@ -129,11 +128,11 @@ private record CachedSnapshotStateMetrics( * Are these metrics stale? * * @param currentClusterState The current cluster state - * @return true if these metrics were calculated from a prior cluster state and need to be recalculated, false otherwise + * @return true if these metrics were calculated from a prior {@link SnapshotsInProgress} and need to be recalculated, false + * otherwise */ public boolean isStale(ClusterState currentClusterState) { - return (Objects.equals(clusterStateId, currentClusterState.stateUUID()) == false - && System.identityHashCode(SnapshotsInProgress.get(currentClusterState)) != snapshotsInProgressIdentityHashcode); + return System.identityHashCode(SnapshotsInProgress.get(currentClusterState)) != snapshotsInProgressIdentityHashcode; } } } From a9e46fec8e3c8c8f60396fa708d34a20ca972201 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 13 Oct 2025 10:08:48 +1100 Subject: [PATCH 08/12] Add test for no-longer master --- .../SnapshotMetricsServiceTests.java | 50 +++++++++++++++---- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotMetricsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotMetricsServiceTests.java index 35885412e8d81..84814a7d8f4c4 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotMetricsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotMetricsServiceTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -26,9 +27,12 @@ import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.SnapshotMetrics; import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matcher; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -39,6 +43,9 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.not; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -51,15 +58,7 @@ public void testMetricsAreOnlyCalculatedAfterAValidClusterStateHasBeenSeen() { final SnapshotMetricsService snapshotsService = new SnapshotMetricsService(snapshotMetrics, clusterService); // No metrics should be recorded before seeing a cluster state - recordingMeterRegistry.getRecorder().collect(); - assertThat( - recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOT_SHARDS_BY_STATE), - empty() - ); - assertThat( - recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOTS_BY_STATE), - empty() - ); + collectAndAssertSnapshotAndShardMetricsMatch(recordingMeterRegistry, empty()); verifyNoInteractions(clusterService); // Simulate a cluster state being applied @@ -68,14 +67,43 @@ public void testMetricsAreOnlyCalculatedAfterAValidClusterStateHasBeenSeen() { snapshotsService.clusterChanged(new ClusterChangedEvent("test", withSnapshotsInProgress, withSnapshotsInProgress)); // This time we should publish some metrics + collectAndAssertSnapshotAndShardMetricsMatch(recordingMeterRegistry, not(empty())); + verify(clusterService, times(2)).state(); + + recordingMeterRegistry.getRecorder().resetCalls(); + reset(clusterService); + + // Then publish a new state in which we aren't master + final ClusterState noLongerMaster = ClusterState.builder(withSnapshotsInProgress) + .nodes( + DiscoveryNodes.builder(withSnapshotsInProgress.nodes()) + .masterNodeId( + randomValueOtherThan( + withSnapshotsInProgress.nodes().getLocalNodeId(), + () -> randomFrom(withSnapshotsInProgress.nodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet())) + ) + ) + ) + .build(); + snapshotsService.clusterChanged(new ClusterChangedEvent("test", noLongerMaster, noLongerMaster)); + + // We should no longer publish metrics + collectAndAssertSnapshotAndShardMetricsMatch(recordingMeterRegistry, empty()); + verifyNoInteractions(clusterService); + } + + private void collectAndAssertSnapshotAndShardMetricsMatch( + RecordingMeterRegistry recordingMeterRegistry, + Matcher> matcher + ) { recordingMeterRegistry.getRecorder().collect(); assertThat( recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOT_SHARDS_BY_STATE), - not(empty()) + matcher ); assertThat( recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOTS_BY_STATE), - not(empty()) + matcher ); } From 1a79b92839ccc09b8c4d7946e71d88197f0232a3 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 13 Oct 2025 10:37:09 +1100 Subject: [PATCH 09/12] Use ClusterService lifecycle to decide when to poll --- .../elasticsearch/node/NodeConstruction.java | 7 ++- ...napshotAndShardByStateMetricsService.java} | 45 ++++++++------- ...otAndShardByStateMetricsServiceTests.java} | 56 +++++++------------ 3 files changed, 46 insertions(+), 62 deletions(-) rename server/src/main/java/org/elasticsearch/snapshots/{SnapshotMetricsService.java => CachingSnapshotAndShardByStateMetricsService.java} (77%) rename server/src/test/java/org/elasticsearch/snapshots/{SnapshotMetricsServiceTests.java => CachingSnapshotAndShardByStateMetricsServiceTests.java} (71%) diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 76dde2447afe7..8afd8c3ce6201 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -210,12 +210,12 @@ import org.elasticsearch.search.SearchUtils; import org.elasticsearch.search.aggregations.support.AggregationUsageService; import org.elasticsearch.shutdown.PluginShutdownService; +import org.elasticsearch.snapshots.CachingSnapshotAndShardByStateMetricsService; import org.elasticsearch.snapshots.IndexMetadataRestoreTransformer; import org.elasticsearch.snapshots.IndexMetadataRestoreTransformer.NoOpRestoreTransformer; import org.elasticsearch.snapshots.InternalSnapshotsInfoService; import org.elasticsearch.snapshots.RepositoryIntegrityHealthIndicatorService; import org.elasticsearch.snapshots.RestoreService; -import org.elasticsearch.snapshots.SnapshotMetricsService; import org.elasticsearch.snapshots.SnapshotShardsService; import org.elasticsearch.snapshots.SnapshotsInfoService; import org.elasticsearch.snapshots.SnapshotsService; @@ -1188,7 +1188,10 @@ public Map queryFields() { transportService, indicesService ); - clusterService.addListener(new SnapshotMetricsService(snapshotMetrics, clusterService)); + final CachingSnapshotAndShardByStateMetricsService cachingSnapshotAndShardByStateMetricsService = + new CachingSnapshotAndShardByStateMetricsService(clusterService); + snapshotMetrics.createSnapshotsByStateMetric(cachingSnapshotAndShardByStateMetricsService::getSnapshotsByState); + snapshotMetrics.createSnapshotShardsByStateMetric(cachingSnapshotAndShardByStateMetricsService::getShardsByState); actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedRepositoryAction(repositoriesService)); actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedPipelineAction()); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotMetricsService.java b/server/src/main/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsService.java similarity index 77% rename from server/src/main/java/org/elasticsearch/snapshots/SnapshotMetricsService.java rename to server/src/main/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsService.java index 9bc645cace860..c5775f548e5b7 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotMetricsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsService.java @@ -9,16 +9,14 @@ package org.elasticsearch.snapshots; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Tuple; -import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.repositories.SnapshotMetrics; import org.elasticsearch.telemetry.metric.LongWithAttributes; @@ -29,40 +27,41 @@ /** * Generates the snapshots-by-state and shards-by-state metrics when polled. Only produces - * metrics on the master node, and only after it's seen a cluster state applied. + * metrics on the master node, and only while the {@link ClusterService} is started. Will only + * re-calculate the metrics if the {@link SnapshotsInProgress} has changed since the last time + * they were calculated. */ -public class SnapshotMetricsService implements ClusterStateListener { +public class CachingSnapshotAndShardByStateMetricsService { private final ClusterService clusterService; - private volatile boolean shouldReturnSnapshotMetrics; private CachedSnapshotStateMetrics cachedSnapshotStateMetrics; - public SnapshotMetricsService(SnapshotMetrics snapshotMetrics, ClusterService clusterService) { + public CachingSnapshotAndShardByStateMetricsService(ClusterService clusterService) { this.clusterService = clusterService; - snapshotMetrics.createSnapshotShardsByStateMetric(this::getShardsByState); - snapshotMetrics.createSnapshotsByStateMetric(this::getSnapshotsByState); } - @Override - public void clusterChanged(ClusterChangedEvent event) { - final ClusterState clusterState = event.state(); - // Only return metrics when the state is recovered and we are the master - shouldReturnSnapshotMetrics = clusterState.nodes().isLocalNodeElectedMaster() - && clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false; - } - - private Collection getShardsByState() { - if (shouldReturnSnapshotMetrics == false) { + public Collection getShardsByState() { + if (clusterService.lifecycleState() != Lifecycle.State.STARTED) { + return List.of(); + } + final ClusterState state = clusterService.state(); + if (state.nodes().isLocalNodeElectedMaster() == false) { + // Only the master should report on shards-by-state return List.of(); } - return recalculateIfStale(clusterService.state()).shardStateMetrics(); + return recalculateIfStale(state).shardStateMetrics(); } - private Collection getSnapshotsByState() { - if (shouldReturnSnapshotMetrics == false) { + public Collection getSnapshotsByState() { + if (clusterService.lifecycleState() != Lifecycle.State.STARTED) { + return List.of(); + } + final ClusterState state = clusterService.state(); + if (state.nodes().isLocalNodeElectedMaster() == false) { + // Only the master should report on snapshots-by-state return List.of(); } - return recalculateIfStale(clusterService.state()).snapshotStateMetrics(); + return recalculateIfStale(state).snapshotStateMetrics(); } private CachedSnapshotStateMetrics recalculateIfStale(ClusterState currentState) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotMetricsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java similarity index 71% rename from server/src/test/java/org/elasticsearch/snapshots/SnapshotMetricsServiceTests.java rename to server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java index 84814a7d8f4c4..01f8f7f3419e6 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotMetricsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java @@ -10,7 +10,6 @@ package org.elasticsearch.snapshots; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -21,18 +20,13 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.ShardGeneration; -import org.elasticsearch.repositories.SnapshotMetrics; -import org.elasticsearch.telemetry.InstrumentType; -import org.elasticsearch.telemetry.Measurement; -import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.test.ESTestCase; -import org.hamcrest.Matcher; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -43,34 +37,36 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.not; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -public class SnapshotMetricsServiceTests extends ESTestCase { +public class CachingSnapshotAndShardByStateMetricsServiceTests extends ESTestCase { public void testMetricsAreOnlyCalculatedAfterAValidClusterStateHasBeenSeen() { final ClusterService clusterService = mock(ClusterService.class); - final RecordingMeterRegistry recordingMeterRegistry = new RecordingMeterRegistry(); - final SnapshotMetrics snapshotMetrics = new SnapshotMetrics(recordingMeterRegistry); - final SnapshotMetricsService snapshotsService = new SnapshotMetricsService(snapshotMetrics, clusterService); + final CachingSnapshotAndShardByStateMetricsService byStateMetricsService = new CachingSnapshotAndShardByStateMetricsService( + clusterService + ); - // No metrics should be recorded before seeing a cluster state - collectAndAssertSnapshotAndShardMetricsMatch(recordingMeterRegistry, empty()); - verifyNoInteractions(clusterService); + // No metrics should be recorded before the cluster service is started + when(clusterService.lifecycleState()).thenReturn(Lifecycle.State.INITIALIZED); + assertThat(byStateMetricsService.getShardsByState(), empty()); + assertThat(byStateMetricsService.getSnapshotsByState(), empty()); + verify(clusterService, never()).state(); - // Simulate a cluster state being applied + // Simulate the metrics service being started/a state being applied + when(clusterService.lifecycleState()).thenReturn(Lifecycle.State.STARTED); final ClusterState withSnapshotsInProgress = createClusterStateWithSnapshotsInProgress(); when(clusterService.state()).thenReturn(withSnapshotsInProgress); - snapshotsService.clusterChanged(new ClusterChangedEvent("test", withSnapshotsInProgress, withSnapshotsInProgress)); // This time we should publish some metrics - collectAndAssertSnapshotAndShardMetricsMatch(recordingMeterRegistry, not(empty())); + assertThat(byStateMetricsService.getShardsByState(), not(empty())); + assertThat(byStateMetricsService.getSnapshotsByState(), not(empty())); verify(clusterService, times(2)).state(); - recordingMeterRegistry.getRecorder().resetCalls(); reset(clusterService); // Then publish a new state in which we aren't master @@ -85,26 +81,12 @@ public void testMetricsAreOnlyCalculatedAfterAValidClusterStateHasBeenSeen() { ) ) .build(); - snapshotsService.clusterChanged(new ClusterChangedEvent("test", noLongerMaster, noLongerMaster)); + when(clusterService.state()).thenReturn(noLongerMaster); // We should no longer publish metrics - collectAndAssertSnapshotAndShardMetricsMatch(recordingMeterRegistry, empty()); - verifyNoInteractions(clusterService); - } - - private void collectAndAssertSnapshotAndShardMetricsMatch( - RecordingMeterRegistry recordingMeterRegistry, - Matcher> matcher - ) { - recordingMeterRegistry.getRecorder().collect(); - assertThat( - recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOT_SHARDS_BY_STATE), - matcher - ); - assertThat( - recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_GAUGE, SnapshotMetrics.SNAPSHOTS_BY_STATE), - matcher - ); + assertThat(byStateMetricsService.getShardsByState(), empty()); + assertThat(byStateMetricsService.getSnapshotsByState(), empty()); + verify(clusterService, never()).state(); } private ClusterState createClusterStateWithSnapshotsInProgress() { From 6f8978c43cd1399853d2913f4559c8e1052a6c2d Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 13 Oct 2025 11:03:27 +1100 Subject: [PATCH 10/12] Test whole lifecyle --- ...hotAndShardByStateMetricsServiceTests.java | 66 ++++++++++++++++--- 1 file changed, 56 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java index 01f8f7f3419e6..977ac568e63b7 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java @@ -25,8 +25,10 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.ShardGeneration; +import org.elasticsearch.telemetry.metric.LongWithAttributes; import org.elasticsearch.test.ESTestCase; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -36,6 +38,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -45,7 +48,9 @@ public class CachingSnapshotAndShardByStateMetricsServiceTests extends ESTestCase { - public void testMetricsAreOnlyCalculatedAfterAValidClusterStateHasBeenSeen() { + public void testMetricsAreOnlyCalculatedWhileClusterServiceIsStartedAndLocalNodeIsMaster() { + final var indexName = randomIdentifier(); + final var repositoryName = randomIdentifier(); final ClusterService clusterService = mock(ClusterService.class); final CachingSnapshotAndShardByStateMetricsService byStateMetricsService = new CachingSnapshotAndShardByStateMetricsService( clusterService @@ -59,17 +64,20 @@ public void testMetricsAreOnlyCalculatedAfterAValidClusterStateHasBeenSeen() { // Simulate the metrics service being started/a state being applied when(clusterService.lifecycleState()).thenReturn(Lifecycle.State.STARTED); - final ClusterState withSnapshotsInProgress = createClusterStateWithSnapshotsInProgress(); + final ClusterState withSnapshotsInProgress = createClusterStateWithSnapshotsInProgress(indexName, repositoryName); when(clusterService.state()).thenReturn(withSnapshotsInProgress); // This time we should publish some metrics - assertThat(byStateMetricsService.getShardsByState(), not(empty())); - assertThat(byStateMetricsService.getSnapshotsByState(), not(empty())); + final Collection shardsByState = byStateMetricsService.getShardsByState(); + final Collection snapshotsByState = byStateMetricsService.getSnapshotsByState(); + assertThat(shardsByState, not(empty())); + assertThat(snapshotsByState, not(empty())); verify(clusterService, times(2)).state(); reset(clusterService); + when(clusterService.lifecycleState()).thenReturn(Lifecycle.State.STARTED); - // Then publish a new state in which we aren't master + // Then observe a new state in which we aren't master final ClusterState noLongerMaster = ClusterState.builder(withSnapshotsInProgress) .nodes( DiscoveryNodes.builder(withSnapshotsInProgress.nodes()) @@ -80,20 +88,52 @@ public void testMetricsAreOnlyCalculatedAfterAValidClusterStateHasBeenSeen() { ) ) ) + .incrementVersion() .build(); when(clusterService.state()).thenReturn(noLongerMaster); // We should no longer publish metrics assertThat(byStateMetricsService.getShardsByState(), empty()); assertThat(byStateMetricsService.getSnapshotsByState(), empty()); + + // Become master again + final ClusterState masterAgain = ClusterState.builder(noLongerMaster) + .nodes(DiscoveryNodes.builder(noLongerMaster.nodes()).masterNodeId(noLongerMaster.nodes().getLocalNodeId())) + .incrementVersion() + .build(); + when(clusterService.state()).thenReturn(masterAgain); + + // We should return cached metrics because the SnapshotsInProgress hasn't changed + final Collection secondShardsByState = byStateMetricsService.getShardsByState(); + final Collection secondSnapshotsByState = byStateMetricsService.getSnapshotsByState(); + assertThat(secondShardsByState, sameInstance(shardsByState)); + assertThat(secondSnapshotsByState, sameInstance(snapshotsByState)); + + // Update SnapshotsInProgress + final ClusterState newSnapshotsInProgress = ClusterState.builder(masterAgain) + .putCustom(SnapshotsInProgress.TYPE, createSnapshotsInProgress(indexName, repositoryName)) + .incrementVersion() + .build(); + when(clusterService.state()).thenReturn(newSnapshotsInProgress); + + // We should return fresh metrics because the SnapshotsInProgress has changed + final Collection thirdShardsByState = byStateMetricsService.getShardsByState(); + final Collection thirdSnapshotsByState = byStateMetricsService.getSnapshotsByState(); + assertThat(thirdShardsByState, not(empty())); + assertThat(thirdSnapshotsByState, not(empty())); + assertThat(thirdShardsByState, not(sameInstance(shardsByState))); + assertThat(thirdSnapshotsByState, not(sameInstance(snapshotsByState))); + + // Then the cluster service is stopped, we should no longer publish metrics + reset(clusterService); + when(clusterService.lifecycleState()).thenReturn(Lifecycle.State.STOPPED); + assertThat(byStateMetricsService.getShardsByState(), empty()); + assertThat(byStateMetricsService.getSnapshotsByState(), empty()); verify(clusterService, never()).state(); } - private ClusterState createClusterStateWithSnapshotsInProgress() { - final var indexName = randomIdentifier(); - final var repositoryName = randomIdentifier(); + private ClusterState createClusterStateWithSnapshotsInProgress(String indexName, String repositoryName) { final ClusterState state = ClusterStateCreationUtils.state(indexName, randomIntBetween(1, 3), randomIntBetween(1, 2)); - final IndexMetadata index = state.projectState(ProjectId.DEFAULT).metadata().index(indexName); return ClusterState.builder(state) .nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(state.nodes().getLocalNodeId())) .putProjectMetadata( @@ -103,11 +143,17 @@ private ClusterState createClusterStateWithSnapshotsInProgress() { new RepositoriesMetadata(List.of(new RepositoryMetadata(repositoryName, "fs", Settings.EMPTY))) ) ) - .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY.withAddedEntry(createEntry(index, repositoryName))) + .putCustom(SnapshotsInProgress.TYPE, createSnapshotsInProgress(indexName, repositoryName)) .incrementVersion() .build(); } + private SnapshotsInProgress createSnapshotsInProgress(String indexName, String repositoryName) { + final ClusterState state = ClusterStateCreationUtils.state(indexName, randomIntBetween(1, 3), randomIntBetween(1, 2)); + final IndexMetadata index = state.projectState(ProjectId.DEFAULT).metadata().index(indexName); + return SnapshotsInProgress.EMPTY.withAddedEntry(createEntry(index, repositoryName)); + } + private SnapshotsInProgress.Entry createEntry(IndexMetadata indexMetadata, String repositoryName) { return SnapshotsInProgress.Entry.snapshot( new Snapshot(ProjectId.DEFAULT, repositoryName, new SnapshotId("", "")), From e3146c689bb97ca5e68b6ac4c99bf276d6a6035a Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 13 Oct 2025 12:44:41 +1100 Subject: [PATCH 11/12] Make minimum nodes 2 --- .../CachingSnapshotAndShardByStateMetricsServiceTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java index 977ac568e63b7..f14a77cf2b3ca 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java @@ -133,7 +133,8 @@ public void testMetricsAreOnlyCalculatedWhileClusterServiceIsStartedAndLocalNode } private ClusterState createClusterStateWithSnapshotsInProgress(String indexName, String repositoryName) { - final ClusterState state = ClusterStateCreationUtils.state(indexName, randomIntBetween(1, 3), randomIntBetween(1, 2)); + // Need to have at least 2 nodes, so we can test when another node is the master + final ClusterState state = ClusterStateCreationUtils.state(indexName, randomIntBetween(2, 5), randomIntBetween(1, 2)); return ClusterState.builder(state) .nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(state.nodes().getLocalNodeId())) .putProjectMetadata( From 20715a3eb070e07c67055fef34f278baf403eecc Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 13 Oct 2025 12:46:10 +1100 Subject: [PATCH 12/12] Use original cluster state when creating snapshotsInProgress --- ...achingSnapshotAndShardByStateMetricsServiceTests.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java index f14a77cf2b3ca..057b333e12083 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java @@ -111,7 +111,7 @@ public void testMetricsAreOnlyCalculatedWhileClusterServiceIsStartedAndLocalNode // Update SnapshotsInProgress final ClusterState newSnapshotsInProgress = ClusterState.builder(masterAgain) - .putCustom(SnapshotsInProgress.TYPE, createSnapshotsInProgress(indexName, repositoryName)) + .putCustom(SnapshotsInProgress.TYPE, createSnapshotsInProgress(masterAgain, indexName, repositoryName)) .incrementVersion() .build(); when(clusterService.state()).thenReturn(newSnapshotsInProgress); @@ -144,14 +144,13 @@ private ClusterState createClusterStateWithSnapshotsInProgress(String indexName, new RepositoriesMetadata(List.of(new RepositoryMetadata(repositoryName, "fs", Settings.EMPTY))) ) ) - .putCustom(SnapshotsInProgress.TYPE, createSnapshotsInProgress(indexName, repositoryName)) + .putCustom(SnapshotsInProgress.TYPE, createSnapshotsInProgress(state, indexName, repositoryName)) .incrementVersion() .build(); } - private SnapshotsInProgress createSnapshotsInProgress(String indexName, String repositoryName) { - final ClusterState state = ClusterStateCreationUtils.state(indexName, randomIntBetween(1, 3), randomIntBetween(1, 2)); - final IndexMetadata index = state.projectState(ProjectId.DEFAULT).metadata().index(indexName); + private SnapshotsInProgress createSnapshotsInProgress(ClusterState clusterState, String indexName, String repositoryName) { + final IndexMetadata index = clusterState.projectState(ProjectId.DEFAULT).metadata().index(indexName); return SnapshotsInProgress.EMPTY.withAddedEntry(createEntry(index, repositoryName)); }