Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136350.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136350
summary: Prevent NPE when generating snapshot metrics before initial cluster state is set
area: Snapshot/Restore
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@
import org.elasticsearch.search.SearchUtils;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.shutdown.PluginShutdownService;
import org.elasticsearch.snapshots.CachingSnapshotAndShardByStateMetricsService;
import org.elasticsearch.snapshots.IndexMetadataRestoreTransformer;
import org.elasticsearch.snapshots.IndexMetadataRestoreTransformer.NoOpRestoreTransformer;
import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
Expand Down Expand Up @@ -1187,6 +1188,10 @@ public Map<String, String> queryFields() {
transportService,
indicesService
);
final CachingSnapshotAndShardByStateMetricsService cachingSnapshotAndShardByStateMetricsService =
new CachingSnapshotAndShardByStateMetricsService(clusterService);
snapshotMetrics.createSnapshotsByStateMetric(cachingSnapshotAndShardByStateMetricsService::getSnapshotsByState);
snapshotMetrics.createSnapshotShardsByStateMetric(cachingSnapshotAndShardByStateMetricsService::getShardsByState);
Copy link
Contributor Author

@nicktindall nicktindall Oct 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behaviour (creation of the gauges) isn't tested in new code, but there are integration tests for these metrics already.


actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedRepositoryAction(repositoriesService));
actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedPipelineAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.snapshots;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.repositories.SnapshotMetrics;
import org.elasticsearch.telemetry.metric.LongWithAttributes;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
* Generates the snapshots-by-state and shards-by-state metrics when polled. Only produces
* metrics on the master node, and only while the {@link ClusterService} is started. Will only
* re-calculate the metrics if the {@link SnapshotsInProgress} has changed since the last time
* they were calculated.
*/
public class CachingSnapshotAndShardByStateMetricsService {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could probably also be added to the SnapshotMetrics class, but I feel it contains sufficient complexity to be its own thing. Also, SnapshotMetrics is currently immutable (a record), and I think the cache would kind of taint that.


// Supplies the lifecycle state and the cluster state that every poll is evaluated against.
private final ClusterService clusterService;
// Most recently computed metrics; null until the first recalculation. Declared volatile because
// nothing in this class confines the metric polls to a single thread, so a cache written by one
// poll must be visible to the next.
private volatile CachedSnapshotStateMetrics cachedSnapshotStateMetrics;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we guaranteed that the metrics requests all come from the same thread (or at least, each one strictly happens-before the next)? If not, this should be volatile.


/**
* Creates the service. No metrics are computed here; the cache stays empty until the first poll.
*
* @param clusterService supplies the lifecycle state and the cluster state that the metrics are derived from
*/
public CachingSnapshotAndShardByStateMetricsService(ClusterService clusterService) {
this.clusterService = clusterService;
}

/**
* Supplies the shards-by-state metric values.
*
* @return the cached (recalculated if stale) shard-state metrics, or an empty list when the cluster
* service is not in the STARTED lifecycle state or the local node is not the elected master
*/
public Collection<LongWithAttributes> getShardsByState() {
// Return early, before clusterService.state() is called, unless the cluster service has been started.
if (clusterService.lifecycleState() != Lifecycle.State.STARTED) {
return List.of();
}
Comment on lines +44 to +46
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL also :)

final ClusterState state = clusterService.state();
if (state.nodes().isLocalNodeElectedMaster() == false) {
// Only the master should report on shards-by-state
return List.of();
}
return recalculateIfStale(state).shardStateMetrics();
}

/**
 * Supplies the snapshots-by-state metric values.
 *
 * @return the cached (recalculated if stale) snapshot-state metrics, or an empty list when the cluster
 *         service is not in the STARTED lifecycle state or the local node is not the elected master
 */
public Collection<LongWithAttributes> getSnapshotsByState() {
    // The cluster state is only read once the cluster service has been started.
    final boolean started = clusterService.lifecycleState() == Lifecycle.State.STARTED;
    if (started == false) {
        return List.of();
    }
    final ClusterState clusterState = clusterService.state();
    if (clusterState.nodes().isLocalNodeElectedMaster()) {
        return recalculateIfStale(clusterState).snapshotStateMetrics();
    }
    // Only the master should report on snapshots-by-state
    return List.of();
}

