Skip to content

Commit 83bd1a9

Browse files
Use -1 for missing stats values, address code review comments
1 parent 6e41497 commit 83bd1a9

File tree

7 files changed

+176
-31
lines changed

7 files changed

+176
-31
lines changed

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
1414
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
1515
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
16-
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
1716
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
1817
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
1918
import org.elasticsearch.action.support.GroupedActionListener;
@@ -249,22 +248,23 @@ public void testCorrectCountsForDoneShards() throws Exception {
249248
assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
250249
assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
251250
assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L));
251+
assertNull("expected a null description for snapshot shard status: " + snapshotShardState, snapshotShardState.getDescription());
252252
}, 30L, TimeUnit.SECONDS);
253253

254-
final SnapshotStats snapshotShardStats = stateFirstShard(getSnapshotStatus(repoName, snapshotOne), indexTwo).getStats();
255-
final int totalFiles = snapshotShardStats.getTotalFileCount();
256-
final long totalFileSize = snapshotShardStats.getTotalSize();
257-
258254
internalCluster().restartNode(dataNodeTwo);
259255

260-
final SnapshotIndexShardStatus snapshotShardStateAfterNodeRestart = stateFirstShard(
261-
getSnapshotStatus(repoName, snapshotOne),
262-
indexTwo
263-
);
256+
final var snapshotStatusAfterRestart = getSnapshotStatus(repoName, snapshotOne);
257+
final var snapshotShardStateAfterNodeRestart = stateFirstShard(snapshotStatusAfterRestart, indexTwo);
264258
assertThat(snapshotShardStateAfterNodeRestart.getStage(), is(SnapshotIndexShardStage.DONE));
265-
assertNotNull("expected a description string for missing stats", snapshotShardStateAfterNodeRestart.getDescription());
266-
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalFileCount(), equalTo(0));
267-
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalSize(), equalTo(0L));
259+
assertNotNull("expected a non-null description string for missing stats", snapshotShardStateAfterNodeRestart.getDescription());
260+
final var missingStats = snapshotShardStateAfterNodeRestart.getStats();
261+
assertThat(missingStats.getTotalFileCount(), equalTo(-1));
262+
assertThat(missingStats.getTotalSize(), equalTo(-1L));
263+
264+
final var snapshotShardStateIndexOne = stateFirstShard(snapshotStatusAfterRestart, indexOne);
265+
assertNull("expected a null description string for available stats", snapshotShardStateIndexOne.getDescription());
266+
assertThat(snapshotShardStateIndexOne.getStats().getTotalFileCount(), greaterThan(0));
267+
assertThat(snapshotShardStateIndexOne.getStats().getTotalSize(), greaterThan(0L));
268268

269269
unblockAllDataNodes(repoName);
270270
assertThat(responseSnapshotOne.get().getSnapshotInfo().state(), is(SnapshotState.SUCCESS));

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ static TransportVersion def(int id) {
273273
public static final TransportVersion INFERENCE_CUSTOM_SERVICE_ADDED = def(9_084_0_00);
274274
public static final TransportVersion ESQL_LIMIT_ROW_SIZE = def(9_085_0_00);
275275
public static final TransportVersion ESQL_REGEX_MATCH_WITH_CASE_INSENSITIVITY = def(9_086_0_00);
276-
public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_DESCRIPTION_ADDED = def(9_087_0_00);
276+
public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS = def(9_087_0_00);
277277

278278
/*
279279
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.io.IOException;
2222
import java.util.Objects;
2323

24-
import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_DESCRIPTION_ADDED;
24+
import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS;
2525

2626
public class SnapshotIndexShardStatus extends BroadcastShardResponse implements ToXContentFragment {
2727

@@ -41,7 +41,7 @@ public SnapshotIndexShardStatus(StreamInput in) throws IOException {
4141
stats = new SnapshotStats(in);
4242
nodeId = in.readOptionalString();
4343
failure = in.readOptionalString();
44-
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_DESCRIPTION_ADDED)) {
44+
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
4545
description = in.readOptionalString();
4646
}
4747
}
@@ -100,6 +100,20 @@ public SnapshotIndexShardStatus(StreamInput in) throws IOException {
100100
this.description = description;
101101
}
102102

103+
/**
104+
* Creates an instance for scenarios where the snapshot stats are unavailable, with a non-null description of why the stats are missing.
105+
*/
106+
public static SnapshotIndexShardStatus forMissingStats(ShardId shardId, SnapshotIndexShardStage stage, String description) {
107+
return new SnapshotIndexShardStatus(
108+
shardId,
109+
stage,
110+
SnapshotStats.forMissingStats(),
111+
null,
112+
null,
113+
Objects.requireNonNull(description)
114+
);
115+
}
116+
103117
/**
104118
* Returns snapshot stage
105119
*/
@@ -142,7 +156,7 @@ public void writeTo(StreamOutput out) throws IOException {
142156
stats.writeTo(out);
143157
out.writeOptionalString(nodeId);
144158
out.writeOptionalString(failure);
145-
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_DESCRIPTION_ADDED)) {
159+
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
146160
out.writeOptionalString(description);
147161
}
148162
}

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
import java.io.IOException;
2525

26+
import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS;
27+
2628
public class SnapshotStats implements Writeable, ToXContentObject {
2729

2830
private long startTime;
@@ -37,6 +39,18 @@ public class SnapshotStats implements Writeable, ToXContentObject {
3739
SnapshotStats() {}
3840

3941
SnapshotStats(StreamInput in) throws IOException {
42+
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS) && in.readBoolean() == false) {
43+
startTime = 0L;
44+
time = 0L;
45+
incrementalFileCount = -1;
46+
processedFileCount = -1;
47+
incrementalSize = -1L;
48+
processedSize = -1L;
49+
totalFileCount = -1;
50+
totalSize = -1L;
51+
return;
52+
}
53+
4054
startTime = in.readVLong();
4155
time = in.readVLong();
4256

@@ -71,6 +85,20 @@ public class SnapshotStats implements Writeable, ToXContentObject {
7185
this.processedSize = processedSize;
7286
}
7387

88+
/**
89+
* Returns a stats instance with -1 for all non-time values, for use in situations where the snapshot stats are unavailable.
90+
*/
91+
public static SnapshotStats forMissingStats() {
92+
return new SnapshotStats(0L, 0L, -1, -1, -1, -1L, -1L, -1L);
93+
}
94+
95+
/**
96+
* Returns true if this instance is for a shard snapshot with unavailable stats.
97+
*/
98+
public boolean isMissingStats() {
99+
return incrementalFileCount == -1;
100+
}
101+
74102
/**
75103
* Returns time when snapshot started
76104
*/
@@ -129,6 +157,22 @@ public long getProcessedSize() {
129157

130158
@Override
131159
public void writeTo(StreamOutput out) throws IOException {
160+
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
161+
if (isMissingStats()) {
162+
out.writeBoolean(false);
163+
return;
164+
}
165+
out.writeBoolean(true);
166+
} else if (isMissingStats()) {
167+
throw new IllegalStateException(
168+
"cannot serialize empty stats for transport version ["
169+
+ out.getTransportVersion()
170+
+ "] less than ["
171+
+ SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS
172+
+ "]"
173+
);
174+
}
175+
132176
out.writeVLong(startTime);
133177
out.writeVLong(time);
134178

@@ -300,6 +344,10 @@ public static SnapshotStats fromXContent(XContentParser parser) throws IOExcepti
300344
* @param updateTimestamps Whether or not start time and duration should be updated
301345
*/
302346
void add(SnapshotStats stats, boolean updateTimestamps) {
347+
if (stats.isMissingStats()) {
348+
return;
349+
}
350+
303351
incrementalFileCount += stats.incrementalFileCount;
304352
totalFileCount += stats.totalFileCount;
305353
processedFileCount += stats.processedFileCount;

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.TransportVersion;
15+
import org.elasticsearch.TransportVersions;
1416
import org.elasticsearch.action.ActionListener;
1517
import org.elasticsearch.action.ActionType;
1618
import org.elasticsearch.action.support.ActionFilters;
@@ -119,7 +121,7 @@ protected void masterOperation(
119121
Arrays.asList(request.snapshots())
120122
);
121123
if (currentSnapshots.isEmpty()) {
122-
buildResponse(snapshotsInProgress, request, currentSnapshots, null, cancellableTask, listener);
124+
buildResponse(snapshotsInProgress, request, currentSnapshots, null, state.getMinTransportVersion(), cancellableTask, listener);
123125
return;
124126
}
125127

@@ -152,6 +154,7 @@ protected void masterOperation(
152154
request,
153155
currentSnapshots,
154156
nodeSnapshotStatuses,
157+
state.getMinTransportVersion(),
155158
cancellableTask,
156159
l
157160
)
@@ -160,7 +163,7 @@ protected void masterOperation(
160163
);
161164
} else {
162165
// We don't have any in-progress shards, just return current stats
163-
buildResponse(snapshotsInProgress, request, currentSnapshots, null, cancellableTask, listener);
166+
buildResponse(snapshotsInProgress, request, currentSnapshots, null, state.getMinTransportVersion(), cancellableTask, listener);
164167
}
165168

166169
}
@@ -171,6 +174,7 @@ void buildResponse(
171174
SnapshotsStatusRequest request,
172175
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
173176
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
177+
TransportVersion minTransportVersion,
174178
CancellableTask task,
175179
ActionListener<SnapshotsStatusResponse> listener
176180
) {
@@ -235,19 +239,27 @@ void buildResponse(
235239
};
236240
final SnapshotIndexShardStatus shardStatus;
237241
if (stage == SnapshotIndexShardStage.DONE) {
242+
final ShardId shardId = entry.shardId(shardEntry.getKey());
238243
// When processing currently running snapshots, instead of reading the statistics from the repository, which can be
239244
// expensive, we choose instead to provide a message to the caller explaining why the stats are missing and the API
240-
// that can be used to load them.
241-
shardStatus = new SnapshotIndexShardStatus(
242-
entry.shardId(shardEntry.getKey()),
243-
stage,
244-
new SnapshotStats(),
245-
null,
246-
null,
247-
"""
245+
// that can be used to load them once the snapshot has completed.
246+
if (minTransportVersion.onOrAfter(TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
247+
shardStatus = SnapshotIndexShardStatus.forMissingStats(shardId, stage, """
248248
Snapshot shard stats missing from a currently running snapshot due to a node leaving the cluster after \
249-
completing the snapshot; use /_snapshot/<repository>/<snapshot>/_status to load from the repository."""
250-
);
249+
completing the shard snapshot; use /_snapshot/<repository>/<snapshot>/_status to load from the repository \
250+
once the snapshot has completed.""");
251+
} else {
252+
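// Backwards-compatible (BWC) behavior: the cluster's minimum transport version predates
// SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS, so load the stats directly from the repository.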
253+
shardStatus = new SnapshotIndexShardStatus(
254+
shardId,
255+
repositoriesService.repository(entry.repository())
256+
.getShardSnapshotStatus(
257+
entry.snapshot().getSnapshotId(),
258+
entry.indices().get(shardId.getIndexName()),
259+
shardId
260+
)
261+
);
262+
}
251263
} else {
252264
shardStatus = new SnapshotIndexShardStatus(entry.shardId(shardEntry.getKey()), stage);
253265
}

server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatsTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,16 @@
99

1010
package org.elasticsearch.action.admin.cluster.snapshots.status;
1111

12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.TransportVersions;
14+
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
15+
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
1216
import org.elasticsearch.test.AbstractXContentTestCase;
1317
import org.elasticsearch.xcontent.XContentParser;
1418

19+
import java.io.ByteArrayOutputStream;
1520
import java.io.IOException;
21+
import java.util.List;
1622

1723
public class SnapshotStatsTests extends AbstractXContentTestCase<SnapshotStats> {
1824

@@ -48,4 +54,46 @@ protected SnapshotStats doParseInstance(XContentParser parser) throws IOExceptio
4854
protected boolean supportsUnknownFields() {
4955
return true;
5056
}
57+
58+
public void testMissingStats() throws IOException {
59+
final var populatedStats = createTestInstance();
60+
final var missingStats = SnapshotStats.forMissingStats();
61+
assertEquals(0L, missingStats.getStartTime());
62+
assertEquals(0L, missingStats.getTime());
63+
assertEquals(-1, missingStats.getTotalFileCount());
64+
assertEquals(-1, missingStats.getIncrementalFileCount());
65+
assertEquals(-1, missingStats.getProcessedFileCount());
66+
assertEquals(-1L, missingStats.getTotalSize());
67+
assertEquals(-1L, missingStats.getIncrementalSize());
68+
assertEquals(-1L, missingStats.getProcessedSize());
69+
70+
// Verify round trip serialization.
71+
for (var transportVersion : List.of(
72+
TransportVersions.MINIMUM_COMPATIBLE,
73+
TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS,
74+
TransportVersion.current()
75+
)) {
76+
77+
for (var stats : List.of(populatedStats, missingStats)) {
78+
final var bytesOut = new ByteArrayOutputStream();
79+
80+
try (var streamOut = new OutputStreamStreamOutput(bytesOut)) {
81+
streamOut.setTransportVersion(transportVersion);
82+
83+
if (transportVersion.onOrAfter(TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS) || stats != missingStats) {
84+
stats.writeTo(streamOut);
85+
} else {
86+
assertThrows(IllegalStateException.class, () -> stats.writeTo(streamOut));
87+
continue;
88+
}
89+
}
90+
91+
try (var streamIn = new ByteArrayStreamInput(bytesOut.toByteArray())) {
92+
streamIn.setTransportVersion(transportVersion);
93+
final var statsRead = new SnapshotStats(streamIn);
94+
assertEquals(stats, statsRead);
95+
}
96+
}
97+
}
98+
}
5199
}

0 commit comments

Comments
 (0)