Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e8c27a5
Allow missing shard stats for restarted nodes for _snapshot/_status
JeremyDahlgren May 23, 2025
52909c7
[CI] Auto commit changes from spotless
May 23, 2025
d88eb20
Merge branch 'main' into fix/es-10982
JeremyDahlgren May 23, 2025
6e41497
Merge branch 'main' into fix/es-10982
JeremyDahlgren May 30, 2025
83bd1a9
Use -1 for missing stats values, address code review comments
JeremyDahlgren May 30, 2025
f3464b5
Update docs/changelog/128399.yaml
JeremyDahlgren May 30, 2025
b7414af
Merge branch 'main' into fix/es-10982
JeremyDahlgren May 31, 2025
a534a12
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 2, 2025
64217d4
Improve serialization tests, address code review comments
JeremyDahlgren Jun 3, 2025
1cd8dc8
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
a8b2d2e
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
b2f4bf3
Update server/src/main/java/org/elasticsearch/action/admin/cluster/sn…
JeremyDahlgren Jun 3, 2025
b1fc83c
Fix fromXContent() bug, add @Nullable, update description string
JeremyDahlgren Jun 3, 2025
91a6442
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
3908def
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
b1db683
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 6, 2025
4f157ee
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 6, 2025
bd55889
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 9, 2025
e8eeed6
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/128399.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128399
summary: Allow missing shard stats for restarted nodes for `_snapshot/_status`
area: Snapshot/Restore
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.support.GroupedActionListener;
Expand Down Expand Up @@ -212,7 +211,8 @@ public void testGetSnapshotsWithoutIndices() throws Exception {
* 1. Start snapshot of two shards (both located on separate data nodes).
* 2. Have one of the shards snapshot completely and the other block
* 3. Restart the data node that completed its shard snapshot
* 4. Make sure that snapshot status APIs show correct file-counts and -sizes
* 4. Make sure that snapshot status APIs show correct file-counts and -sizes for non-restarted nodes
* 5. Make sure the description string is set for shard snapshots on restarted nodes.
*
* @throws Exception on failure
*/
Expand Down Expand Up @@ -248,21 +248,24 @@ public void testCorrectCountsForDoneShards() throws Exception {
assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L));
assertNull("expected a null description for snapshot shard status: " + snapshotShardState, snapshotShardState.getDescription());
}, 30L, TimeUnit.SECONDS);

final SnapshotStats snapshotShardStats = stateFirstShard(getSnapshotStatus(repoName, snapshotOne), indexTwo).getStats();
final int totalFiles = snapshotShardStats.getTotalFileCount();
final long totalFileSize = snapshotShardStats.getTotalSize();

internalCluster().restartNode(dataNodeTwo);

final SnapshotIndexShardStatus snapshotShardStateAfterNodeRestart = stateFirstShard(
getSnapshotStatus(repoName, snapshotOne),
indexTwo
);
assertThat(snapshotShardStateAfterNodeRestart.getStage(), is(SnapshotIndexShardStage.DONE));
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalFileCount(), equalTo(totalFiles));
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalSize(), equalTo(totalFileSize));
final var snapshotStatusAfterRestart = getSnapshotStatus(repoName, snapshotOne);

final var snapshotShardStateIndexTwo = stateFirstShard(snapshotStatusAfterRestart, indexTwo);
assertThat(snapshotShardStateIndexTwo.getStage(), is(SnapshotIndexShardStage.DONE));
assertNotNull("expected a non-null description string for missing stats", snapshotShardStateIndexTwo.getDescription());
final var missingStats = snapshotShardStateIndexTwo.getStats();
assertThat(missingStats.getTotalFileCount(), equalTo(-1));
assertThat(missingStats.getTotalSize(), equalTo(-1L));

final var snapshotShardStateIndexOne = stateFirstShard(snapshotStatusAfterRestart, indexOne);
assertNull("expected a null description string for available stats", snapshotShardStateIndexOne.getDescription());
assertThat(snapshotShardStateIndexOne.getStats().getTotalFileCount(), greaterThan(0));
assertThat(snapshotShardStateIndexOne.getStats().getTotalSize(), greaterThan(0L));

