Skip to content

Commit 447de70

Browse files
committed
Add assigned-QUEUED shard state and limit INIT shard snapshots per node
With the limit enabled, shard snapshots that would otherwise have entered the INIT state are instead put into the assigned-QUEUED state once the limit is reached on their node. An assigned-QUEUED shard snapshot transitions to INIT when a shard snapshot on that node completes and frees up capacity. The completing shard snapshot can belong to the same snapshot or to a different one, in the same or a different repository.
1 parent 256a7fe commit 447de70

File tree

5 files changed

+353
-23
lines changed

5 files changed

+353
-23
lines changed

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ public int count() {
197197
return count;
198198
}
199199

200+
public Set<ProjectRepo> repos() {
201+
return entries.keySet();
202+
}
203+
200204
public Iterable<List<Entry>> entriesByRepo() {
201205
return () -> Iterators.map(entries.values().iterator(), byRepo -> byRepo.entries);
202206
}
@@ -504,7 +508,7 @@ private static boolean assertShardStateConsistent(
504508
int shardId,
505509
ShardSnapshotStatus shardSnapshotStatus
506510
) {
507-
if (shardSnapshotStatus.isActive()) {
511+
if (shardSnapshotStatus.isActiveOrAssignedQueued()) {
508512
Tuple<String, Integer> plainShardId = Tuple.tuple(indexName, shardId);
509513
assert assignedShards.add(plainShardId) : plainShardId + " is assigned twice in " + entries;
510514
assert queuedShards.contains(plainShardId) == false : plainShardId + " is queued then assigned in " + entries;
@@ -752,6 +756,19 @@ public static ShardSnapshotStatus success(String nodeId, ShardSnapshotResult sha
752756
return new ShardSnapshotStatus(nodeId, ShardState.SUCCESS, shardSnapshotResult.getGeneration(), null, shardSnapshotResult);
753757
}
754758

759+
@SuppressForbidden(reason = "using a private constructor within the same file")
760+
public static ShardSnapshotStatus assignedQueued(String nodeId, ShardGeneration generation) {
761+
return new ShardSnapshotStatus(nodeId, ShardState.QUEUED, generation, null, null);
762+
}
763+
764+
public boolean isAssignedQueued() {
765+
return state == ShardState.QUEUED && nodeId != null && generation != null;
766+
}
767+
768+
public boolean isUnassignedQueued() {
769+
return this == UNASSIGNED_QUEUED || (state == ShardState.QUEUED && nodeId == null && generation == null);
770+
}
771+
755772
public ShardSnapshotStatus(
756773
@Nullable String nodeId,
757774
ShardState state,
@@ -772,8 +789,16 @@ private boolean assertConsistent() {
772789
assert state.failed() == false || reason != null;
773790
assert (state != ShardState.INIT && state != ShardState.WAITING && state != ShardState.PAUSED_FOR_NODE_REMOVAL)
774791
|| nodeId != null : "Null node id for state [" + state + "]";
775-
assert state != ShardState.QUEUED || (nodeId == null && generation == null && reason == null)
776-
: "Found unexpected non-null values for queued state shard nodeId[" + nodeId + "][" + generation + "][" + reason + "]";
792+
assert state != ShardState.QUEUED || (isUnassignedQueued() || isAssignedQueued())
793+
: "Found unexpected shard state=["
794+
+ state
795+
+ "], nodeId=["
796+
+ nodeId
797+
+ "], generation=["
798+
+ generation
799+
+ "], reason=["
800+
+ reason
801+
+ "]";
777802
assert state == ShardState.SUCCESS || shardSnapshotResult == null;
778803
assert shardSnapshotResult == null || shardSnapshotResult.getGeneration().equals(generation)
779804
: "generation [" + generation + "] does not match result generation [" + shardSnapshotResult.getGeneration() + "]";
@@ -787,7 +812,7 @@ public static ShardSnapshotStatus readFrom(StreamInput in) throws IOException {
787812
final ShardGeneration generation = in.readOptionalWriteable(ShardGeneration::new);
788813
final String reason = in.readOptionalString();
789814
final ShardSnapshotResult shardSnapshotResult = in.readOptionalWriteable(ShardSnapshotResult::new);
790-
if (state == ShardState.QUEUED) {
815+
if (state == ShardState.QUEUED && nodeId == null && generation == null) {
791816
return UNASSIGNED_QUEUED;
792817
}
793818
return new ShardSnapshotStatus(nodeId, state, generation, reason, shardSnapshotResult);
@@ -819,13 +844,18 @@ public ShardSnapshotResult shardSnapshotResult() {
819844
* ({@link ShardState#INIT} or {@link ShardState#ABORTED}) or about to write to it in state {@link ShardState#WAITING} or
820845
* {@link ShardState#PAUSED_FOR_NODE_REMOVAL}.
821846
*/
847+
// TODO: review its usage again
822848
public boolean isActive() {
823849
return switch (state) {
824850
case INIT, ABORTED, WAITING, PAUSED_FOR_NODE_REMOVAL -> true;
825851
case SUCCESS, FAILED, MISSING, QUEUED -> false;
826852
};
827853
}
828854

855+
public boolean isActiveOrAssignedQueued() {
856+
return isActive() || isAssignedQueued();
857+
}
858+
829859
@Override
830860
public void writeTo(StreamOutput out) throws IOException {
831861
out.writeOptionalString(nodeId);
@@ -1210,7 +1240,7 @@ public Entry abort() {
12101240
final String nodeId = status.nodeId();
12111241
status = new ShardSnapshotStatus(
12121242
nodeId,
1213-
nodeId == null ? ShardState.FAILED : ShardState.ABORTED,
1243+
nodeId == null ? ShardState.FAILED : ShardState.ABORTED, // TODO: Aborted if assigned queued?
12141244
status.generation(),
12151245
"aborted by snapshot deletion"
12161246
);

server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ private static void addStateInformation(
6767
int shardId,
6868
String indexName
6969
) {
70-
if (shardState.isActive()) {
70+
// Both active or assigned queued means the shard is meant to be the one with actions if node capacity allows it
71+
if (shardState.isActiveOrAssignedQueued()) {
7172
busyIds.computeIfAbsent(indexName, k -> new HashSet<>()).add(shardId);
7273
assert assertGenerationConsistency(generations, indexName, shardId, shardState.generation());
7374
} else if (shardState.state() == SnapshotsInProgress.ShardState.SUCCESS) {
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.snapshots;
11+
12+
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
19+
20+
public class PerNodeShardSnapshotCounter {
21+
22+
private final int shardSnapshotPerNodeLimit;
23+
private final Map<String, Integer> perNodeCounts;
24+
25+
public PerNodeShardSnapshotCounter(
26+
int shardSnapshotPerNodeLimit,
27+
SnapshotsInProgress snapshotsInProgress,
28+
DiscoveryNodes nodes,
29+
boolean isStateless
30+
) {
31+
this.shardSnapshotPerNodeLimit = shardSnapshotPerNodeLimit;
32+
if (this.shardSnapshotPerNodeLimit <= 0) {
33+
this.perNodeCounts = Map.of();
34+
} else {
35+
final var perNodeCounts = nodes.getDataNodes()
36+
.values()
37+
.stream()
38+
.filter(node -> isStateless == false || node.hasRole(DiscoveryNodeRole.INDEX_ROLE.roleName()))
39+
.filter(node -> snapshotsInProgress.isNodeIdForRemoval(node.getId()) == false)
40+
.collect(Collectors.toMap(DiscoveryNode::getId, node -> 0));
41+
42+
snapshotsInProgress.asStream().forEach(entry -> {
43+
if (entry.state().completed() || entry.isClone()) {
44+
return;
45+
}
46+
for (var shardSnapshotStatus : entry.shards().values()) {
47+
if (shardSnapshotStatus.state() == SnapshotsInProgress.ShardState.INIT) {
48+
perNodeCounts.computeIfPresent(shardSnapshotStatus.nodeId(), (nodeId, count) -> count + 1);
49+
}
50+
}
51+
});
52+
this.perNodeCounts = perNodeCounts;
53+
}
54+
}
55+
56+
public boolean tryStartShardSnapshotOnNode(String nodeId) {
57+
if (enabled() == false) {
58+
return true;
59+
}
60+
final Integer count = perNodeCounts.get(nodeId);
61+
if (count == null) {
62+
return false;
63+
}
64+
if (count < shardSnapshotPerNodeLimit) {
65+
perNodeCounts.put(nodeId, count + 1);
66+
return true;
67+
} else {
68+
return false;
69+
}
70+
}
71+
72+
public boolean completeShardSnapshotOnNode(String nodeId) {
73+
if (enabled() == false) {
74+
return true;
75+
}
76+
final Integer count = perNodeCounts.get(nodeId);
77+
if (count == null) {
78+
return false;
79+
}
80+
if (count <= 0) {
81+
return false;
82+
}
83+
perNodeCounts.put(nodeId, count - 1);
84+
return true;
85+
}
86+
87+
public boolean hasCapacityOnAnyNode() {
88+
return enabled() == false || perNodeCounts.values().stream().anyMatch(count -> count < shardSnapshotPerNodeLimit);
89+
}
90+
91+
private boolean enabled() {
92+
return shardSnapshotPerNodeLimit > 0;
93+
}
94+
}

0 commit comments

Comments
 (0)