Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136350.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136350
summary: Prevent NPE when generating snapshot metrics before initial cluster state is set
area: Snapshot/Restore
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@
import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
import org.elasticsearch.snapshots.RepositoryIntegrityHealthIndicatorService;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotMetricsService;
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.snapshots.SnapshotsInfoService;
import org.elasticsearch.snapshots.SnapshotsService;
Expand Down Expand Up @@ -1187,6 +1188,7 @@ public Map<String, String> queryFields() {
transportService,
indicesService
);
clusterService.addListener(new SnapshotMetricsService(snapshotMetrics, clusterService));

actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedRepositoryAction(repositoriesService));
actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedPipelineAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.snapshots;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.repositories.SnapshotMetrics;
import org.elasticsearch.telemetry.metric.LongWithAttributes;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Generates the snapshots-by-state and shards-by-state metrics when polled. Only produces
* metrics on the master node, and only after it's seen a cluster state applied.
*/
public class SnapshotMetricsService implements ClusterStateListener {

private final ClusterService clusterService;
private volatile boolean shouldReturnSnapshotMetrics;
private CachedSnapshotStateMetrics cachedSnapshotStateMetrics;

public SnapshotMetricsService(SnapshotMetrics snapshotMetrics, ClusterService clusterService) {
this.clusterService = clusterService;
snapshotMetrics.createSnapshotShardsByStateMetric(this::getShardsByState);
snapshotMetrics.createSnapshotsByStateMetric(this::getSnapshotsByState);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
final ClusterState clusterState = event.state();
// Only return metrics when the state is recovered and we are the master
shouldReturnSnapshotMetrics = clusterState.nodes().isLocalNodeElectedMaster()
&& clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false;
}

private Collection<LongWithAttributes> getShardsByState() {
if (shouldReturnSnapshotMetrics == false) {
return List.of();
}
return recalculateIfStale(clusterService.state()).shardStateMetrics();
}

private Collection<LongWithAttributes> getSnapshotsByState() {
if (shouldReturnSnapshotMetrics == false) {
return List.of();
}
return recalculateIfStale(clusterService.state()).snapshotStateMetrics();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a slight race if the node is demoted from master, it could happen just after we evaluate shouldReturnSnapshotMetrics == true, but it's no big deal and definitely not worth synchronizing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have the same issue with other Gauge metrics. I don't think it's important enough to address.

}

private CachedSnapshotStateMetrics recalculateIfStale(ClusterState currentState) {
if (cachedSnapshotStateMetrics == null || cachedSnapshotStateMetrics.isStale(currentState)) {
cachedSnapshotStateMetrics = recalculateSnapshotStats(currentState);
}
return cachedSnapshotStateMetrics;
}

private CachedSnapshotStateMetrics recalculateSnapshotStats(ClusterState currentState) {
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState);
final List<LongWithAttributes> snapshotStateMetrics = new ArrayList<>();
final List<LongWithAttributes> shardStateMetrics = new ArrayList<>();

currentState.metadata().projects().forEach((projectId, project) -> {
final RepositoriesMetadata repositoriesMetadata = RepositoriesMetadata.get(project);
if (repositoriesMetadata != null) {
for (RepositoryMetadata repository : repositoriesMetadata.repositories()) {
final Tuple<Map<SnapshotsInProgress.State, Integer>, Map<SnapshotsInProgress.ShardState, Integer>> stateSummaries =
snapshotsInProgress.shardStateSummaryForRepository(projectId, repository.name());
final Map<String, Object> attributesMap = SnapshotMetrics.createAttributesMap(projectId, repository);
stateSummaries.v1()
.forEach(
(snapshotState, count) -> snapshotStateMetrics.add(
new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", snapshotState.name()))
)
);
stateSummaries.v2()
.forEach(
(shardState, count) -> shardStateMetrics.add(
new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", shardState.name()))
)
);
}
}
});
return new CachedSnapshotStateMetrics(currentState, snapshotStateMetrics, shardStateMetrics);
}

