Skip to content

Commit 1b49eab

Browse files
Allow missing shard stats for restarted nodes for _snapshot/_status (#128399)
Returns empty shard stats for shard entries whose stats are unavailable because a node has been restarted or has left the cluster. The change adds a 'description' field to the SnapshotIndexShardStatus class that carries a message explaining why the stats are empty. This change was motivated by a desire to reduce the latency of getting the stats for currently running snapshots. Once the snapshot has completed, the stats can still be loaded from the repository via a _snapshot/<repository>/<snapshot>/_status call. Closes ES-10982 Co-authored-by: Dianna Hohensee <[email protected]>
1 parent 0cac912 commit 1b49eab

File tree

10 files changed

+391
-36
lines changed

10 files changed

+391
-36
lines changed

docs/changelog/128399.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128399
2+
summary: Allow missing shard stats for restarted nodes for `_snapshot/_status`
3+
area: Snapshot/Restore
4+
type: enhancement
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
1414
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
1515
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
16-
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
1716
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
1817
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
1918
import org.elasticsearch.action.support.GroupedActionListener;
@@ -212,7 +211,8 @@ public void testGetSnapshotsWithoutIndices() throws Exception {
212211
* 1. Start snapshot of two shards (both located on separate data nodes).
213212
* 2. Have one of the shards snapshot completely and the other block
214213
* 3. Restart the data node that completed its shard snapshot
215-
* 4. Make sure that snapshot status APIs show correct file-counts and -sizes
214+
* 4. Make sure that snapshot status APIs show correct file-counts and -sizes for non-restarted nodes
215+
* 5. Make sure the description string is set for shard snapshots on restarted nodes.
216216
*
217217
* @throws Exception on failure
218218
*/
@@ -248,21 +248,24 @@ public void testCorrectCountsForDoneShards() throws Exception {
248248
assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
249249
assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
250250
assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L));
251+
assertNull("expected a null description for snapshot shard status: " + snapshotShardState, snapshotShardState.getDescription());
251252
}, 30L, TimeUnit.SECONDS);
252253

253-
final SnapshotStats snapshotShardStats = stateFirstShard(getSnapshotStatus(repoName, snapshotOne), indexTwo).getStats();
254-
final int totalFiles = snapshotShardStats.getTotalFileCount();
255-
final long totalFileSize = snapshotShardStats.getTotalSize();
256-
257254
internalCluster().restartNode(dataNodeTwo);
258255

259-
final SnapshotIndexShardStatus snapshotShardStateAfterNodeRestart = stateFirstShard(
260-
getSnapshotStatus(repoName, snapshotOne),
261-
indexTwo
262-
);
263-
assertThat(snapshotShardStateAfterNodeRestart.getStage(), is(SnapshotIndexShardStage.DONE));
264-
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalFileCount(), equalTo(totalFiles));
265-
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalSize(), equalTo(totalFileSize));
256+
final var snapshotStatusAfterRestart = getSnapshotStatus(repoName, snapshotOne);
257+
258+
final var snapshotShardStateIndexTwo = stateFirstShard(snapshotStatusAfterRestart, indexTwo);
259+
assertThat(snapshotShardStateIndexTwo.getStage(), is(SnapshotIndexShardStage.DONE));
260+
assertNotNull("expected a non-null description string for missing stats", snapshotShardStateIndexTwo.getDescription());
261+
final var missingStats = snapshotShardStateIndexTwo.getStats();
262+
assertThat(missingStats.getTotalFileCount(), equalTo(-1));
263+
assertThat(missingStats.getTotalSize(), equalTo(-1L));
264+
265+
final var snapshotShardStateIndexOne = stateFirstShard(snapshotStatusAfterRestart, indexOne);
266+
assertNull("expected a null description string for available stats", snapshotShardStateIndexOne.getDescription());
267+
assertThat(snapshotShardStateIndexOne.getStats().getTotalFileCount(), greaterThan(0));
268+
assertThat(snapshotShardStateIndexOne.getStats().getTotalSize(), greaterThan(0L));
266269

