Skip to content

Commit d100c83

Browse files
authored
Prevent NPE when generating snapshot metrics before initial cluster state is set (#136350)
1 parent a1597d0 commit d100c83

File tree

5 files changed

+328
-94
lines changed

5 files changed

+328
-94
lines changed

docs/changelog/136350.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 136350
2+
summary: Prevent NPE when generating snapshot metrics before initial cluster state is set
3+
area: Snapshot/Restore
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@
210210
import org.elasticsearch.search.SearchUtils;
211211
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
212212
import org.elasticsearch.shutdown.PluginShutdownService;
213+
import org.elasticsearch.snapshots.CachingSnapshotAndShardByStateMetricsService;
213214
import org.elasticsearch.snapshots.IndexMetadataRestoreTransformer;
214215
import org.elasticsearch.snapshots.IndexMetadataRestoreTransformer.NoOpRestoreTransformer;
215216
import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
@@ -1189,6 +1190,10 @@ public Map<String, String> queryFields() {
11891190
transportService,
11901191
indicesService
11911192
);
1193+
final CachingSnapshotAndShardByStateMetricsService cachingSnapshotAndShardByStateMetricsService =
1194+
new CachingSnapshotAndShardByStateMetricsService(clusterService);
1195+
snapshotMetrics.createSnapshotsByStateMetric(cachingSnapshotAndShardByStateMetricsService::getSnapshotsByState);
1196+
snapshotMetrics.createSnapshotShardsByStateMetric(cachingSnapshotAndShardByStateMetricsService::getShardsByState);
11921197

11931198
actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedRepositoryAction(repositoriesService));
11941199
actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedPipelineAction());
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.snapshots;
11+
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.SnapshotsInProgress;
14+
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
15+
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
16+
import org.elasticsearch.cluster.service.ClusterService;
17+
import org.elasticsearch.common.component.Lifecycle;
18+
import org.elasticsearch.common.util.Maps;
19+
import org.elasticsearch.core.Tuple;
20+
import org.elasticsearch.repositories.SnapshotMetrics;
21+
import org.elasticsearch.telemetry.metric.LongWithAttributes;
22+
23+
import java.util.ArrayList;
24+
import java.util.Collection;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
/**
29+
* Generates the snapshots-by-state and shards-by-state metrics when polled. Only produces
30+
* metrics on the master node, and only while the {@link ClusterService} is started. Will only
31+
* re-calculate the metrics if the {@link SnapshotsInProgress} has changed since the last time
32+
* they were calculated.
33+
*/
34+
public class CachingSnapshotAndShardByStateMetricsService {
35+
36+
private final ClusterService clusterService;
37+
private volatile CachedSnapshotStateMetrics cachedSnapshotStateMetrics;
38+
39+
public CachingSnapshotAndShardByStateMetricsService(ClusterService clusterService) {
40+
this.clusterService = clusterService;
41+
}
42+
43+
public Collection<LongWithAttributes> getShardsByState() {
44+
if (clusterService.lifecycleState() != Lifecycle.State.STARTED) {
45+
return List.of();
46+
}
47+
final ClusterState state = clusterService.state();
48+
if (state.nodes().isLocalNodeElectedMaster() == false) {
49+
// Only the master should report on shards-by-state
50+
return List.of();
51+
}
52+
return recalculateIfStale(state).shardStateMetrics();
53+
}
54+
55+
public Collection<LongWithAttributes> getSnapshotsByState() {
56+
if (clusterService.lifecycleState() != Lifecycle.State.STARTED) {
57+
return List.of();
58+
}
59+
final ClusterState state = clusterService.state();
60+
if (state.nodes().isLocalNodeElectedMaster() == false) {
61+
// Only the master should report on snapshots-by-state
62+
return List.of();
63+
}
64+
return recalculateIfStale(state).snapshotStateMetrics();
65+
}
66+
67+
private CachedSnapshotStateMetrics recalculateIfStale(ClusterState currentState) {
68+
if (cachedSnapshotStateMetrics == null || cachedSnapshotStateMetrics.isStale(currentState)) {
69+
cachedSnapshotStateMetrics = recalculateSnapshotStats(currentState);
70+
}
71+
return cachedSnapshotStateMetrics;
72+
}
73+
74+
private CachedSnapshotStateMetrics recalculateSnapshotStats(ClusterState currentState) {
75+
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState);
76+
final List<LongWithAttributes> snapshotStateMetrics = new ArrayList<>();
77+
final List<LongWithAttributes> shardStateMetrics = new ArrayList<>();
78+
79+
currentState.metadata().projects().forEach((projectId, project) -> {
80+
final RepositoriesMetadata repositoriesMetadata = RepositoriesMetadata.get(project);
81+
if (repositoriesMetadata != null) {
82+
for (RepositoryMetadata repository : repositoriesMetadata.repositories()) {
83+
final Tuple<Map<SnapshotsInProgress.State, Integer>, Map<SnapshotsInProgress.ShardState, Integer>> stateSummaries =
84+
snapshotsInProgress.shardStateSummaryForRepository(projectId, repository.name());
85+
final Map<String, Object> attributesMap = SnapshotMetrics.createAttributesMap(projectId, repository);
86+
stateSummaries.v1()
87+
.forEach(
88+
(snapshotState, count) -> snapshotStateMetrics.add(
89+
new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", snapshotState.name()))
90+
)
91+
);
92+
stateSummaries.v2()
93+
.forEach(
94+
(shardState, count) -> shardStateMetrics.add(
95+
new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", shardState.name()))
96+
)
97+
);
98+
}
99+
}
100+
});
101+
return new CachedSnapshotStateMetrics(currentState, snapshotStateMetrics, shardStateMetrics);
102+
}
103+
104+
/**
105+
* A cached copy of the snapshot and shard state metrics
106+
*/
107+
private record CachedSnapshotStateMetrics(
108+
String clusterStateId,
109+
int snapshotsInProgressIdentityHashcode,
110+
Collection<LongWithAttributes> snapshotStateMetrics,
111+
Collection<LongWithAttributes> shardStateMetrics
112+
) {
113+
CachedSnapshotStateMetrics(
114+
ClusterState sourceState,
115+
Collection<LongWithAttributes> snapshotStateMetrics,
116+
Collection<LongWithAttributes> shardStateMetrics
117+
) {
118+
this(
119+
sourceState.stateUUID(),
120+
System.identityHashCode(SnapshotsInProgress.get(sourceState)),
121+
snapshotStateMetrics,
122+
shardStateMetrics
123+
);
124+
}
125+
126+
/**
127+
* Are these metrics stale?
128+
*
129+
* @param currentClusterState The current cluster state
130+
* @return true if these metrics were calculated from a prior {@link SnapshotsInProgress} and need to be recalculated, false
131+
* otherwise
132+
*/
133+
public boolean isStale(ClusterState currentClusterState) {
134+
return System.identityHashCode(SnapshotsInProgress.get(currentClusterState)) != snapshotsInProgressIdentityHashcode;
135+
}
136+
}
137+
}

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 0 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@
9090
import org.elasticsearch.repositories.ShardGenerations;
9191
import org.elasticsearch.repositories.ShardSnapshotResult;
9292
import org.elasticsearch.repositories.SnapshotMetrics;
93-
import org.elasticsearch.telemetry.metric.LongWithAttributes;
9493
import org.elasticsearch.threadpool.ThreadPool;
9594
import org.elasticsearch.transport.TransportService;
9695