/**
 * Returns metrics that are current for the given cluster state, recalculating and re-caching them
 * when nothing is cached yet or the cached copy reports itself stale.
 *
 * @param currentState the cluster state the returned metrics must correspond to
 * @return the cached metrics if still valid, otherwise freshly calculated metrics
 */
private CachedSnapshotStateMetrics recalculateIfStale(ClusterState currentState) {
    // Read the shared field exactly once: the instance that passes the null/staleness check is the
    // instance that is returned, even if another poll replaces the cache in the meantime.
    CachedSnapshotStateMetrics metrics = cachedSnapshotStateMetrics;
    if (metrics == null || metrics.isStale(currentState)) {
        metrics = recalculateSnapshotStats(currentState);
        cachedSnapshotStateMetrics = metrics;
    }
    return metrics;
}

/**
 * Builds a fresh set of snapshot-state and shard-state metrics from the given cluster state.
 * For every repository of every project, one metric value is emitted per entry of the snapshot-state
 * summary and one per entry of the shard-state summary, each tagged with the repository attributes
 * plus a "state" attribute.
 *
 * @param currentState the cluster state to summarise
 * @return a new cache entry tied to {@code currentState}
 */
private CachedSnapshotStateMetrics recalculateSnapshotStats(ClusterState currentState) {
    final SnapshotsInProgress inProgress = SnapshotsInProgress.get(currentState);
    final List<LongWithAttributes> snapshotsByState = new ArrayList<>();
    final List<LongWithAttributes> shardsByState = new ArrayList<>();

    currentState.metadata().projects().forEach((projectId, project) -> {
        final RepositoriesMetadata repositories = RepositoriesMetadata.get(project);
        if (repositories == null) {
            // Nothing to report for this project
            return;
        }
        for (RepositoryMetadata repository : repositories.repositories()) {
            final Tuple<Map<SnapshotsInProgress.State, Integer>, Map<SnapshotsInProgress.ShardState, Integer>> summaries = inProgress
                .shardStateSummaryForRepository(projectId, repository.name());
            final Map<String, Object> baseAttributes = SnapshotMetrics.createAttributesMap(projectId, repository);

            final Map<SnapshotsInProgress.State, Integer> snapshotCounts = summaries.v1();
            for (Map.Entry<SnapshotsInProgress.State, Integer> entry : snapshotCounts.entrySet()) {
                final Map<String, Object> attributes = Maps.copyMapWithAddedEntry(baseAttributes, "state", entry.getKey().name());
                snapshotsByState.add(new LongWithAttributes(entry.getValue(), attributes));
            }

            final Map<SnapshotsInProgress.ShardState, Integer> shardCounts = summaries.v2();
            for (Map.Entry<SnapshotsInProgress.ShardState, Integer> entry : shardCounts.entrySet()) {
                final Map<String, Object> attributes = Maps.copyMapWithAddedEntry(baseAttributes, "state", entry.getKey().name());
                shardsByState.add(new LongWithAttributes(entry.getValue(), attributes));
            }
        }
    });
    return new CachedSnapshotStateMetrics(currentState, snapshotsByState, shardsByState);
}

/**
 * A cached copy of the snapshot and shard state metrics, together with enough information about the
 * cluster state they were calculated from to tell when they need recalculating.
 *
 * @param clusterStateId                      UUID of the cluster state the metrics were calculated from; recorded only,
 *                                            {@link #isStale} does not consult it
 * @param snapshotsInProgressIdentityHashcode identity hash code of the {@link SnapshotsInProgress} the metrics were
 *                                            calculated from
 * @param snapshotStateMetrics                the snapshots-by-state metric values (stored as an unmodifiable copy)
 * @param shardStateMetrics                   the shards-by-state metric values (stored as an unmodifiable copy)
 */