/**
* A cached copy of the snapshot and shard state metrics
*/
private record CachedSnapshotStateMetrics(
String clusterStateId,
int snapshotsInProgressIdentityHashcode,
Collection<LongWithAttributes> snapshotStateMetrics,
Collection<LongWithAttributes> shardStateMetrics
) {
CachedSnapshotStateMetrics(
ClusterState sourceState,
Collection<LongWithAttributes> snapshotStateMetrics,
Collection<LongWithAttributes> shardStateMetrics
) {
this(
sourceState.stateUUID(),
System.identityHashCode(SnapshotsInProgress.get(sourceState)),
snapshotStateMetrics,
shardStateMetrics
);
}

/**
* Are these metrics stale?
*
* @param currentClusterState The current cluster state
* @return true if these metrics were calculated from a prior cluster state and need to be recalculated, false otherwise
*/
public boolean isStale(ClusterState currentClusterState) {
return (Objects.equals(clusterStateId, currentClusterState.stateUUID()) == false
&& System.identityHashCode(SnapshotsInProgress.get(currentClusterState)) != snapshotsInProgressIdentityHashcode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.ShardSnapshotResult;
import org.elasticsearch.repositories.SnapshotMetrics;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -187,8 +186,6 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement

private final ShardSnapshotUpdateCompletionHandler shardSnapshotUpdateCompletionHandler;

private CachedSnapshotStateMetrics cachedSnapshotStateMetrics;

/**
* Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the
* cluster state. The number of concurrent operations in a cluster state is defined as the sum of
Expand All @@ -203,6 +200,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
);

private volatile int maxConcurrentOperations;
private volatile ClusterState lastAppliedClusterState;

public SnapshotsService(
Settings settings,
Expand All @@ -221,8 +219,6 @@ public SnapshotsService(
this.repositoriesService = repositoriesService;
this.threadPool = transportService.getThreadPool();
this.snapshotMetrics = snapshotMetrics;
snapshotMetrics.createSnapshotShardsByStateMetric(this::getShardsByState);
snapshotMetrics.createSnapshotsByStateMetric(this::getSnapshotsByState);

if (DiscoveryNode.isMasterNode(settings)) {
// addLowPriorityApplier to make sure that Repository will be created before snapshot
Expand Down Expand Up @@ -677,6 +673,7 @@ private Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadat

@Override
public void applyClusterState(ClusterChangedEvent event) {
lastAppliedClusterState = event.state();
try {
if (event.localNodeMaster()) {
// We don't remove old master when master flips anymore. So, we need to check for change in master
Expand Down Expand Up @@ -3357,61 +3354,6 @@ private SnapshotsInProgress createSnapshot(
}
}

private Collection<LongWithAttributes> getShardsByState() {
final ClusterState currentState = clusterService.state();
// Only the master should report on shards-by-state
if (currentState.nodes().isLocalNodeElectedMaster() == false) {
return List.of();
}
return recalculateIfStale(currentState).shardStateMetrics();
}

private Collection<LongWithAttributes> getSnapshotsByState() {
final ClusterState currentState = clusterService.state();
// Only the master should report on snapshots-by-state
if (currentState.nodes().isLocalNodeElectedMaster() == false) {
return List.of();
}
return recalculateIfStale(currentState).snapshotStateMetrics();
}

private CachedSnapshotStateMetrics recalculateIfStale(ClusterState currentState) {
if (cachedSnapshotStateMetrics == null || cachedSnapshotStateMetrics.isStale(currentState)) {
cachedSnapshotStateMetrics = recalculateSnapshotStats(currentState);
}
return cachedSnapshotStateMetrics;
}

private CachedSnapshotStateMetrics recalculateSnapshotStats(ClusterState currentState) {
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState);
final List<LongWithAttributes> snapshotStateMetrics = new ArrayList<>();
final List<LongWithAttributes> shardStateMetrics = new ArrayList<>();

currentState.metadata().projects().forEach((projectId, project) -> {
final RepositoriesMetadata repositoriesMetadata = RepositoriesMetadata.get(project);
if (repositoriesMetadata != null) {
for (RepositoryMetadata repository : repositoriesMetadata.repositories()) {
final Tuple<Map<SnapshotsInProgress.State, Integer>, Map<ShardState, Integer>> stateSummaries = snapshotsInProgress
.shardStateSummaryForRepository(projectId, repository.name());
final Map<String, Object> attributesMap = SnapshotMetrics.createAttributesMap(projectId, repository);
stateSummaries.v1()
.forEach(
(snapshotState, count) -> snapshotStateMetrics.add(
new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", snapshotState.name()))
)
);
stateSummaries.v2()
.forEach(
(shardState, count) -> shardStateMetrics.add(
new LongWithAttributes(count, Maps.copyMapWithAddedEntry(attributesMap, "state", shardState.name()))
)
);
}
}
});
return new CachedSnapshotStateMetrics(currentState, snapshotStateMetrics, shardStateMetrics);
}

private record UpdateNodeIdsForRemovalTask() implements ClusterStateTaskListener {
@Override
public void onFailure(Exception e) {
Expand Down Expand Up @@ -3439,38 +3381,4 @@ static ClusterState executeBatch(
}

private final MasterServiceTaskQueue<UpdateNodeIdsForRemovalTask> updateNodeIdsToRemoveQueue;

/**
* A cached copy of the snapshot and shard state metrics
*/
private record CachedSnapshotStateMetrics(
String clusterStateId,
int snapshotsInProgressIdentityHashcode,
Collection<LongWithAttributes> snapshotStateMetrics,
Collection<LongWithAttributes> shardStateMetrics
) {
CachedSnapshotStateMetrics(
ClusterState sourceState,
Collection<LongWithAttributes> snapshotStateMetrics,
Collection<LongWithAttributes> shardStateMetrics
) {
this(
sourceState.stateUUID(),
System.identityHashCode(SnapshotsInProgress.get(sourceState)),
snapshotStateMetrics,
shardStateMetrics
);
}

/**
* Are these metrics stale?
*
* @param currentClusterState The current cluster state
* @return true if these metrics were calculated from a prior cluster state and need to be recalculated, false otherwise
*/
public boolean isStale(ClusterState currentClusterState) {
return (Objects.equals(clusterStateId, currentClusterState.stateUUID()) == false
&& System.identityHashCode(SnapshotsInProgress.get(currentClusterState)) != snapshotsInProgressIdentityHashcode);
}
}
}
Loading