@@ -187,8 +186,6 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
187186

188187
private final ShardSnapshotUpdateCompletionHandler shardSnapshotUpdateCompletionHandler;
189188

190-
private CachedSnapshotStateMetrics cachedSnapshotStateMetrics;
191-
192189
/**
193190
* Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the
194191
* cluster state. The number of concurrent operations in a cluster state is defined as the sum of
@@ -221,8 +218,6 @@ public SnapshotsService(
221218
this.repositoriesService = repositoriesService;
222219
this.threadPool = transportService.getThreadPool();
223220
this.snapshotMetrics = snapshotMetrics;
224-
snapshotMetrics.createSnapshotShardsByStateMetric(this::getShardsByState);
225-
snapshotMetrics.createSnapshotsByStateMetric(this::getSnapshotsByState);
226221

227222
if (DiscoveryNode.isMasterNode(settings)) {
228223
// addLowPriorityApplier to make sure that Repository will be created before snapshot
@@ -3357,61 +3352,6 @@ private SnapshotsInProgress createSnapshot(
33573352
}
33583353
}
33593354

3360-
private Collection<LongWithAttributes> getShardsByState() {
3361-
final ClusterState currentState = clusterService.state();
3362-
// Only the master should report on shards-by-state
3363-
if (currentState.nodes().isLocalNodeElectedMaster() == false) {
3364-
return List.of();
3365-
}
3366-
return recalculateIfStale(currentState).shardStateMetrics();
3367-
}
3368-
3369-
private Collection<LongWithAttributes> getSnapshotsByState() {
3370-
final ClusterState currentState = clusterService.state();
3371-
// Only the master should report on snapshots-by-state
3372-
if (currentState.nodes().isLocalNodeElectedMaster() == false) {
3373-
return List.of();
3374-
}
3375-
return recalculateIfStale(currentState).snapshotStateMetrics();
3376-
}
3377-
3378-
private CachedSnapshotStateMetrics recalculateIfStale(ClusterState currentState) {
3379-
if (cachedSnapshotStateMetrics == null || cachedSnapshotStateMetrics.isStale(currentState)) {
3380-
cachedSnapshotStateMetrics = recalculateSnapshotStats(currentState);
3381-
}
3382-
return cachedSnapshotStateMetrics;
3383-
}
3384-
3385-
private CachedSnapshotStateMetrics recalculateSnapshotStats(ClusterState currentState) {
3386-
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState);
3387-
final List<LongWithAttributes> snapshotStateMetrics = new ArrayList<>();
3388-
final List<LongWithAttributes> shardStateMetrics = new ArrayList<>();
3389-
3390-
currentState.metadata().projects().forEach((projectId, project) -> {
3391-
final RepositoriesMetadata repositoriesMetadata = RepositoriesMetadata.get(project);
3392-
if (repositoriesMetadata != null) {
3393-
for (RepositoryMetadata repository : repositoriesMetadata.repositories()) {
3394-
final Tuple<Map<SnapshotsInProgress.State, Integer>, Map<ShardState, Integer>> stateSummaries = snapshotsInProgress
3395-
.shardStateSummaryForRepository(projectId, repository.name());
3396-
final Map<String, Object> attributesMap = SnapshotMetrics.createAttributesMap(projectId, repository);
3397-
stateSummaries.v1()
3398-
.forEach(
3399-
(snapshotState, count) -> snapshotStateMetrics.add(
3400-
new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", snapshotState.name()))
3401-
)
3402-
);
3403-
stateSummaries.v2()
3404-
.forEach(
3405-
(shardState, count) -> shardStateMetrics.add(
3406-
new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", shardState.name()))
3407-
)
3408-
);
3409-
}
3410-
}
3411-
});
3412-
return new CachedSnapshotStateMetrics(currentState, snapshotStateMetrics, shardStateMetrics);
3413-
}
3414-
34153355
private record UpdateNodeIdsForRemovalTask() implements ClusterStateTaskListener {
34163356
@Override
34173357
public void onFailure(Exception e) {
@@ -3439,38 +3379,4 @@ static ClusterState executeBatch(
34393379
}
34403380

34413381
private final MasterServiceTaskQueue<UpdateNodeIdsForRemovalTask> updateNodeIdsToRemoveQueue;
3442-
3443-
/**
3444-
* A cached copy of the snapshot and shard state metrics
3445-
*/
3446-
private record CachedSnapshotStateMetrics(
3447-
String clusterStateId,
3448-
int snapshotsInProgressIdentityHashcode,
3449-
Collection<LongWithAttributes> snapshotStateMetrics,
3450-
Collection<LongWithAttributes> shardStateMetrics
3451-
) {
3452-
CachedSnapshotStateMetrics(
3453-
ClusterState sourceState,
3454-
Collection<LongWithAttributes> snapshotStateMetrics,
3455-
Collection<LongWithAttributes> shardStateMetrics
3456-
) {
3457-
this(
3458-
sourceState.stateUUID(),
3459-
System.identityHashCode(SnapshotsInProgress.get(sourceState)),
3460-
snapshotStateMetrics,
3461-
shardStateMetrics
3462-
);
3463-
}
3464-
3465-
/**
3466-
* Are these metrics stale?
3467-
*
3468-
* @param currentClusterState The current cluster state
3469-
* @return true if these metrics were calculated from a prior cluster state and need to be recalculated, false otherwise
3470-
*/
3471-
public boolean isStale(ClusterState currentClusterState) {
3472-
return (Objects.equals(clusterStateId, currentClusterState.stateUUID()) == false
3473-
&& System.identityHashCode(SnapshotsInProgress.get(currentClusterState)) != snapshotsInProgressIdentityHashcode);
3474-
}
3475-
}
34763382
}

0 commit comments

Comments
 (0)