267270
unblockAllDataNodes(repoName);
268271
assertThat(responseSnapshotOne.get().getSnapshotInfo().state(), is(SnapshotState.SUCCESS));

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,8 @@ static TransportVersion def(int id) {
289289
public static final TransportVersion ML_INFERENCE_MISTRAL_CHAT_COMPLETION_ADDED = def(9_090_0_00);
290290
public static final TransportVersion IDP_CUSTOM_SAML_ATTRIBUTES_ALLOW_LIST = def(9_091_0_00);
291291
public static final TransportVersion SEARCH_SOURCE_EXCLUDE_VECTORS_PARAM = def(9_092_0_00);
292+
public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS = def(9_093_0_00);
293+
292294
/*
293295
* STOP! READ THIS FIRST! No, really,
294296
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.Strings;
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
16+
import org.elasticsearch.core.Nullable;
1617
import org.elasticsearch.index.shard.ShardId;
1718
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
1819
import org.elasticsearch.xcontent.ToXContentFragment;
@@ -21,6 +22,8 @@
2122
import java.io.IOException;
2223
import java.util.Objects;
2324

25+
import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS;
26+
2427
public class SnapshotIndexShardStatus extends BroadcastShardResponse implements ToXContentFragment {
2528

2629
private final SnapshotIndexShardStage stage;
@@ -31,12 +34,17 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
3134

3235
private String failure;
3336

37+
private String description;
38+
3439
public SnapshotIndexShardStatus(StreamInput in) throws IOException {
3540
super(in);
3641
stage = SnapshotIndexShardStage.fromValue(in.readByte());
3742
stats = new SnapshotStats(in);
3843
nodeId = in.readOptionalString();
3944
failure = in.readOptionalString();
45+
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
46+
description = in.readOptionalString();
47+
}
4048
}
4149

4250
SnapshotIndexShardStatus(ShardId shardId, SnapshotIndexShardStage stage) {
@@ -74,11 +82,38 @@ public SnapshotIndexShardStatus(StreamInput in) throws IOException {
7482
}
7583

7684
SnapshotIndexShardStatus(ShardId shardId, SnapshotIndexShardStage stage, SnapshotStats stats, String nodeId, String failure) {
85+
this(shardId, stage, stats, nodeId, failure, null);
86+
}
87+
88+
SnapshotIndexShardStatus(
89+
ShardId shardId,
90+
SnapshotIndexShardStage stage,
91+
SnapshotStats stats,
92+
String nodeId,
93+
String failure,
94+
@Nullable String description
95+
) {
7796
super(shardId);
7897
this.stage = stage;
7998
this.stats = stats;
8099
this.nodeId = nodeId;
81100
this.failure = failure;
101+
this.description = description;
102+
}
103+
104+
/**
105+
* Creates an instance for scenarios where the snapshot is {@link SnapshotIndexShardStage#DONE} but the stats are unavailable, with a
106+
* non-null description of why the stats are missing.
107+
*/
108+
public static SnapshotIndexShardStatus forDoneButMissingStats(ShardId shardId, String description) {
109+
return new SnapshotIndexShardStatus(
110+
shardId,
111+
SnapshotIndexShardStage.DONE,
112+
SnapshotStats.forMissingStats(),
113+
null,
114+
null,
115+
Objects.requireNonNull(description)
116+
);
82117
}
83118

84119
/**
@@ -109,19 +144,31 @@ public String getFailure() {
109144
return failure;
110145
}
111146

147+
/**
148+
* Returns the optional description explaining the values in the {@code stats} field (for example, why they are missing), or {@code null} if no description was set.
149+
*/
150+
@Nullable
151+
public String getDescription() {
152+
return description;
153+
}
154+
112155
@Override
113156
public void writeTo(StreamOutput out) throws IOException {
114157
super.writeTo(out);
115158
out.writeByte(stage.value());
116159
stats.writeTo(out);
117160
out.writeOptionalString(nodeId);
118161
out.writeOptionalString(failure);
162+
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
163+
out.writeOptionalString(description);
164+
}
119165
}
120166

121167
static final class Fields {
122168
static final String STAGE = "stage";
123169
static final String REASON = "reason";
124170
static final String NODE = "node";
171+
static final String DESCRIPTION = "description";
125172
}
126173

127174
@Override
@@ -135,6 +182,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
135182
if (getFailure() != null) {
136183
builder.field(Fields.REASON, getFailure());
137184
}
185+
if (getDescription() != null) {
186+
builder.field(Fields.DESCRIPTION, getDescription());
187+
}
138188
builder.endObject();
139189
return builder;
140190
}
@@ -151,7 +201,8 @@ public boolean equals(Object o) {
151201
return stage == that.stage
152202
&& Objects.equals(stats, that.stats)
153203
&& Objects.equals(nodeId, that.nodeId)
154-
&& Objects.equals(failure, that.failure);
204+
&& Objects.equals(failure, that.failure)
205+
&& Objects.equals(description, that.description);
155206
}
156207