unblockAllDataNodes(repoName);
assertThat(responseSnapshotOne.get().getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_REGEX_MATCH_WITH_CASE_INSENSITIVITY = def(9_086_0_00);
public static final TransportVersion IDP_CUSTOM_SAML_ATTRIBUTES = def(9_087_0_00);
public static final TransportVersion JOIN_ON_ALIASES = def(9_088_0_00);
public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS = def(9_089_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS;

public class SnapshotIndexShardStatus extends BroadcastShardResponse implements ToXContentFragment {

private final SnapshotIndexShardStage stage;
Expand All @@ -31,12 +33,17 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements

private String failure;

private String description;

public SnapshotIndexShardStatus(StreamInput in) throws IOException {
super(in);
stage = SnapshotIndexShardStage.fromValue(in.readByte());
stats = new SnapshotStats(in);
nodeId = in.readOptionalString();
failure = in.readOptionalString();
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
description = in.readOptionalString();
}
}

SnapshotIndexShardStatus(ShardId shardId, SnapshotIndexShardStage stage) {
Expand Down Expand Up @@ -74,11 +81,38 @@ public SnapshotIndexShardStatus(StreamInput in) throws IOException {
}

SnapshotIndexShardStatus(ShardId shardId, SnapshotIndexShardStage stage, SnapshotStats stats, String nodeId, String failure) {
this(shardId, stage, stats, nodeId, failure, null);
}

SnapshotIndexShardStatus(
ShardId shardId,
SnapshotIndexShardStage stage,
SnapshotStats stats,
String nodeId,
String failure,
String description
) {
super(shardId);
this.stage = stage;
this.stats = stats;
this.nodeId = nodeId;
this.failure = failure;
this.description = description;
}

/**
* Creates an instance for scenarios where the snapshot is {@link SnapshotIndexShardStage#DONE} but the stats are unavailable, with a
* non-null description of why the stats are missing.
*/
public static SnapshotIndexShardStatus forDoneButMissingStats(ShardId shardId, String description) {
return new SnapshotIndexShardStatus(
shardId,
SnapshotIndexShardStage.DONE,
SnapshotStats.forMissingStats(),
null,
null,
Objects.requireNonNull(description)
);
}

/**
Expand Down Expand Up @@ -109,19 +143,30 @@ public String getFailure() {
return failure;
}

/**
* Returns the optional description of the data values contained in the {@code stats} field.
*/
public String getDescription() {
return description;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(stage.value());
stats.writeTo(out);
out.writeOptionalString(nodeId);
out.writeOptionalString(failure);
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
out.writeOptionalString(description);
}
}

static final class Fields {
static final String STAGE = "stage";
static final String REASON = "reason";
static final String NODE = "node";
static final String DESCRIPTION = "description";
}

@Override
Expand All @@ -135,6 +180,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getFailure() != null) {
builder.field(Fields.REASON, getFailure());
}
if (getDescription() != null) {
builder.field(Fields.DESCRIPTION, getDescription());
}
builder.endObject();
return builder;
}
Expand All @@ -151,7 +199,8 @@ public boolean equals(Object o) {
return stage == that.stage
&& Objects.equals(stats, that.stats)
&& Objects.equals(nodeId, that.nodeId)
&& Objects.equals(failure, that.failure);
&& Objects.equals(failure, that.failure)
&& Objects.equals(description, that.description);
}

