Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e8c27a5
Allow missing shard stats for restarted nodes for _snapshot/_status
JeremyDahlgren May 23, 2025
52909c7
[CI] Auto commit changes from spotless
May 23, 2025
d88eb20
Merge branch 'main' into fix/es-10982
JeremyDahlgren May 23, 2025
6e41497
Merge branch 'main' into fix/es-10982
JeremyDahlgren May 30, 2025
83bd1a9
Use -1 for missing stats values, address code review comments
JeremyDahlgren May 30, 2025
f3464b5
Update docs/changelog/128399.yaml
JeremyDahlgren May 30, 2025
b7414af
Merge branch 'main' into fix/es-10982
JeremyDahlgren May 31, 2025
a534a12
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 2, 2025
64217d4
Improve serialization tests, address code review comments
JeremyDahlgren Jun 3, 2025
1cd8dc8
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
a8b2d2e
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
b2f4bf3
Update server/src/main/java/org/elasticsearch/action/admin/cluster/sn…
JeremyDahlgren Jun 3, 2025
b1fc83c
Fix fromXContent() bug, add @Nullable, update description string
JeremyDahlgren Jun 3, 2025
91a6442
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
3908def
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
b1db683
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 6, 2025
4f157ee
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 6, 2025
bd55889
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 9, 2025
e8eeed6
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.support.GroupedActionListener;
Expand Down Expand Up @@ -249,22 +248,23 @@ public void testCorrectCountsForDoneShards() throws Exception {
assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L));
assertNull("expected a null description for snapshot shard status: " + snapshotShardState, snapshotShardState.getDescription());
}, 30L, TimeUnit.SECONDS);

final SnapshotStats snapshotShardStats = stateFirstShard(getSnapshotStatus(repoName, snapshotOne), indexTwo).getStats();
final int totalFiles = snapshotShardStats.getTotalFileCount();
final long totalFileSize = snapshotShardStats.getTotalSize();

internalCluster().restartNode(dataNodeTwo);

final SnapshotIndexShardStatus snapshotShardStateAfterNodeRestart = stateFirstShard(
getSnapshotStatus(repoName, snapshotOne),
indexTwo
);
final var snapshotStatusAfterRestart = getSnapshotStatus(repoName, snapshotOne);
final var snapshotShardStateAfterNodeRestart = stateFirstShard(snapshotStatusAfterRestart, indexTwo);
assertThat(snapshotShardStateAfterNodeRestart.getStage(), is(SnapshotIndexShardStage.DONE));
assertNotNull("expected a description string for missing stats", snapshotShardStateAfterNodeRestart.getDescription());
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalFileCount(), equalTo(0));
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalSize(), equalTo(0L));
assertNotNull("expected a non-null description string for missing stats", snapshotShardStateAfterNodeRestart.getDescription());
final var missingStats = snapshotShardStateAfterNodeRestart.getStats();
assertThat(missingStats.getTotalFileCount(), equalTo(-1));
assertThat(missingStats.getTotalSize(), equalTo(-1L));

final var snapshotShardStateIndexOne = stateFirstShard(snapshotStatusAfterRestart, indexOne);
assertNull("expected a null description string for available stats", snapshotShardStateIndexOne.getDescription());
assertThat(snapshotShardStateIndexOne.getStats().getTotalFileCount(), greaterThan(0));
assertThat(snapshotShardStateIndexOne.getStats().getTotalSize(), greaterThan(0L));