157208
@Override
@@ -160,6 +211,7 @@ public int hashCode() {
160211
result = 31 * result + (stats != null ? stats.hashCode() : 0);
161212
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
162213
result = 31 * result + (failure != null ? failure.hashCode() : 0);
214+
result = 31 * result + (description != null ? description.hashCode() : 0);
163215
return result;
164216
}
165217

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import java.io.IOException;
2323

24+
import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS;
25+
2426
public class SnapshotStats implements Writeable, ToXContentObject {
2527

2628
private long startTime;
@@ -35,6 +37,19 @@ public class SnapshotStats implements Writeable, ToXContentObject {
3537
SnapshotStats() {}
3638

3739
SnapshotStats(StreamInput in) throws IOException {
40+
// We use a boolean to indicate if the stats are present (true) or missing (false), to skip writing all the values if missing.
41+
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS) && in.readBoolean() == false) {
42+
startTime = 0L;
43+
time = 0L;
44+
incrementalFileCount = -1;
45+
processedFileCount = -1;
46+
incrementalSize = -1L;
47+
processedSize = -1L;
48+
totalFileCount = -1;
49+
totalSize = -1L;
50+
return;
51+
}
52+
3853
startTime = in.readVLong();
3954
time = in.readVLong();
4055

@@ -69,6 +84,20 @@ public class SnapshotStats implements Writeable, ToXContentObject {
6984
this.processedSize = processedSize;
7085
}
7186

87+
/**
88+
* Returns a stats instance with invalid field values for use in situations where the snapshot stats are unavailable.
89+
*/
90+
public static SnapshotStats forMissingStats() {
91+
return new SnapshotStats(0L, 0L, -1, -1, -1, -1L, -1L, -1L);
92+
}
93+
94+
/**
95+
* Returns true if this instance is for a shard snapshot with unavailable stats.
96+
*/
97+
public boolean isMissingStats() {
98+
return incrementalFileCount == -1;
99+
}
100+
72101
/**
73102
* Returns time when snapshot started
74103
*/
@@ -127,6 +156,23 @@ public long getProcessedSize() {
127156

128157
@Override
129158
public void writeTo(StreamOutput out) throws IOException {
159+
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
160+
// We use a boolean to indicate if the stats are present (true) or missing (false), to skip writing all the values if missing.
161+
if (isMissingStats()) {
162+
out.writeBoolean(false);
163+
return;
164+
}
165+
out.writeBoolean(true);
166+
} else if (isMissingStats()) {
167+
throw new IllegalStateException(
168+
"cannot serialize empty stats for transport version ["
169+
+ out.getTransportVersion()
170+
+ "] less than ["
171+
+ SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS
172+
+ "]"
173+
);
174+
}
175+
130176
out.writeVLong(startTime);
131177
out.writeVLong(time);
132178

@@ -196,6 +242,10 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
196242
* @param updateTimestamps Whether or not start time and duration should be updated
197243
*/
198244
void add(SnapshotStats stats, boolean updateTimestamps) {
245+
if (stats.isMissingStats()) {
246+
return;
247+
}
248+
199249
incrementalFileCount += stats.incrementalFileCount;
200250
totalFileCount += stats.totalFileCount;
201251
processedFileCount += stats.processedFileCount;

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.TransportVersion;
15+
import org.elasticsearch.TransportVersions;
1416
import org.elasticsearch.action.ActionListener;
1517
import org.elasticsearch.action.ActionType;
1618
import org.elasticsearch.action.support.ActionFilters;
@@ -119,7 +121,7 @@ protected void masterOperation(
119121
Arrays.asList(request.snapshots())
120122
);
121123
if (currentSnapshots.isEmpty()) {
122-
buildResponse(snapshotsInProgress, request, currentSnapshots, null, cancellableTask, listener);
124+
buildResponse(snapshotsInProgress, request, currentSnapshots, null, state.getMinTransportVersion(), cancellableTask, listener);
123125
return;
124126
}
125127

@@ -152,6 +154,7 @@ protected void masterOperation(
152154
request,
153155
currentSnapshots,
154156
nodeSnapshotStatuses,
157+
state.getMinTransportVersion(),
155158
cancellableTask,
156159
l
157160
)
@@ -160,7 +163,7 @@ protected void masterOperation(
160163
);
161164
} else {
162165
// We don't have any in-progress shards, just return current stats
163-
buildResponse(snapshotsInProgress, request, currentSnapshots, null, cancellableTask, listener);
166+
buildResponse(snapshotsInProgress, request, currentSnapshots, null, state.getMinTransportVersion(), cancellableTask, listener);
164167
}
165168

166169
}
@@ -171,6 +174,7 @@ void buildResponse(
171174
SnapshotsStatusRequest request,
172175
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
173176
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
177+
TransportVersion minTransportVersion,
174178
CancellableTask task,
175179
ActionListener<SnapshotsStatusResponse> listener
176180
) {
@@ -196,13 +200,13 @@ void buildResponse(
196200
}
197201
SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.getValue();
198202
if (status.nodeId() != null) {
199-
// We should have information about this shard from the shard:
203+
// We should have information about this shard from the node:
200204
TransportNodesSnapshotsStatus.NodeSnapshotStatus nodeStatus = nodeSnapshotStatusMap.get(status.nodeId());
201205
if (nodeStatus != null) {
202-
Map<ShardId, SnapshotIndexShardStatus> shardStatues = nodeStatus.status().get(entry.snapshot());
203-
if (shardStatues != null) {
206+
Map<ShardId, SnapshotIndexShardStatus> shardStatuses = nodeStatus.status().get(entry.snapshot());
207+
if (shardStatuses != null) {
204208
final ShardId sid = entry.shardId(shardEntry.getKey());
205-
SnapshotIndexShardStatus shardStatus = shardStatues.get(sid);
209+
SnapshotIndexShardStatus shardStatus = shardStatuses.get(sid);
206210
if (shardStatus != null) {
207211
// We have full information about this shard
208212
if (shardStatus.getStage() == SnapshotIndexShardStage.DONE
@@ -228,27 +232,34 @@ void buildResponse(
228232
}
229233
// We failed to find the status of the shard from the responses we received from data nodes.
230234
// This can happen if nodes drop out of the cluster completely or restart during the snapshot.
231-
// We rebuild the information they would have provided from their in memory state from the cluster
232-
// state and the repository contents in the below logic
233235
final SnapshotIndexShardStage stage = switch (shardEntry.getValue().state()) {
234236
case FAILED, ABORTED, MISSING -> SnapshotIndexShardStage.FAILURE;
235237
case INIT, WAITING, PAUSED_FOR_NODE_REMOVAL, QUEUED -> SnapshotIndexShardStage.STARTED;
236238
case SUCCESS -> SnapshotIndexShardStage.DONE;
237239
};
238240
final SnapshotIndexShardStatus shardStatus;
239241
if (stage == SnapshotIndexShardStage.DONE) {
240-
// Shard snapshot completed successfully so we should be able to load the exact statistics for this
241-
// shard from the repository already.
242242
final ShardId shardId = entry.shardId(shardEntry.getKey());
243-
shardStatus = new SnapshotIndexShardStatus(
244-
shardId,
245-
repositoriesService.repository(entry.repository())
246-
.getShardSnapshotStatus(
247-
entry.snapshot().getSnapshotId(),
248-
entry.indices().get(shardId.getIndexName()),
249-
shardId
250-
)
251-
);
243+
// When processing currently running snapshots, instead of reading the statistics from the repository, which can be
244+
// expensive, we provide a message to the caller explaining why the stats are missing and the API
245+
// that can be used to load them once the snapshot has completed.
246+
if (minTransportVersion.onOrAfter(TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
247+
shardStatus = SnapshotIndexShardStatus.forDoneButMissingStats(shardId, """
248+
Snapshot shard stats missing from a currently running snapshot due to a node leaving the cluster after \
249+
completing the shard snapshot; retry once the snapshot has completed to load all shard stats from the \
250+
repository.""");
251+
} else {
252+
// Backwards-compatible (BWC) behavior when the cluster's minimum transport version predates missing-stats support: load the stats directly from the repository.
253+
shardStatus = new SnapshotIndexShardStatus(
254+
shardId,
255+
repositoriesService.repository(entry.repository())
256+
.getShardSnapshotStatus(
257+
entry.snapshot().getSnapshotId(),
258+
entry.indices().get(shardId.getIndexName()),
259+
shardId
260+
)
261+
);
262+
}
252263
} else {
253264
shardStatus = new SnapshotIndexShardStatus(entry.shardId(shardEntry.getKey()), stage);
254265
}

0 commit comments

Comments
 (0)