Skip to content

Commit 4768d27

Browse files
committed
Tweak and add tests for PerNodeShardSnapshotCounter
1 parent f5d3b64 commit 4768d27

File tree

5 files changed

+202
-34
lines changed

5 files changed

+202
-34
lines changed

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -846,7 +846,6 @@ public ShardSnapshotResult shardSnapshotResult() {
846846
* ({@link ShardState#INIT} or {@link ShardState#ABORTED}) or about to write to it in state {@link ShardState#WAITING} or
847847
* {@link ShardState#PAUSED_FOR_NODE_REMOVAL}.
848848
*/
849-
// TODO: review its usage again
850849
public boolean isActive() {
851850
return switch (state) {
852851
case INIT, ABORTED, WAITING, PAUSED_FOR_NODE_REMOVAL -> true;
@@ -858,6 +857,10 @@ public boolean isActiveOrAssignedQueued() {
858857
return isActive() || isAssignedQueued();
859858
}
860859

860+
public boolean isAbortedAssignedQueued() {
861+
return state == ShardState.ABORTED && reason != null && reason.startsWith("assigned-queued aborted");
862+
}
863+
861864
@Override
862865
public void writeTo(StreamOutput out) throws IOException {
863866
out.writeOptionalString(nodeId);

server/src/main/java/org/elasticsearch/snapshots/PerNodeShardSnapshotCounter.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,6 @@ public PerNodeShardSnapshotCounter(
5353
}
5454
}
5555

56-
private static boolean isRunningOnDataNode(SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus) {
57-
return shardSnapshotStatus.state() == SnapshotsInProgress.ShardState.INIT
58-
|| (shardSnapshotStatus.state() == SnapshotsInProgress.ShardState.ABORTED && shardSnapshotStatus.reason() != null
59-
// TODO: find a better way to check for abort on data nodes
60-
&& shardSnapshotStatus.reason().startsWith("assigned-queued aborted") == false);
61-
}
62-
6356
public boolean tryStartShardSnapshotOnNode(String nodeId) {
6457
if (enabled() == false) {
6558
return true;
@@ -95,18 +88,6 @@ public boolean hasCapacityOnAnyNode() {
9588
return enabled() == false || perNodeCounts.values().stream().anyMatch(count -> count < shardSnapshotPerNodeLimit);
9689
}
9790

98-
public boolean hasCapacityOnNode(String nodeId) {
99-
if (enabled() == false) {
100-
return true;
101-
}
102-
final Integer count = perNodeCounts.get(nodeId);
103-
return count != null && count < shardSnapshotPerNodeLimit;
104-
}
105-
106-
private boolean enabled() {
107-
return shardSnapshotPerNodeLimit > 0;
108-
}
109-
11091
@Override
11192
public String toString() {
11293
return "PerNodeShardSnapshotCounter{"
@@ -116,4 +97,15 @@ public String toString() {
11697
+ perNodeCounts
11798
+ '}';
11899
}
100+
101+
private boolean enabled() {
102+
return shardSnapshotPerNodeLimit > 0;
103+
}
104+
105+
private static boolean isRunningOnDataNode(SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus) {
106+
return shardSnapshotStatus.state() == SnapshotsInProgress.ShardState.INIT
107+
// Aborted shard snapshot may still be running on the data node unless it was assigned-queued, i.e. never actually started
108+
|| (shardSnapshotStatus.state() == SnapshotsInProgress.ShardState.ABORTED
109+
&& shardSnapshotStatus.isAbortedAssignedQueued() == false);
110+
}
119111
}

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3835,9 +3835,8 @@ private boolean changeReleasesDataNode(ShardSnapshotStatus previous, ShardSnapsh
38353835
if (previous.state() == ShardState.INIT) {
38363836
return current.state().completed() || current.state() == ShardState.PAUSED_FOR_NODE_REMOVAL;
38373837
}
3838-
// TODO find a better to check for abort on data nodes
3839-
if (previous.state() == ShardState.ABORTED
3840-
&& (previous.reason() == null || previous.reason().startsWith("assigned-queued aborted") == false)) {
3838+
// Aborted shard snapshot releases data node capacity unless it was assigned-queued, i.e. never actually started
3839+
if (previous.state() == ShardState.ABORTED && previous.isAbortedAssignedQueued() == false) {
38413840
return current.state().completed();
38423841
}
38433842
return false;
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.snapshots;
11+
12+
import org.elasticsearch.cluster.SnapshotsInProgress;
13+
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
14+
import org.elasticsearch.cluster.metadata.ProjectId;
15+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
16+
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
17+
import org.elasticsearch.cluster.node.DiscoveryNodes;
18+
import org.elasticsearch.test.ESTestCase;
19+
20+
import java.util.Set;
21+
22+
import static org.elasticsearch.snapshots.SnapshotsInProgressSerializationTests.randomSnapshot;
23+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
24+
import static org.hamcrest.Matchers.lessThan;
25+
26+
public class PerNodeShardSnapshotCounterTests extends ESTestCase {
27+
28+
public void testDisabledWhenLimitIsZero() {
29+
final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter(
30+
0,
31+
SnapshotsInProgress.EMPTY,
32+
DiscoveryNodes.EMPTY_NODES,
33+
randomBoolean()
34+
);
35+
assertTrue(perNodeShardSnapshotCounter.hasCapacityOnAnyNode());
36+
assertTrue(perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(randomIdentifier()));
37+
assertTrue(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(randomIdentifier()));
38+
}
39+
40+
public void testNodesCapacities() {
41+
final var snapshotNodeIds = randomList(1, 5, () -> randomAlphaOfLength(16));
42+
final var nonSnapshotNodeIds = randomList(0, 5, () -> randomAlphaOfLength(18));
43+
44+
final boolean isStateless = randomBoolean();
45+
final Set<DiscoveryNodeRole> snapshotNodeRole;
46+
final Set<DiscoveryNodeRole> nonSnapshotNodeRole;
47+
if (isStateless) {
48+
snapshotNodeRole = Set.of(DiscoveryNodeRole.INDEX_ROLE);
49+
nonSnapshotNodeRole = Set.of(randomFrom(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.ML_ROLE));
50+
} else {
51+
snapshotNodeRole = Set.copyOf(
52+
randomNonEmptySubsetOf(
53+
Set.of(
54+
DiscoveryNodeRole.DATA_ROLE,
55+
DiscoveryNodeRole.DATA_HOT_NODE_ROLE,
56+
DiscoveryNodeRole.DATA_WARM_NODE_ROLE,
57+
DiscoveryNodeRole.DATA_COLD_NODE_ROLE,
58+
DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE,
59+
DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE
60+
)
61+
)
62+
);
63+
nonSnapshotNodeRole = Set.copyOf(
64+
randomNonEmptySubsetOf(
65+
Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.ML_ROLE, DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE)
66+
)
67+
);
68+
}
69+
70+
final var nodesBuilder = DiscoveryNodes.builder();
71+
for (var snapshotNodeId : snapshotNodeIds) {
72+
nodesBuilder.add(DiscoveryNodeUtils.builder(snapshotNodeId).name(snapshotNodeId).roles(snapshotNodeRole).build());
73+
}
74+
for (var nonSnapshotNodeId : nonSnapshotNodeIds) {
75+
nodesBuilder.add(DiscoveryNodeUtils.builder(nonSnapshotNodeId).name(nonSnapshotNodeId).roles(nonSnapshotNodeRole).build());
76+
}
77+
final DiscoveryNodes discoveryNodes = nodesBuilder.build();
78+
79+
final int perNodeLimit = between(1, 10);
80+
81+
// Basic test when there is no ongoing snapshots
82+
{
83+
final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter(
84+
perNodeLimit,
85+
SnapshotsInProgress.EMPTY,
86+
discoveryNodes,
87+
isStateless
88+
);
89+
assertTrue(perNodeShardSnapshotCounter.hasCapacityOnAnyNode());
90+
91+
// Always false for non-snapshotting nodes
92+
for (var nonSnapshotNodeId : nonSnapshotNodeIds) {
93+
assertFalse(perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(nonSnapshotNodeId));
94+
assertFalse(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(nonSnapshotNodeId));
95+
}
96+
97+
for (var snapshotNodeId : snapshotNodeIds) {
98+
assertTrue(perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(snapshotNodeId));
99+
assertTrue(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(snapshotNodeId));
100+
}
101+
}
102+
103+
// random snapshots
104+
{
105+
final int numRepos = between(1, 3);
106+
SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY;
107+
for (int i = 0; i < numRepos; i++) {
108+
final var numSnapshots = between(1, 5);
109+
for (int j = 0; j < numSnapshots; j++) {
110+
final var entry = randomSnapshot(ProjectId.DEFAULT, "repo-" + i, () -> randomFrom(snapshotNodeIds));
111+
if (entry.hasAssignedQueuedShards()) {
112+
assertTrue(entry.shards().values().stream().anyMatch(ShardSnapshotStatus::isAssignedQueued));
113+
} else {
114+
assertFalse(entry.shards().values().stream().anyMatch(ShardSnapshotStatus::isAssignedQueued));
115+
}
116+
snapshotsInProgress = snapshotsInProgress.withAddedEntry(entry);
117+
}
118+
}
119+
120+
final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter(
121+
perNodeLimit,
122+
snapshotsInProgress,
123+
discoveryNodes,
124+
isStateless
125+
);
126+
127+
// Always false for non-snapshotting nodes
128+
for (var nonSnapshotNodeId : nonSnapshotNodeIds) {
129+
assertFalse(perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(nonSnapshotNodeId));
130+
assertFalse(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(nonSnapshotNodeId));
131+
}
132+
133+
for (var snapshotNodeId : snapshotNodeIds) {
134+
final var numRunning = runningShardSnapshotsForNode(snapshotsInProgress, snapshotNodeId);
135+
final var started = perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(snapshotNodeId);
136+
if (started) {
137+
assertThat(numRunning, lessThan(perNodeLimit));
138+
if (numRunning > 0) {
139+
assertTrue(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(snapshotNodeId));
140+
} else {
141+
assertFalse(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(snapshotNodeId));
142+
}
143+
} else {
144+
assertThat(numRunning, greaterThanOrEqualTo(perNodeLimit));
145+
assertTrue(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(snapshotNodeId));
146+
}
147+
}
148+
}
149+
}
150+
151+
private int runningShardSnapshotsForNode(SnapshotsInProgress snapshotsInProgress, String nodeId) {
152+
int result = 0;
153+
final var allEntries = snapshotsInProgress.asStream().toList();
154+
for (var entry : allEntries) {
155+
if (entry.isClone() == false && entry.state().completed() == false) {
156+
for (ShardSnapshotStatus shardSnapshot : entry.shards().values()) {
157+
if (nodeId.equals(shardSnapshot.nodeId())) {
158+
if (shardSnapshot.state() == SnapshotsInProgress.ShardState.INIT) {
159+
result++;
160+
} else if (shardSnapshot.state() == SnapshotsInProgress.ShardState.ABORTED
161+
&& (shardSnapshot.reason() == null || shardSnapshot.reason().startsWith("assigned-queued aborted") == false)) {
162+
result++;
163+
}
164+
}
165+
}
166+
}
167+
}
168+
return result;
169+
}
170+
}

server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -140,16 +140,16 @@ private ClusterState getClusterStateWithNodeShutdownMetadata(List<String> nodeId
140140
);
141141
}
142142

143-
private Entry randomSnapshot() {
144-
return randomSnapshot(randomProjectIdOrDefault());
143+
public static Entry randomSnapshot() {
144+
return randomSnapshot(randomProjectIdOrDefault(), "repo-" + randomInt(5), () -> randomAlphaOfLength(10));
145145
}
146146

147-
private Entry randomSnapshot(ProjectId projectId) {
148-
Snapshot snapshot = new Snapshot(
149-
projectId,
150-
"repo-" + randomInt(5),
151-
new SnapshotId(randomAlphaOfLength(10), randomAlphaOfLength(10))
152-
);
147+
public static Entry randomSnapshot(ProjectId projectId) {
148+
return randomSnapshot(projectId, "repo-" + randomInt(5), () -> randomAlphaOfLength(10));
149+
}
150+
151+
public static Entry randomSnapshot(ProjectId projectId, String repoName, Supplier<String> nodeIdSupplier) {
152+
Snapshot snapshot = new Snapshot(projectId, repoName, new SnapshotId(randomAlphaOfLength(10), randomAlphaOfLength(10)));
153153
boolean includeGlobalState = randomBoolean();
154154
boolean partial = randomBoolean();
155155
int numberOfIndices = randomIntBetween(0, 10);
@@ -166,7 +166,7 @@ private Entry randomSnapshot(ProjectId projectId) {
166166
for (Index idx : esIndices) {
167167
int shardsCount = randomIntBetween(1, 10);
168168
for (int j = 0; j < shardsCount; j++) {
169-
shards.put(new ShardId(idx, j), randomShardSnapshotStatus(randomAlphaOfLength(10)));
169+
shards.put(new ShardId(idx, j), randomShardSnapshotStatus(nodeIdSupplier.get()));
170170
}
171171
}
172172
List<SnapshotFeatureInfo> featureStates = randomList(5, SnapshotFeatureInfoTests::randomSnapshotFeatureInfo);
@@ -187,10 +187,14 @@ private Entry randomSnapshot(ProjectId projectId) {
187187
);
188188
}
189189

190-
private SnapshotsInProgress.ShardSnapshotStatus randomShardSnapshotStatus(String nodeId) {
190+
public static SnapshotsInProgress.ShardSnapshotStatus randomShardSnapshotStatus(String nodeId) {
191191
ShardState shardState = randomFrom(ShardState.values());
192192
if (shardState == ShardState.QUEUED) {
193-
return SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED;
193+
if (randomBoolean()) {
194+
return SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED;
195+
} else {
196+
return SnapshotsInProgress.ShardSnapshotStatus.assignedQueued(nodeId, randomBoolean() ? new ShardGeneration(1L) : null);
197+
}
194198
} else if (shardState == ShardState.SUCCESS) {
195199
final ShardSnapshotResult shardSnapshotResult = new ShardSnapshotResult(new ShardGeneration(1L), ByteSizeValue.ofBytes(1L), 1);
196200
return SnapshotsInProgress.ShardSnapshotStatus.success(nodeId, shardSnapshotResult);

0 commit comments

Comments
 (0)