unblockAllDataNodes(repoName);
assertThat(responseSnapshotOne.get().getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_CUSTOM_SERVICE_ADDED = def(9_084_0_00);
public static final TransportVersion ESQL_LIMIT_ROW_SIZE = def(9_085_0_00);
public static final TransportVersion ESQL_REGEX_MATCH_WITH_CASE_INSENSITIVITY = def(9_086_0_00);
public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_DESCRIPTION_ADDED = def(9_087_0_00);
public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS = def(9_087_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_DESCRIPTION_ADDED;
import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS;

public class SnapshotIndexShardStatus extends BroadcastShardResponse implements ToXContentFragment {

Expand All @@ -41,7 +41,7 @@ public SnapshotIndexShardStatus(StreamInput in) throws IOException {
stats = new SnapshotStats(in);
nodeId = in.readOptionalString();
failure = in.readOptionalString();
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_DESCRIPTION_ADDED)) {
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
description = in.readOptionalString();
}
}
Expand Down Expand Up @@ -100,6 +100,20 @@ public SnapshotIndexShardStatus(StreamInput in) throws IOException {
this.description = description;
}

/**
* Creates an instance for scenarios where the snapshot stats are unavailable, with a non-null description of why the stats are missing.
*/
public static SnapshotIndexShardStatus forMissingStats(ShardId shardId, SnapshotIndexShardStage stage, String description) {
return new SnapshotIndexShardStatus(
shardId,
stage,
SnapshotStats.forMissingStats(),
null,
null,
Objects.requireNonNull(description)
);
}

/**
* Returns snapshot stage
*/
Expand Down Expand Up @@ -142,7 +156,7 @@ public void writeTo(StreamOutput out) throws IOException {
stats.writeTo(out);
out.writeOptionalString(nodeId);
out.writeOptionalString(failure);
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_DESCRIPTION_ADDED)) {
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
out.writeOptionalString(description);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import java.io.IOException;

import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS;

public class SnapshotStats implements Writeable, ToXContentObject {

private long startTime;
Expand All @@ -37,6 +39,18 @@ public class SnapshotStats implements Writeable, ToXContentObject {
SnapshotStats() {}

SnapshotStats(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS) && in.readBoolean() == false) {
startTime = 0L;
time = 0L;
incrementalFileCount = -1;
processedFileCount = -1;
incrementalSize = -1L;
processedSize = -1L;
totalFileCount = -1;
totalSize = -1L;
return;
}

startTime = in.readVLong();
time = in.readVLong();

Expand Down Expand Up @@ -71,6 +85,20 @@ public class SnapshotStats implements Writeable, ToXContentObject {
this.processedSize = processedSize;
}

/**
* Returns a stats instance with -1 for all non-time values, for use in situations where the snapshot stats are unavailable.
*/
public static SnapshotStats forMissingStats() {
return new SnapshotStats(0L, 0L, -1, -1, -1, -1L, -1L, -1L);
}

/**
* Returns true if this instance is for a shard snapshot with unavailable stats.
*/
public boolean isMissingStats() {
return incrementalFileCount == -1;
}

/**
* Returns time when snapshot started
*/
Expand Down Expand Up @@ -129,6 +157,22 @@ public long getProcessedSize() {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
if (isMissingStats()) {
out.writeBoolean(false);
return;
}
out.writeBoolean(true);
} else if (isMissingStats()) {
throw new IllegalStateException(
"cannot serialize empty stats for transport version ["
+ out.getTransportVersion()
+ "] less than ["
+ SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS
+ "]"
);
}

out.writeVLong(startTime);
out.writeVLong(time);

Expand Down Expand Up @@ -300,6 +344,10 @@ public static SnapshotStats fromXContent(XContentParser parser) throws IOExcepti
* @param updateTimestamps Whether or not start time and duration should be updated
*/
void add(SnapshotStats stats, boolean updateTimestamps) {
if (stats.isMissingStats()) {
return;
}

incrementalFileCount += stats.incrementalFileCount;
totalFileCount += stats.totalFileCount;
processedFileCount += stats.processedFileCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -119,7 +121,7 @@ protected void masterOperation(
Arrays.asList(request.snapshots())
);
if (currentSnapshots.isEmpty()) {
buildResponse(snapshotsInProgress, request, currentSnapshots, null, cancellableTask, listener);
buildResponse(snapshotsInProgress, request, currentSnapshots, null, state.getMinTransportVersion(), cancellableTask, listener);
return;
}

Expand Down Expand Up @@ -152,6 +154,7 @@ protected void masterOperation(
request,
currentSnapshots,
nodeSnapshotStatuses,
state.getMinTransportVersion(),
cancellableTask,
l
)
Expand All @@ -160,7 +163,7 @@ protected void masterOperation(
);
} else {
// We don't have any in-progress shards, just return current stats
buildResponse(snapshotsInProgress, request, currentSnapshots, null, cancellableTask, listener);
buildResponse(snapshotsInProgress, request, currentSnapshots, null, state.getMinTransportVersion(), cancellableTask, listener);
}

}
Expand All @@ -171,6 +174,7 @@ void buildResponse(
SnapshotsStatusRequest request,
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
TransportVersion minTransportVersion,
CancellableTask task,
ActionListener<SnapshotsStatusResponse> listener
) {
Expand Down Expand Up @@ -235,19 +239,27 @@ void buildResponse(
};
final SnapshotIndexShardStatus shardStatus;
if (stage == SnapshotIndexShardStage.DONE) {
final ShardId shardId = entry.shardId(shardEntry.getKey());
// When processing currently running snapshots, instead of reading the statistics from the repository, which can be
// expensive, we choose instead to provide a message to the caller explaining why the stats are missing and the API
// that can be used to load them.
shardStatus = new SnapshotIndexShardStatus(
entry.shardId(shardEntry.getKey()),
stage,
new SnapshotStats(),
null,
null,
"""
// that can be used to load them once the snapshot has completed.
if (minTransportVersion.onOrAfter(TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
shardStatus = SnapshotIndexShardStatus.forMissingStats(shardId, stage, """
Snapshot shard stats missing from a currently running snapshot due to a node leaving the cluster after \
completing the snapshot; use /_snapshot/<repository>/<snapshot>/_status to load from the repository."""
);
completing the shard snapshot; use /_snapshot/<repository>/<snapshot>/_status to load from the repository \
once the snapshot has completed.""");
} else {
// BWC behavior, load the stats directly from the repository.
shardStatus = new SnapshotIndexShardStatus(
shardId,
repositoriesService.repository(entry.repository())
.getShardSnapshotStatus(
entry.snapshot().getSnapshotId(),
entry.indices().get(shardId.getIndexName()),
shardId
)
);
}
} else {
shardStatus = new SnapshotIndexShardStatus(entry.shardId(shardEntry.getKey()), stage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@

package org.elasticsearch.action.admin.cluster.snapshots.status;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xcontent.XContentParser;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

public class SnapshotStatsTests extends AbstractXContentTestCase<SnapshotStats> {

Expand Down Expand Up @@ -48,4 +54,46 @@ protected SnapshotStats doParseInstance(XContentParser parser) throws IOExceptio
protected boolean supportsUnknownFields() {
return true;
}

public void testMissingStats() throws IOException {
final var populatedStats = createTestInstance();
final var missingStats = SnapshotStats.forMissingStats();
assertEquals(0L, missingStats.getStartTime());
assertEquals(0L, missingStats.getTime());
assertEquals(-1, missingStats.getTotalFileCount());
assertEquals(-1, missingStats.getIncrementalFileCount());
assertEquals(-1, missingStats.getProcessedFileCount());
assertEquals(-1L, missingStats.getTotalSize());
assertEquals(-1L, missingStats.getIncrementalSize());
assertEquals(-1L, missingStats.getProcessedSize());

// Verify round trip serialization.
for (var transportVersion : List.of(
TransportVersions.MINIMUM_COMPATIBLE,
TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS,
TransportVersion.current()
)) {

for (var stats : List.of(populatedStats, missingStats)) {
final var bytesOut = new ByteArrayOutputStream();

try (var streamOut = new OutputStreamStreamOutput(bytesOut)) {
streamOut.setTransportVersion(transportVersion);

if (transportVersion.onOrAfter(TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS) || stats != missingStats) {
stats.writeTo(streamOut);
} else {
assertThrows(IllegalStateException.class, () -> stats.writeTo(streamOut));
continue;
}
}

try (var streamIn = new ByteArrayStreamInput(bytesOut.toByteArray())) {
streamIn.setTransportVersion(transportVersion);
final var statsRead = new SnapshotStats(streamIn);
assertEquals(stats, statsRead);
}
}
}
}
}
Loading