diff --git a/docs/changelog/136350.yaml b/docs/changelog/136350.yaml new file mode 100644 index 0000000000000..994a9c6fda0e0 --- /dev/null +++ b/docs/changelog/136350.yaml @@ -0,0 +1,5 @@ +pr: 136350 +summary: Prevent NPE when generating snapshot metrics before initial cluster state is set +area: Snapshot/Restore +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 6c4331e97f609..8fa0a3ce1c98c 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -210,6 +210,7 @@ import org.elasticsearch.search.SearchUtils; import org.elasticsearch.search.aggregations.support.AggregationUsageService; import org.elasticsearch.shutdown.PluginShutdownService; +import org.elasticsearch.snapshots.CachingSnapshotAndShardByStateMetricsService; import org.elasticsearch.snapshots.IndexMetadataRestoreTransformer; import org.elasticsearch.snapshots.IndexMetadataRestoreTransformer.NoOpRestoreTransformer; import org.elasticsearch.snapshots.InternalSnapshotsInfoService; @@ -1189,6 +1190,10 @@ public Map queryFields() { transportService, indicesService ); + final CachingSnapshotAndShardByStateMetricsService cachingSnapshotAndShardByStateMetricsService = + new CachingSnapshotAndShardByStateMetricsService(clusterService); + snapshotMetrics.createSnapshotsByStateMetric(cachingSnapshotAndShardByStateMetricsService::getSnapshotsByState); + snapshotMetrics.createSnapshotShardsByStateMetric(cachingSnapshotAndShardByStateMetricsService::getShardsByState); actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedRepositoryAction(repositoriesService)); actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedPipelineAction()); diff --git a/server/src/main/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsService.java b/server/src/main/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsService.java new file mode 100644 index 0000000000000..c5775f548e5b7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsService.java @@ -0,0 +1,137 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.repositories.SnapshotMetrics; +import org.elasticsearch.telemetry.metric.LongWithAttributes; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * Generates the snapshots-by-state and shards-by-state metrics when polled. Only produces + * metrics on the master node, and only while the {@link ClusterService} is started. Will only + * re-calculate the metrics if the {@link SnapshotsInProgress} has changed since the last time + * they were calculated. + */ +public class CachingSnapshotAndShardByStateMetricsService { + + private final ClusterService clusterService; + private CachedSnapshotStateMetrics cachedSnapshotStateMetrics; + + public CachingSnapshotAndShardByStateMetricsService(ClusterService clusterService) { + this.clusterService = clusterService; + } + + public Collection getShardsByState() { + if (clusterService.lifecycleState() != Lifecycle.State.STARTED) { + return List.of(); + } + final ClusterState state = clusterService.state(); + if (state.nodes().isLocalNodeElectedMaster() == false) { + // Only the master should report on shards-by-state + return List.of(); + } + return recalculateIfStale(state).shardStateMetrics(); + } + + public Collection getSnapshotsByState() { + if (clusterService.lifecycleState() != Lifecycle.State.STARTED) { + return List.of(); + } + final ClusterState state = clusterService.state(); + if (state.nodes().isLocalNodeElectedMaster() == false) { + // Only the master should report on snapshots-by-state + return List.of(); + } + return recalculateIfStale(state).snapshotStateMetrics(); + } + + private CachedSnapshotStateMetrics recalculateIfStale(ClusterState currentState) { + if (cachedSnapshotStateMetrics == null || cachedSnapshotStateMetrics.isStale(currentState)) { + cachedSnapshotStateMetrics = recalculateSnapshotStats(currentState); + } + return cachedSnapshotStateMetrics; + } + + private CachedSnapshotStateMetrics recalculateSnapshotStats(ClusterState currentState) { + final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState); + final List snapshotStateMetrics = new ArrayList<>(); + final List shardStateMetrics = new ArrayList<>(); + + currentState.metadata().projects().forEach((projectId, project) -> { + final RepositoriesMetadata repositoriesMetadata = RepositoriesMetadata.get(project); + if (repositoriesMetadata != null) { + for (RepositoryMetadata repository : repositoriesMetadata.repositories()) { + final Tuple, Map> stateSummaries = + snapshotsInProgress.shardStateSummaryForRepository(projectId, repository.name()); + final Map attributesMap = SnapshotMetrics.createAttributesMap(projectId, repository); + stateSummaries.v1() + .forEach( + (snapshotState, count) -> snapshotStateMetrics.add( + new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", snapshotState.name())) + ) + ); + stateSummaries.v2() + .forEach( + (shardState, count) -> shardStateMetrics.add( + new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", shardState.name())) + ) + ); + } + } + }); + return new CachedSnapshotStateMetrics(currentState, snapshotStateMetrics, shardStateMetrics); + } + + /** + * A cached copy of the snapshot and shard state metrics + */ + private record CachedSnapshotStateMetrics( + String clusterStateId, + int snapshotsInProgressIdentityHashcode, + Collection snapshotStateMetrics, + Collection shardStateMetrics + ) { + CachedSnapshotStateMetrics( + ClusterState sourceState, + Collection snapshotStateMetrics, + Collection shardStateMetrics + ) { + this( + sourceState.stateUUID(), + System.identityHashCode(SnapshotsInProgress.get(sourceState)), + snapshotStateMetrics, + shardStateMetrics + ); + } + + /** + * Are these metrics stale? + * + * @param currentClusterState The current cluster state + * @return true if these metrics were calculated from a prior {@link SnapshotsInProgress} and need to be recalculated, false + * otherwise + */ + public boolean isStale(ClusterState currentClusterState) { + return System.identityHashCode(SnapshotsInProgress.get(currentClusterState)) != snapshotsInProgressIdentityHashcode; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index f28d4948f657a..a25908d0acc6d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -90,7 +90,6 @@ import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.ShardSnapshotResult; import org.elasticsearch.repositories.SnapshotMetrics; -import org.elasticsearch.telemetry.metric.LongWithAttributes; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -187,8 +186,6 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement private final ShardSnapshotUpdateCompletionHandler shardSnapshotUpdateCompletionHandler; - private CachedSnapshotStateMetrics cachedSnapshotStateMetrics; - /** * Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the * cluster state. The number of concurrent operations in a cluster state is defined as the sum of @@ -221,8 +218,6 @@ public SnapshotsService( this.repositoriesService = repositoriesService; this.threadPool = transportService.getThreadPool(); this.snapshotMetrics = snapshotMetrics; - snapshotMetrics.createSnapshotShardsByStateMetric(this::getShardsByState); - snapshotMetrics.createSnapshotsByStateMetric(this::getSnapshotsByState); if (DiscoveryNode.isMasterNode(settings)) { // addLowPriorityApplier to make sure that Repository will be created before snapshot @@ -3357,61 +3352,6 @@ private SnapshotsInProgress createSnapshot( } } - private Collection getShardsByState() { - final ClusterState currentState = clusterService.state(); - // Only the master should report on shards-by-state - if (currentState.nodes().isLocalNodeElectedMaster() == false) { - return List.of(); - } - return recalculateIfStale(currentState).shardStateMetrics(); - } - - private Collection getSnapshotsByState() { - final ClusterState currentState = clusterService.state(); - // Only the master should report on snapshots-by-state - if (currentState.nodes().isLocalNodeElectedMaster() == false) { - return List.of(); - } - return recalculateIfStale(currentState).snapshotStateMetrics(); - } - - private CachedSnapshotStateMetrics recalculateIfStale(ClusterState currentState) { - if (cachedSnapshotStateMetrics == null || cachedSnapshotStateMetrics.isStale(currentState)) { - cachedSnapshotStateMetrics = recalculateSnapshotStats(currentState); - } - return cachedSnapshotStateMetrics; - } - - private CachedSnapshotStateMetrics recalculateSnapshotStats(ClusterState currentState) { - final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState); - final List snapshotStateMetrics = new ArrayList<>(); - final List shardStateMetrics = new ArrayList<>(); - - currentState.metadata().projects().forEach((projectId, project) -> { - final RepositoriesMetadata repositoriesMetadata = RepositoriesMetadata.get(project); - if (repositoriesMetadata != null) { - for (RepositoryMetadata repository : repositoriesMetadata.repositories()) { - final Tuple, Map> stateSummaries = snapshotsInProgress - .shardStateSummaryForRepository(projectId, repository.name()); - final Map attributesMap = SnapshotMetrics.createAttributesMap(projectId, repository); - stateSummaries.v1() - .forEach( - (snapshotState, count) -> snapshotStateMetrics.add( - new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", snapshotState.name())) - ) - ); - stateSummaries.v2() - .forEach( - (shardState, count) -> shardStateMetrics.add( - new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", shardState.name())) - ) - ); - } - } - }); - return new CachedSnapshotStateMetrics(currentState, snapshotStateMetrics, shardStateMetrics); - } - private record UpdateNodeIdsForRemovalTask() implements ClusterStateTaskListener { @Override public void onFailure(Exception e) { @@ -3439,38 +3379,4 @@ static ClusterState executeBatch( } private final MasterServiceTaskQueue updateNodeIdsToRemoveQueue; - - /** - * A cached copy of the snapshot and shard state metrics - */ - private record CachedSnapshotStateMetrics( - String clusterStateId, - int snapshotsInProgressIdentityHashcode, - Collection snapshotStateMetrics, - Collection shardStateMetrics - ) { - CachedSnapshotStateMetrics( - ClusterState sourceState, - Collection snapshotStateMetrics, - Collection shardStateMetrics - ) { - this( - sourceState.stateUUID(), - System.identityHashCode(SnapshotsInProgress.get(sourceState)), - snapshotStateMetrics, - shardStateMetrics - ); - } - - /** - * Are these metrics stale? - * - * @param currentClusterState The current cluster state - * @return true if these metrics were calculated from a prior cluster state and need to be recalculated, false otherwise - */ - public boolean isStale(ClusterState currentClusterState) { - return (Objects.equals(clusterStateId, currentClusterState.stateUUID()) == false - && System.identityHashCode(SnapshotsInProgress.get(currentClusterState)) != snapshotsInProgressIdentityHashcode); - } - } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java new file mode 100644 index 0000000000000..057b333e12083 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/CachingSnapshotAndShardByStateMetricsServiceTests.java @@ -0,0 +1,181 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.ShardGeneration; +import org.elasticsearch.telemetry.metric.LongWithAttributes; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CachingSnapshotAndShardByStateMetricsServiceTests extends ESTestCase { + + public void testMetricsAreOnlyCalculatedWhileClusterServiceIsStartedAndLocalNodeIsMaster() { + final var indexName = randomIdentifier(); + final var repositoryName = randomIdentifier(); + final ClusterService clusterService = mock(ClusterService.class); + final CachingSnapshotAndShardByStateMetricsService byStateMetricsService = new CachingSnapshotAndShardByStateMetricsService( + clusterService + ); + + // No metrics should be recorded before the cluster service is started + when(clusterService.lifecycleState()).thenReturn(Lifecycle.State.INITIALIZED); + assertThat(byStateMetricsService.getShardsByState(), empty()); + assertThat(byStateMetricsService.getSnapshotsByState(), empty()); + verify(clusterService, never()).state(); + + // Simulate the metrics service being started/a state being applied + when(clusterService.lifecycleState()).thenReturn(Lifecycle.State.STARTED); + final ClusterState withSnapshotsInProgress = createClusterStateWithSnapshotsInProgress(indexName, repositoryName); + when(clusterService.state()).thenReturn(withSnapshotsInProgress); + + // This time we should publish some metrics + final Collection shardsByState = byStateMetricsService.getShardsByState(); + final Collection snapshotsByState = byStateMetricsService.getSnapshotsByState(); + assertThat(shardsByState, not(empty())); + assertThat(snapshotsByState, not(empty())); + verify(clusterService, times(2)).state(); + + reset(clusterService); + when(clusterService.lifecycleState()).thenReturn(Lifecycle.State.STARTED); + + // Then observe a new state in which we aren't master + final ClusterState noLongerMaster = ClusterState.builder(withSnapshotsInProgress) + .nodes( + DiscoveryNodes.builder(withSnapshotsInProgress.nodes()) + .masterNodeId( + randomValueOtherThan( + withSnapshotsInProgress.nodes().getLocalNodeId(), + () -> randomFrom(withSnapshotsInProgress.nodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet())) + ) + ) + ) + .incrementVersion() + .build(); + when(clusterService.state()).thenReturn(noLongerMaster); + + // We should no longer publish metrics + assertThat(byStateMetricsService.getShardsByState(), empty()); + assertThat(byStateMetricsService.getSnapshotsByState(), empty()); + + // Become master again + final ClusterState masterAgain = ClusterState.builder(noLongerMaster) + .nodes(DiscoveryNodes.builder(noLongerMaster.nodes()).masterNodeId(noLongerMaster.nodes().getLocalNodeId())) + .incrementVersion() + .build(); + when(clusterService.state()).thenReturn(masterAgain); + + // We should return cached metrics because the SnapshotsInProgress hasn't changed + final Collection secondShardsByState = byStateMetricsService.getShardsByState(); + final Collection secondSnapshotsByState = byStateMetricsService.getSnapshotsByState(); + assertThat(secondShardsByState, sameInstance(shardsByState)); + assertThat(secondSnapshotsByState, sameInstance(snapshotsByState)); + + // Update SnapshotsInProgress + final ClusterState newSnapshotsInProgress = ClusterState.builder(masterAgain) + .putCustom(SnapshotsInProgress.TYPE, createSnapshotsInProgress(masterAgain, indexName, repositoryName)) + .incrementVersion() + .build(); + when(clusterService.state()).thenReturn(newSnapshotsInProgress); + + // We should return fresh metrics because the SnapshotsInProgress has changed + final Collection thirdShardsByState = byStateMetricsService.getShardsByState(); + final Collection thirdSnapshotsByState = byStateMetricsService.getSnapshotsByState(); + assertThat(thirdShardsByState, not(empty())); + assertThat(thirdSnapshotsByState, not(empty())); + assertThat(thirdShardsByState, not(sameInstance(shardsByState))); + assertThat(thirdSnapshotsByState, not(sameInstance(snapshotsByState))); + + // Then the cluster service is stopped, we should no longer publish metrics + reset(clusterService); + when(clusterService.lifecycleState()).thenReturn(Lifecycle.State.STOPPED); + assertThat(byStateMetricsService.getShardsByState(), empty()); + assertThat(byStateMetricsService.getSnapshotsByState(), empty()); + verify(clusterService, never()).state(); + } + + private ClusterState createClusterStateWithSnapshotsInProgress(String indexName, String repositoryName) { + // Need to have at least 2 nodes, so we can test when another node is the master + final ClusterState state = ClusterStateCreationUtils.state(indexName, randomIntBetween(2, 5), randomIntBetween(1, 2)); + return ClusterState.builder(state) + .nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(state.nodes().getLocalNodeId())) + .putProjectMetadata( + ProjectMetadata.builder(state.getMetadata().getProject(ProjectId.DEFAULT)) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata(List.of(new RepositoryMetadata(repositoryName, "fs", Settings.EMPTY))) + ) + ) + .putCustom(SnapshotsInProgress.TYPE, createSnapshotsInProgress(state, indexName, repositoryName)) + .incrementVersion() + .build(); + } + + private SnapshotsInProgress createSnapshotsInProgress(ClusterState clusterState, String indexName, String repositoryName) { + final IndexMetadata index = clusterState.projectState(ProjectId.DEFAULT).metadata().index(indexName); + return SnapshotsInProgress.EMPTY.withAddedEntry(createEntry(index, repositoryName)); + } + + private SnapshotsInProgress.Entry createEntry(IndexMetadata indexMetadata, String repositoryName) { + return SnapshotsInProgress.Entry.snapshot( + new Snapshot(ProjectId.DEFAULT, repositoryName, new SnapshotId("", "")), + false, + randomBoolean(), + SnapshotsInProgress.State.STARTED, + Map.of(indexMetadata.getIndex().getName(), new IndexId(indexMetadata.getIndex().getName(), randomIdentifier())), + List.of(), + Collections.emptyList(), + 0, + 1, + IntStream.range(0, indexMetadata.getNumberOfShards()) + .mapToObj(i -> new ShardId(indexMetadata.getIndex(), i)) + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), + shardId -> new SnapshotsInProgress.ShardSnapshotStatus(randomIdentifier(), ShardGeneration.newGeneration()) + ) + ), + null, + null, + null + ); + } +}