Skip to content

Commit 4292017

Browse files
committed
improve node counting a bit
1 parent 82765b2 commit 4292017

File tree

4 files changed

+25
-7
lines changed

4 files changed

+25
-7
lines changed

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,7 +1231,7 @@ public Entry withClones(Map<RepositoryShardId, ShardSnapshotStatus> updatedClone
12311231
* @return aborted snapshot entry or {@code null} if entry can be removed from the cluster state directly
12321232
*/
12331233
@Nullable
1234-
public Entry abort(BiConsumer<ShardId, ShardSnapshotStatus> abortedAssignedQueuedShardConsumer) {
1234+
public Entry abort(String localNodeId, BiConsumer<ShardId, ShardSnapshotStatus> abortedAssignedQueuedShardConsumer) {
12351235
final Map<ShardId, ShardSnapshotStatus> shardsBuilder = new HashMap<>();
12361236
boolean completed = true;
12371237
boolean allQueued = true;
@@ -1262,7 +1262,7 @@ assert isClone() == false
12621262
// Accumulate the updates needed to complete the aborted QUEUED with generation shard snapshots
12631263
abortedAssignedQueuedShardConsumer.accept(
12641264
shardEntry.getKey(),
1265-
new ShardSnapshotStatus(null, ShardState.FAILED, status.generation, reason)
1265+
new ShardSnapshotStatus(localNodeId, ShardState.FAILED, status.generation, reason)
12661266
);
12671267
}
12681268
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,7 @@ public void apply(Settings value, Settings current, Settings previous) {
578578
HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING,
579579
HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING,
580580
SnapshotsService.MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING,
581+
SnapshotsService.SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING,
581582
RestoreService.REFRESH_REPO_UUID_ON_RESTORE_SETTING,
582583
FsHealthService.ENABLED_SETTING,
583584
FsHealthService.REFRESH_INTERVAL_SETTING,

server/src/main/java/org/elasticsearch/snapshots/PerNodeShardSnapshotCounter.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@ public PerNodeShardSnapshotCounter(
4444
return;
4545
}
4646
for (var shardSnapshotStatus : entry.shards().values()) {
47-
// TODO: consider more states as active, e.g. Abort on data node?
48-
if (shardSnapshotStatus.state() == SnapshotsInProgress.ShardState.INIT) {
47+
if (isRunningOnDataNode(shardSnapshotStatus)) {
4948
perNodeCounts.computeIfPresent(shardSnapshotStatus.nodeId(), (nodeId, count) -> count + 1);
5049
}
5150
}
@@ -54,6 +53,13 @@ public PerNodeShardSnapshotCounter(
5453
}
5554
}
5655

56+
private static boolean isRunningOnDataNode(SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus) {
57+
return shardSnapshotStatus.state() == SnapshotsInProgress.ShardState.INIT
58+
|| (shardSnapshotStatus.state() == SnapshotsInProgress.ShardState.ABORTED && shardSnapshotStatus.reason() != null
59+
// TODO: find a better way to check for abort on data nodes
60+
&& shardSnapshotStatus.reason().startsWith("assigned-queued aborted") == false);
61+
}
62+
5763
public boolean tryStartShardSnapshotOnNode(String nodeId) {
5864
if (enabled() == false) {
5965
return true;

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2305,6 +2305,7 @@ public ClusterState execute(ClusterState currentState) {
23052305
&& snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) {
23062306
// snapshot is started - mark every non completed shard as aborted
23072307
final SnapshotsInProgress.Entry abortedEntry = existing.abort(
2308+
currentState.nodes().getLocalNodeId(),
23082309
((shardId, shardSnapshotStatus) -> completeAbortedAssignedQueuedRunnables.add(
23092310
() -> innerUpdateSnapshotState(
23102311
existing.snapshot(),
@@ -3820,9 +3821,7 @@ private <T> void applyShardSnapshotUpdate(
38203821
assert updatedShardSnapshotStatus.isActive() == false : updatedShardSnapshotStatus;
38213822
}
38223823

3823-
if (shardSnapshotStatusUpdate.isClone() == false
3824-
&& existing.state() == ShardState.INIT // TODO: this is not right, should consider aborted ones on data nodes as well
3825-
&& updatedShardSnapshotStatus.state() != ShardState.INIT) {
3824+
if (shardSnapshotStatusUpdate.isClone() == false && changeReleasesDataNode(existing, updatedShardSnapshotStatus)) {
38263825
perNodeShardSnapshotCounter.completeShardSnapshotOnNode(updatedShardSnapshotStatus.nodeId());
38273826
}
38283827
logger.trace(
@@ -3836,6 +3835,18 @@ private <T> void applyShardSnapshotUpdate(
38363835
executedUpdates.add(shardSnapshotStatusUpdate);
38373836
}
38383837

3838+
private boolean changeReleasesDataNode(ShardSnapshotStatus previous, ShardSnapshotStatus current) {
3839+
if (previous.state() == ShardState.INIT) {
3840+
return current.state().completed() || current.state() == ShardState.PAUSED_FOR_NODE_REMOVAL;
3841+
}
3842+
// TODO find a better to check for abort on data nodes
3843+
if (previous.state() == ShardState.ABORTED
3844+
&& (previous.reason() == null || previous.reason().startsWith("assigned-queued aborted") == false)) {
3845+
return current.state().completed();
3846+
}
3847+
return false;
3848+
}
3849+
38393850
private void tryStartNextTaskAfterCloneUpdated(RepositoryShardId repoShardId, ShardSnapshotStatus updatedState) {
38403851
// the update was already executed on the clone operation it applied to, now we check if it may be possible to
38413852
// start a shard snapshot or clone operation on the current entry

0 commit comments

Comments
 (0)