@Override
Expand All @@ -160,6 +209,7 @@ public int hashCode() {
result = 31 * result + (stats != null ? stats.hashCode() : 0);
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
result = 31 * result + (failure != null ? failure.hashCode() : 0);
result = 31 * result + (description != null ? description.hashCode() : 0);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import java.io.IOException;

import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS;

public class SnapshotStats implements Writeable, ToXContentObject {

private long startTime;
Expand All @@ -37,6 +39,19 @@ public class SnapshotStats implements Writeable, ToXContentObject {
SnapshotStats() {}

SnapshotStats(StreamInput in) throws IOException {
// We use a boolean to indicate if the stats are present (true) or missing (false), to skip writing all the values if missing.
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS) && in.readBoolean() == false) {
startTime = 0L;
time = 0L;
incrementalFileCount = -1;
processedFileCount = -1;
incrementalSize = -1L;
processedSize = -1L;
totalFileCount = -1;
totalSize = -1L;
return;
}

startTime = in.readVLong();
time = in.readVLong();

Expand Down Expand Up @@ -71,6 +86,20 @@ public class SnapshotStats implements Writeable, ToXContentObject {
this.processedSize = processedSize;
}

/**
* Returns a stats instance with -1 for all non-time values, for use in situations where the snapshot stats are unavailable.
*/
public static SnapshotStats forMissingStats() {
return new SnapshotStats(0L, 0L, -1, -1, -1, -1L, -1L, -1L);
}

/**
* Returns true if this instance is for a shard snapshot with unavailable stats.
*/
public boolean isMissingStats() {
return incrementalFileCount == -1;
}

/**
* Returns time when snapshot started
*/
Expand Down Expand Up @@ -129,6 +158,23 @@ public long getProcessedSize() {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
// We use a boolean to indicate if the stats are present (true) or missing (false), to skip writing all the values if missing.
if (isMissingStats()) {
out.writeBoolean(false);
return;
}
out.writeBoolean(true);
} else if (isMissingStats()) {
throw new IllegalStateException(
"cannot serialize empty stats for transport version ["
+ out.getTransportVersion()
+ "] less than ["
+ SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS
+ "]"
);
}

out.writeVLong(startTime);
out.writeVLong(time);

Expand Down Expand Up @@ -282,6 +328,11 @@ public static SnapshotStats fromXContent(XContentParser parser) throws IOExcepti
}
}
}
// For missing stats incrementalFileCount will be -1, and we expect processedFileCount and processedSize to be omitted (still zero).
if (incrementalFileCount == -1) {
assert processedFileCount == 0 && processedSize == 0L && incrementalSize == -1L && totalFileCount == -1 && totalSize == -1L;
return SnapshotStats.forMissingStats();
}
return new SnapshotStats(
startTime,
time,
Expand All @@ -300,6 +351,10 @@ public static SnapshotStats fromXContent(XContentParser parser) throws IOExcepti
* @param updateTimestamps Whether or not start time and duration should be updated
*/
void add(SnapshotStats stats, boolean updateTimestamps) {
if (stats.isMissingStats()) {
return;
}

incrementalFileCount += stats.incrementalFileCount;
totalFileCount += stats.totalFileCount;
processedFileCount += stats.processedFileCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -119,7 +121,7 @@ protected void masterOperation(
Arrays.asList(request.snapshots())
);
if (currentSnapshots.isEmpty()) {
buildResponse(snapshotsInProgress, request, currentSnapshots, null, cancellableTask, listener);
buildResponse(snapshotsInProgress, request, currentSnapshots, null, state.getMinTransportVersion(), cancellableTask, listener);
return;
}

Expand Down Expand Up @@ -152,6 +154,7 @@ protected void masterOperation(
request,
currentSnapshots,
nodeSnapshotStatuses,
state.getMinTransportVersion(),
cancellableTask,
l
)
Expand All @@ -160,7 +163,7 @@ protected void masterOperation(
);
} else {
// We don't have any in-progress shards, just return current stats
buildResponse(snapshotsInProgress, request, currentSnapshots, null, cancellableTask, listener);
buildResponse(snapshotsInProgress, request, currentSnapshots, null, state.getMinTransportVersion(), cancellableTask, listener);
}

}
Expand All @@ -171,6 +174,7 @@ void buildResponse(
SnapshotsStatusRequest request,
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
TransportVersion minTransportVersion,
CancellableTask task,
ActionListener<SnapshotsStatusResponse> listener
) {
Expand All @@ -196,13 +200,13 @@ void buildResponse(
}
SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.getValue();
if (status.nodeId() != null) {
// We should have information about this shard from the shard:
// We should have information about this shard from the node:
TransportNodesSnapshotsStatus.NodeSnapshotStatus nodeStatus = nodeSnapshotStatusMap.get(status.nodeId());
if (nodeStatus != null) {
Map<ShardId, SnapshotIndexShardStatus> shardStatues = nodeStatus.status().get(entry.snapshot());
if (shardStatues != null) {
Map<ShardId, SnapshotIndexShardStatus> shardStatuses = nodeStatus.status().get(entry.snapshot());
if (shardStatuses != null) {
final ShardId sid = entry.shardId(shardEntry.getKey());
SnapshotIndexShardStatus shardStatus = shardStatues.get(sid);
SnapshotIndexShardStatus shardStatus = shardStatuses.get(sid);
if (shardStatus != null) {
// We have full information about this shard
if (shardStatus.getStage() == SnapshotIndexShardStage.DONE
Expand All @@ -228,27 +232,34 @@ void buildResponse(
}
// We failed to find the status of the shard from the responses we received from data nodes.
// This can happen if nodes drop out of the cluster completely or restart during the snapshot.
// We rebuild the information they would have provided from their in memory state from the cluster
// state and the repository contents in the below logic
final SnapshotIndexShardStage stage = switch (shardEntry.getValue().state()) {
case FAILED, ABORTED, MISSING -> SnapshotIndexShardStage.FAILURE;
case INIT, WAITING, PAUSED_FOR_NODE_REMOVAL, QUEUED -> SnapshotIndexShardStage.STARTED;
case SUCCESS -> SnapshotIndexShardStage.DONE;
};
final SnapshotIndexShardStatus shardStatus;
if (stage == SnapshotIndexShardStage.DONE) {
// Shard snapshot completed successfully so we should be able to load the exact statistics for this
// shard from the repository already.
final ShardId shardId = entry.shardId(shardEntry.getKey());
shardStatus = new SnapshotIndexShardStatus(
shardId,
repositoriesService.repository(entry.repository())
.getShardSnapshotStatus(
entry.snapshot().getSnapshotId(),
entry.indices().get(shardId.getIndexName()),
shardId
)
);
// When processing currently running snapshots, instead of reading the statistics from the repository, which can be
// expensive, we choose instead to provide a message to the caller explaining why the stats are missing and the API
// that can be used to load them once the snapshot has completed.
if (minTransportVersion.onOrAfter(TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
shardStatus = SnapshotIndexShardStatus.forDoneButMissingStats(shardId, """
Snapshot shard stats missing from a currently running snapshot due to a node leaving the cluster after \
completing the shard snapshot; use /_snapshot/<repository>/<snapshot>/_status to load from the repository \
once the snapshot has completed.""");
} else {
// BWC behavior, load the stats directly from the repository.
shardStatus = new SnapshotIndexShardStatus(
shardId,
repositoriesService.repository(entry.repository())
.getShardSnapshotStatus(
entry.snapshot().getSnapshotId(),
entry.indices().get(shardId.getIndexName()),
shardId
)
);
}
} else {
shardStatus = new SnapshotIndexShardStatus(entry.shardId(shardEntry.getKey()), stage);
}
Expand Down
Loading
Loading