private record CachedSnapshotStateMetrics(
    String clusterStateId,
    int snapshotsInProgressIdentityHashcode,
    Collection<LongWithAttributes> snapshotStateMetrics,
    Collection<LongWithAttributes> shardStateMetrics
) {
    /**
     * Takes unmodifiable snapshots of both collections. The same cached instance is handed to every
     * poll, so neither the caller that built the lists nor a consumer of the metrics may be able to
     * alter what later polls observe.
     */
    CachedSnapshotStateMetrics {
        snapshotStateMetrics = List.copyOf(snapshotStateMetrics);
        shardStateMetrics = List.copyOf(shardStateMetrics);
    }

    /**
     * Convenience constructor that derives the identifying fields from the source cluster state.
     *
     * @param sourceState          the cluster state the metrics were calculated from
     * @param snapshotStateMetrics the snapshots-by-state metric values
     * @param shardStateMetrics    the shards-by-state metric values
     */
    CachedSnapshotStateMetrics(
        ClusterState sourceState,
        Collection<LongWithAttributes> snapshotStateMetrics,
        Collection<LongWithAttributes> shardStateMetrics
    ) {
        this(
            sourceState.stateUUID(),
            System.identityHashCode(SnapshotsInProgress.get(sourceState)),
            snapshotStateMetrics,
            shardStateMetrics
        );
    }

    /**
     * Are these metrics stale?
     *
     * @param currentClusterState The current cluster state
     * @return true if these metrics were calculated from a prior {@link SnapshotsInProgress} and need to be recalculated, false
     *         otherwise
     */
    public boolean isStale(ClusterState currentClusterState) {
        // NOTE(review): identity hash codes are not guaranteed to be unique, so a replaced instance could in
        // principle hash to the same value and go undetected; confirm that this is an acceptable trade-off.
        return System.identityHashCode(SnapshotsInProgress.get(currentClusterState)) != snapshotsInProgressIdentityHashcode;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.ShardSnapshotResult;
import org.elasticsearch.repositories.SnapshotMetrics;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -187,8 +186,6 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement

private final ShardSnapshotUpdateCompletionHandler shardSnapshotUpdateCompletionHandler;

private CachedSnapshotStateMetrics cachedSnapshotStateMetrics;

/**
* Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the
* cluster state. The number of concurrent operations in a cluster state is defined as the sum of
Expand Down Expand Up @@ -221,8 +218,6 @@ public SnapshotsService(
this.repositoriesService = repositoriesService;
this.threadPool = transportService.getThreadPool();
this.snapshotMetrics = snapshotMetrics;
snapshotMetrics.createSnapshotShardsByStateMetric(this::getShardsByState);
snapshotMetrics.createSnapshotsByStateMetric(this::getSnapshotsByState);

if (DiscoveryNode.isMasterNode(settings)) {
// addLowPriorityApplier to make sure that Repository will be created before snapshot
Expand Down Expand Up @@ -3357,61 +3352,6 @@ private SnapshotsInProgress createSnapshot(
}
}

/**
* Supplies the shards-by-state metric values.
* Reads clusterService.state() unconditionally; there is no lifecycle check before the read.
*
* @return the cached (recalculated if stale) shard-state metrics, or an empty list when the local
* node is not the elected master
*/
private Collection<LongWithAttributes> getShardsByState() {
final ClusterState currentState = clusterService.state();
// Only the master should report on shards-by-state
if (currentState.nodes().isLocalNodeElectedMaster() == false) {
return List.of();
}
return recalculateIfStale(currentState).shardStateMetrics();
}

/**
* Supplies the snapshots-by-state metric values.
* Reads clusterService.state() unconditionally; there is no lifecycle check before the read.
*
* @return the cached (recalculated if stale) snapshot-state metrics, or an empty list when the local
* node is not the elected master
*/
private Collection<LongWithAttributes> getSnapshotsByState() {
final ClusterState currentState = clusterService.state();
// Only the master should report on snapshots-by-state
if (currentState.nodes().isLocalNodeElectedMaster() == false) {
return List.of();
}
return recalculateIfStale(currentState).snapshotStateMetrics();
}

/**
* Returns the cached metrics, first replacing them with a fresh calculation when nothing is cached
* yet or the cached copy reports itself stale for the given state.
*
* @param currentState the cluster state to validate the cache against
* @return the value of the cache field after any recalculation
*/
private CachedSnapshotStateMetrics recalculateIfStale(ClusterState currentState) {
if (cachedSnapshotStateMetrics == null || cachedSnapshotStateMetrics.isStale(currentState)) {
cachedSnapshotStateMetrics = recalculateSnapshotStats(currentState);
}
return cachedSnapshotStateMetrics;
}

/**
* Builds a fresh set of snapshot-state and shard-state metrics from the given cluster state.
* For every repository of every project, one value is added per entry of the snapshot-state summary
* and one per entry of the shard-state summary, each carrying the repository attributes plus a
* "state" attribute.
*
* @param currentState the cluster state to summarise
* @return a new cache entry created from {@code currentState} and the two metric lists
*/
private CachedSnapshotStateMetrics recalculateSnapshotStats(ClusterState currentState) {
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState);
final List<LongWithAttributes> snapshotStateMetrics = new ArrayList<>();
final List<LongWithAttributes> shardStateMetrics = new ArrayList<>();

currentState.metadata().projects().forEach((projectId, project) -> {
final RepositoriesMetadata repositoriesMetadata = RepositoriesMetadata.get(project);
// Projects without repositories metadata contribute no metrics
if (repositoriesMetadata != null) {
for (RepositoryMetadata repository : repositoriesMetadata.repositories()) {
// v1: count per snapshot state, v2: count per shard state, both for this repository
final Tuple<Map<SnapshotsInProgress.State, Integer>, Map<ShardState, Integer>> stateSummaries = snapshotsInProgress
.shardStateSummaryForRepository(projectId, repository.name());
// Attributes shared by every metric value emitted for this repository
final Map<String, Object> attributesMap = SnapshotMetrics.createAttributesMap(projectId, repository);
stateSummaries.v1()
.forEach(
(snapshotState, count) -> snapshotStateMetrics.add(
new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", snapshotState.name()))
)
);
stateSummaries.v2()
.forEach(
(shardState, count) -> shardStateMetrics.add(
new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", shardState.name()))
)
);
}
}
});
return new CachedSnapshotStateMetrics(currentState, snapshotStateMetrics, shardStateMetrics);
}

private record UpdateNodeIdsForRemovalTask() implements ClusterStateTaskListener {
@Override
public void onFailure(Exception e) {
Expand Down Expand Up @@ -3439,38 +3379,4 @@ static ClusterState executeBatch(
}

private final MasterServiceTaskQueue<UpdateNodeIdsForRemovalTask> updateNodeIdsToRemoveQueue;

/**
* A cached copy of the snapshot and shard state metrics, stored with the UUID of the cluster state
* and the identity hash code of the {@link SnapshotsInProgress} they were calculated from.
*/
private record CachedSnapshotStateMetrics(
String clusterStateId,
int snapshotsInProgressIdentityHashcode,
Collection<LongWithAttributes> snapshotStateMetrics,
Collection<LongWithAttributes> shardStateMetrics
) {
/**
* Convenience constructor that derives the identifying fields from the source cluster state.
*/
CachedSnapshotStateMetrics(
ClusterState sourceState,
Collection<LongWithAttributes> snapshotStateMetrics,
Collection<LongWithAttributes> shardStateMetrics
) {
this(
sourceState.stateUUID(),
System.identityHashCode(SnapshotsInProgress.get(sourceState)),
snapshotStateMetrics,
shardStateMetrics
);
}

/**
* Are these metrics stale?
*
* @param currentClusterState The current cluster state
* @return true if these metrics were calculated from a prior cluster state and need to be recalculated, false otherwise
*/
public boolean isStale(ClusterState currentClusterState) {
// Stale only when BOTH the state UUID and the SnapshotsInProgress identity hash code differ from the cached ones
return (Objects.equals(clusterStateId, currentClusterState.stateUUID()) == false
&& System.identityHashCode(SnapshotsInProgress.get(currentClusterState)) != snapshotsInProgressIdentityHashcode);
}
}
}
Loading